diff options
-rw-r--r-- | src/cadet/cadet_api.c | 1044 | ||||
-rw-r--r-- | src/include/gnunet_cadet_service.h | 4 |
2 files changed, 425 insertions, 623 deletions
diff --git a/src/cadet/cadet_api.c b/src/cadet/cadet_api.c index 1ca8bad9d..a9793a7f8 100644 --- a/src/cadet/cadet_api.c +++ b/src/cadet/cadet_api.c | |||
@@ -25,12 +25,12 @@ | |||
25 | 25 | ||
26 | #include "platform.h" | 26 | #include "platform.h" |
27 | #include "gnunet_util_lib.h" | 27 | #include "gnunet_util_lib.h" |
28 | #include "gnunet_constants.h" | ||
28 | #include "gnunet_cadet_service.h" | 29 | #include "gnunet_cadet_service.h" |
29 | #include "cadet.h" | 30 | #include "cadet.h" |
30 | #include "cadet_protocol.h" | 31 | #include "cadet_protocol.h" |
31 | 32 | ||
32 | #define LOG(kind,...) GNUNET_log_from (kind, "cadet-api",__VA_ARGS__) | 33 | #define LOG(kind,...) GNUNET_log_from (kind, "cadet-api",__VA_ARGS__) |
33 | #define DATA_OVERHEAD sizeof(struct GNUNET_CADET_LocalData) | ||
34 | 34 | ||
35 | /******************************************************************************/ | 35 | /******************************************************************************/ |
36 | /************************ DATA STRUCTURES ****************************/ | 36 | /************************ DATA STRUCTURES ****************************/ |
@@ -41,7 +41,6 @@ | |||
41 | */ | 41 | */ |
42 | struct GNUNET_CADET_TransmitHandle | 42 | struct GNUNET_CADET_TransmitHandle |
43 | { | 43 | { |
44 | |||
45 | /** | 44 | /** |
46 | * Double Linked list | 45 | * Double Linked list |
47 | */ | 46 | */ |
@@ -58,6 +57,11 @@ struct GNUNET_CADET_TransmitHandle | |||
58 | struct GNUNET_CADET_Channel *channel; | 57 | struct GNUNET_CADET_Channel *channel; |
59 | 58 | ||
60 | /** | 59 | /** |
60 | * Request data task. | ||
61 | */ | ||
62 | struct GNUNET_SCHEDULER_Task *request_data_task; | ||
63 | |||
64 | /** | ||
61 | * Callback to obtain the message to transmit, or NULL if we | 65 | * Callback to obtain the message to transmit, or NULL if we |
62 | * got the message in 'data'. Notice that messages built | 66 | * got the message in 'data'. Notice that messages built |
63 | * by 'notify' need to be encapsulated with information about | 67 | * by 'notify' need to be encapsulated with information about |
@@ -71,20 +75,7 @@ struct GNUNET_CADET_TransmitHandle | |||
71 | void *notify_cls; | 75 | void *notify_cls; |
72 | 76 | ||
73 | /** | 77 | /** |
74 | * How long is this message valid. Once the timeout has been | 78 | * Size of the payload. |
75 | * reached, the message must no longer be sent. If this | ||
76 | * is a message with a 'notify' callback set, the 'notify' | ||
77 | * function should be called with 'buf' NULL and size 0. | ||
78 | */ | ||
79 | struct GNUNET_TIME_Absolute timeout; | ||
80 | |||
81 | /** | ||
82 | * Task triggering a timeout, can be NO_TASK if the timeout is FOREVER. | ||
83 | */ | ||
84 | struct GNUNET_SCHEDULER_Task * timeout_task; | ||
85 | |||
86 | /** | ||
87 | * Size of 'data' -- or the desired size of 'notify' if 'data' is NULL. | ||
88 | */ | 79 | */ |
89 | size_t size; | 80 | size_t size; |
90 | }; | 81 | }; |
@@ -123,11 +114,10 @@ union CadetInfoCB { | |||
123 | */ | 114 | */ |
124 | struct GNUNET_CADET_Handle | 115 | struct GNUNET_CADET_Handle |
125 | { | 116 | { |
126 | |||
127 | /** | 117 | /** |
128 | * Handle to the server connection, to send messages later | 118 | * Message queue (if available). |
129 | */ | 119 | */ |
130 | struct GNUNET_CLIENT_Connection *client; | 120 | struct GNUNET_MQ_Handle *mq; |
131 | 121 | ||
132 | /** | 122 | /** |
133 | * Set of handlers used for processing incoming messages in the channels | 123 | * Set of handlers used for processing incoming messages in the channels |
@@ -160,11 +150,6 @@ struct GNUNET_CADET_Handle | |||
160 | GNUNET_CADET_ChannelEndHandler *cleaner; | 150 | GNUNET_CADET_ChannelEndHandler *cleaner; |
161 | 151 | ||
162 | /** | 152 | /** |
163 | * Handle to cancel pending transmissions in case of disconnection | ||
164 | */ | ||
165 | struct GNUNET_CLIENT_TransmitHandle *th; | ||
166 | |||
167 | /** | ||
168 | * Closure for all the handlers given by the client | 153 | * Closure for all the handlers given by the client |
169 | */ | 154 | */ |
170 | void *cls; | 155 | void *cls; |
@@ -184,12 +169,6 @@ struct GNUNET_CADET_Handle | |||
184 | */ | 169 | */ |
185 | CADET_ChannelNumber next_chid; | 170 | CADET_ChannelNumber next_chid; |
186 | 171 | ||
187 | /** | ||
188 | * Have we started the task to receive messages from the service | ||
189 | * yet? We do this after we send the 'CADET_LOCAL_CONNECT' message. | ||
190 | */ | ||
191 | int in_receive; | ||
192 | |||
193 | /** | 172 | /** |
194 | * Configuration given by the client, in case of reconnection | 173 | * Configuration given by the client, in case of reconnection |
195 | */ | 174 | */ |
@@ -337,24 +316,6 @@ struct CadetMQState | |||
337 | 316 | ||
338 | 317 | ||
339 | /******************************************************************************/ | 318 | /******************************************************************************/ |
340 | /*********************** DECLARATIONS *************************/ | ||
341 | /******************************************************************************/ | ||
342 | |||
343 | /** | ||
344 | * Function called to send a message to the service. | ||
345 | * "buf" will be NULL and "size" zero if the socket was closed for writing in | ||
346 | * the meantime. | ||
347 | * | ||
348 | * @param cls closure, the cadet handle | ||
349 | * @param size number of bytes available in buf | ||
350 | * @param buf where the callee should write the connect message | ||
351 | * @return number of bytes written to buf | ||
352 | */ | ||
353 | static size_t | ||
354 | send_callback (void *cls, size_t size, void *buf); | ||
355 | |||
356 | |||
357 | /******************************************************************************/ | ||
358 | /*********************** AUXILIARY FUNCTIONS *************************/ | 319 | /*********************** AUXILIARY FUNCTIONS *************************/ |
359 | /******************************************************************************/ | 320 | /******************************************************************************/ |
360 | 321 | ||
@@ -392,29 +353,6 @@ find_port (const struct GNUNET_CADET_Handle *h, | |||
392 | return p; | 353 | return p; |
393 | } | 354 | } |
394 | 355 | ||
395 | /** | ||
396 | * Check whether there is any message ready in the queue and find the size. | ||
397 | * | ||
398 | * @param h Cadet handle. | ||
399 | * | ||
400 | * @return The size of the first ready message in the queue, including overhead. | ||
401 | * 0 if there is none. | ||
402 | */ | ||
403 | static size_t | ||
404 | message_ready_size (struct GNUNET_CADET_Handle *h) | ||
405 | { | ||
406 | struct GNUNET_CADET_TransmitHandle *th; | ||
407 | struct GNUNET_CADET_Channel *ch; | ||
408 | |||
409 | for (th = h->th_head; NULL != th; th = th->next) | ||
410 | { | ||
411 | ch = th->channel; | ||
412 | if (GNUNET_NO == th_is_payload (th) || GNUNET_YES == ch->allow_send) | ||
413 | return th->size; | ||
414 | } | ||
415 | return 0; | ||
416 | } | ||
417 | |||
418 | 356 | ||
419 | /** | 357 | /** |
420 | * Get the channel handler for the channel specified by id from the given handle | 358 | * Get the channel handler for the channel specified by id from the given handle |
@@ -520,22 +458,8 @@ destroy_channel (struct GNUNET_CADET_Channel *ch, int call_cleaner) | |||
520 | * Management traffic should be ok, as clients can't cancel that. | 458 | * Management traffic should be ok, as clients can't cancel that. |
521 | * If the service crashed and we are reconnecting, it's ok. | 459 | * If the service crashed and we are reconnecting, it's ok. |
522 | */ | 460 | */ |
523 | GNUNET_break (GNUNET_NO == th_is_payload (th) | 461 | GNUNET_break (GNUNET_NO == th_is_payload (th)); |
524 | || GNUNET_NO == h->in_receive); | 462 | GNUNET_CADET_notify_transmit_ready_cancel (th); |
525 | GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th); | ||
526 | |||
527 | /* clean up request */ | ||
528 | if (NULL != th->timeout_task) | ||
529 | GNUNET_SCHEDULER_cancel (th->timeout_task); | ||
530 | GNUNET_free (th); | ||
531 | } | ||
532 | |||
533 | /* if there are no more pending requests with cadet service, cancel active request */ | ||
534 | /* Note: this should be unnecessary... */ | ||
535 | if ((0 == message_ready_size (h)) && (NULL != h->th)) | ||
536 | { | ||
537 | GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); | ||
538 | h->th = NULL; | ||
539 | } | 463 | } |
540 | 464 | ||
541 | if (0 != ch->peer) | 465 | if (0 != ch->peer) |
@@ -546,32 +470,6 @@ destroy_channel (struct GNUNET_CADET_Channel *ch, int call_cleaner) | |||
546 | 470 | ||
547 | 471 | ||
548 | /** | 472 | /** |
549 | * Notify client that the transmission has timed out | ||
550 | * | ||
551 | * @param cls closure | ||
552 | */ | ||
553 | static void | ||
554 | timeout_transmission (void *cls) | ||
555 | { | ||
556 | struct GNUNET_CADET_TransmitHandle *th = cls; | ||
557 | struct GNUNET_CADET_Handle *cadet = th->channel->cadet; | ||
558 | |||
559 | th->timeout_task = NULL; | ||
560 | th->channel->packet_size = 0; | ||
561 | GNUNET_CONTAINER_DLL_remove (cadet->th_head, cadet->th_tail, th); | ||
562 | if (GNUNET_YES == th_is_payload (th)) | ||
563 | GNUNET_break (0 == th->notify (th->notify_cls, 0, NULL)); | ||
564 | GNUNET_free (th); | ||
565 | if ((0 == message_ready_size (cadet)) && (NULL != cadet->th)) | ||
566 | { | ||
567 | /* nothing ready to transmit, no point in asking for transmission */ | ||
568 | GNUNET_CLIENT_notify_transmit_ready_cancel (cadet->th); | ||
569 | cadet->th = NULL; | ||
570 | } | ||
571 | } | ||
572 | |||
573 | |||
574 | /** | ||
575 | * Add a transmit handle to the transmission queue and set the | 473 | * Add a transmit handle to the transmission queue and set the |
576 | * timeout if needed. | 474 | * timeout if needed. |
577 | * | 475 | * |
@@ -583,30 +481,10 @@ add_to_queue (struct GNUNET_CADET_Handle *h, | |||
583 | struct GNUNET_CADET_TransmitHandle *th) | 481 | struct GNUNET_CADET_TransmitHandle *th) |
584 | { | 482 | { |
585 | GNUNET_CONTAINER_DLL_insert_tail (h->th_head, h->th_tail, th); | 483 | GNUNET_CONTAINER_DLL_insert_tail (h->th_head, h->th_tail, th); |
586 | if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us == th->timeout.abs_value_us) | ||
587 | return; | ||
588 | th->timeout_task = | ||
589 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining | ||
590 | (th->timeout), &timeout_transmission, th); | ||
591 | } | 484 | } |
592 | 485 | ||
593 | 486 | ||
594 | /** | 487 | /** |
595 | * Auxiliary function to send an already constructed packet to the service. | ||
596 | * Takes care of creating a new queue element, copying the message and | ||
597 | * calling the tmt_rdy function if necessary. | ||
598 | * | ||
599 | * @param h cadet handle | ||
600 | * @param msg message to transmit | ||
601 | * @param channel channel this send is related to (NULL if N/A) | ||
602 | */ | ||
603 | static void | ||
604 | send_packet (struct GNUNET_CADET_Handle *h, | ||
605 | const struct GNUNET_MessageHeader *msg, | ||
606 | struct GNUNET_CADET_Channel *channel); | ||
607 | |||
608 | |||
609 | /** | ||
610 | * Send an ack on the channel to confirm the processing of a message. | 488 | * Send an ack on the channel to confirm the processing of a message. |
611 | * | 489 | * |
612 | * @param ch Channel on which to send the ACK. | 490 | * @param ch Channel on which to send the ACK. |
@@ -614,124 +492,54 @@ send_packet (struct GNUNET_CADET_Handle *h, | |||
614 | static void | 492 | static void |
615 | send_ack (struct GNUNET_CADET_Channel *ch) | 493 | send_ack (struct GNUNET_CADET_Channel *ch) |
616 | { | 494 | { |
617 | struct GNUNET_CADET_LocalAck msg; | 495 | struct GNUNET_CADET_LocalAck *msg; |
496 | struct GNUNET_MQ_Envelope *env; | ||
497 | |||
498 | env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK); | ||
618 | 499 | ||
619 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK on channel %X\n", ch->chid); | 500 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK on channel %X\n", ch->chid); |
620 | msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK); | 501 | msg->channel_id = htonl (ch->chid); |
621 | msg.header.size = htons (sizeof (msg)); | 502 | GNUNET_MQ_send (ch->cadet->mq, env); |
622 | msg.channel_id = htonl (ch->chid); | ||
623 | 503 | ||
624 | send_packet (ch->cadet, &msg.header, ch); | ||
625 | return; | 504 | return; |
626 | } | 505 | } |
627 | 506 | ||
628 | 507 | ||
629 | 508 | ||
630 | /** | 509 | /******************************************************************************/ |
631 | * Reconnect callback: tries to reconnect again after a failer previous | 510 | /*********************** RECEIVE HANDLERS ****************************/ |
632 | * reconnection. | 511 | /******************************************************************************/ |
633 | * | ||
634 | * @param cls closure (cadet handle) | ||
635 | */ | ||
636 | static void | ||
637 | reconnect_cbk (void *cls); | ||
638 | |||
639 | |||
640 | /** | ||
641 | * Reconnect to the service, retransmit all infomation to try to restore the | ||
642 | * original state. | ||
643 | * | ||
644 | * @param h handle to the cadet | ||
645 | * | ||
646 | * @return GNUNET_YES in case of sucess, GNUNET_NO otherwise (service down...) | ||
647 | */ | ||
648 | static int | ||
649 | do_reconnect (struct GNUNET_CADET_Handle *h) | ||
650 | { | ||
651 | LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n"); | ||
652 | LOG (GNUNET_ERROR_TYPE_DEBUG, "******* RECONNECT *******\n"); | ||
653 | LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n"); | ||
654 | LOG (GNUNET_ERROR_TYPE_DEBUG, "******** on %p *******\n", h); | ||
655 | LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n"); | ||
656 | |||
657 | /* disconnect */ | ||
658 | if (NULL != h->th) | ||
659 | { | ||
660 | GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); | ||
661 | h->th = NULL; | ||
662 | } | ||
663 | if (NULL != h->client) | ||
664 | { | ||
665 | GNUNET_CLIENT_disconnect (h->client); | ||
666 | } | ||
667 | |||
668 | /* connect again */ | ||
669 | h->client = GNUNET_CLIENT_connect ("cadet", h->cfg); | ||
670 | if (h->client == NULL) | ||
671 | { | ||
672 | h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time, | ||
673 | &reconnect_cbk, h); | ||
674 | h->reconnect_time = | ||
675 | GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS, | ||
676 | GNUNET_TIME_relative_multiply | ||
677 | (h->reconnect_time, 2)); | ||
678 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Next retry in %s\n", | ||
679 | GNUNET_STRINGS_relative_time_to_string (h->reconnect_time, | ||
680 | GNUNET_NO)); | ||
681 | GNUNET_break (0); | ||
682 | return GNUNET_NO; | ||
683 | } | ||
684 | else | ||
685 | { | ||
686 | h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS; | ||
687 | } | ||
688 | return GNUNET_YES; | ||
689 | } | ||
690 | |||
691 | /** | ||
692 | * Reconnect callback: tries to reconnect again after a failer previous | ||
693 | * reconnecttion | ||
694 | * | ||
695 | * @param cls closure (cadet handle) | ||
696 | */ | ||
697 | static void | ||
698 | reconnect_cbk (void *cls) | ||
699 | { | ||
700 | struct GNUNET_CADET_Handle *h = cls; | ||
701 | |||
702 | h->reconnect_task = NULL; | ||
703 | do_reconnect (h); | ||
704 | } | ||
705 | 512 | ||
706 | 513 | ||
707 | /** | 514 | /** |
708 | * Reconnect to the service, retransmit all infomation to try to restore the | 515 | * Call the @a notify callback given to #GNUNET_CADET_notify_transmit_ready to |
709 | * original state. | 516 | * request the data to send over MQ. Since MQ manages the queue, this function |
517 | * is scheduled immediatly after a transmit ready notification. | ||
710 | * | 518 | * |
711 | * @param h handle to the cadet | 519 | * @param cls Closure (transmit handle). |
712 | * | ||
713 | * @return #GNUNET_YES in case of sucess, #GNUNET_NO otherwise (service down...) | ||
714 | */ | 520 | */ |
715 | static void | 521 | static void |
716 | reconnect (struct GNUNET_CADET_Handle *h) | 522 | request_data (void *cls) |
717 | { | 523 | { |
718 | struct GNUNET_CADET_Channel *ch; | 524 | struct GNUNET_CADET_TransmitHandle *th = cls; |
525 | struct GNUNET_CADET_LocalData *msg; | ||
526 | struct GNUNET_MQ_Envelope *env; | ||
527 | size_t osize; | ||
719 | 528 | ||
720 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 529 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting Data: %u bytes\n", th->size); |
721 | "Requested RECONNECT, destroying all channels\n"); | 530 | th->request_data_task = NULL; |
722 | h->in_receive = GNUNET_NO; | 531 | th->channel->packet_size = 0; |
723 | for (ch = h->channels_head; NULL != ch; ch = h->channels_head) | 532 | env = GNUNET_MQ_msg_extra (msg, th->size, |
724 | destroy_channel (ch, GNUNET_YES); | 533 | GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA); |
725 | if (NULL == h->reconnect_task) | 534 | msg->id = htonl (th->channel->chid); |
726 | h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time, | 535 | osize = th->notify (th->notify_cls, th->size, &msg[1]); |
727 | &reconnect_cbk, h); | 536 | GNUNET_assert (osize == th->size); |
537 | th->channel->allow_send = GNUNET_NO; | ||
538 | GNUNET_MQ_send (th->channel->cadet->mq, env); | ||
539 | GNUNET_CADET_notify_transmit_ready_cancel (th); | ||
728 | } | 540 | } |
729 | 541 | ||
730 | 542 | ||
731 | /******************************************************************************/ | ||
732 | /*********************** RECEIVE HANDLERS ****************************/ | ||
733 | /******************************************************************************/ | ||
734 | |||
735 | /** | 543 | /** |
736 | * Process the new channel notification and add it to the channels in the handle | 544 | * Process the new channel notification and add it to the channels in the handle |
737 | * | 545 | * |
@@ -739,9 +547,10 @@ reconnect (struct GNUNET_CADET_Handle *h) | |||
739 | * @param msg A message with the details of the new incoming channel | 547 | * @param msg A message with the details of the new incoming channel |
740 | */ | 548 | */ |
741 | static void | 549 | static void |
742 | process_channel_created (struct GNUNET_CADET_Handle *h, | 550 | handle_channel_created (void *cls, |
743 | const struct GNUNET_CADET_ChannelCreateMessage *msg) | 551 | const struct GNUNET_CADET_ChannelCreateMessage *msg) |
744 | { | 552 | { |
553 | struct GNUNET_CADET_Handle *h = cls; | ||
745 | struct GNUNET_CADET_Channel *ch; | 554 | struct GNUNET_CADET_Channel *ch; |
746 | struct GNUNET_CADET_Port *port; | 555 | struct GNUNET_CADET_Port *port; |
747 | const struct GNUNET_HashCode *port_number; | 556 | const struct GNUNET_HashCode *port_number; |
@@ -777,15 +586,13 @@ process_channel_created (struct GNUNET_CADET_Handle *h, | |||
777 | } | 586 | } |
778 | else | 587 | else |
779 | { | 588 | { |
780 | struct GNUNET_CADET_ChannelDestroyMessage d_msg; | 589 | struct GNUNET_CADET_ChannelDestroyMessage *d_msg; |
590 | struct GNUNET_MQ_Envelope *env; | ||
781 | 591 | ||
782 | LOG (GNUNET_ERROR_TYPE_DEBUG, "No handler for incoming channels\n"); | 592 | LOG (GNUNET_ERROR_TYPE_DEBUG, "No handler for incoming channels\n"); |
783 | 593 | env = GNUNET_MQ_msg (d_msg, GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY); | |
784 | d_msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY); | 594 | d_msg->channel_id = msg->channel_id; |
785 | d_msg.header.size = htons (sizeof (struct GNUNET_CADET_ChannelDestroyMessage)); | 595 | GNUNET_MQ_send (h->mq, env); |
786 | d_msg.channel_id = msg->channel_id; | ||
787 | |||
788 | send_packet (h, &d_msg.header, NULL); | ||
789 | } | 596 | } |
790 | return; | 597 | return; |
791 | } | 598 | } |
@@ -798,9 +605,10 @@ process_channel_created (struct GNUNET_CADET_Handle *h, | |||
798 | * @param msg A message with the details of the channel being destroyed | 605 | * @param msg A message with the details of the channel being destroyed |
799 | */ | 606 | */ |
800 | static void | 607 | static void |
801 | process_channel_destroy (struct GNUNET_CADET_Handle *h, | 608 | handle_channel_destroy (void *cls, |
802 | const struct GNUNET_CADET_ChannelDestroyMessage *msg) | 609 | const struct GNUNET_CADET_ChannelDestroyMessage *msg) |
803 | { | 610 | { |
611 | struct GNUNET_CADET_Handle *h = cls; | ||
804 | struct GNUNET_CADET_Channel *ch; | 612 | struct GNUNET_CADET_Channel *ch; |
805 | CADET_ChannelNumber chid; | 613 | CADET_ChannelNumber chid; |
806 | 614 | ||
@@ -818,42 +626,66 @@ process_channel_destroy (struct GNUNET_CADET_Handle *h, | |||
818 | 626 | ||
819 | 627 | ||
820 | /** | 628 | /** |
629 | * Check that message received from CADET service is well-formed. | ||
630 | * | ||
631 | * @param cls the `struct GNUNET_CADET_Handle` | ||
632 | * @param message the message we got | ||
633 | * @return #GNUNET_OK if the message is well-formed, | ||
634 | * #GNUNET_SYSERR otherwise | ||
635 | */ | ||
636 | static int | ||
637 | check_local_data (void *cls, | ||
638 | const struct GNUNET_CADET_LocalData *message) | ||
639 | { | ||
640 | struct GNUNET_CADET_Handle *h = cls; | ||
641 | struct GNUNET_CADET_Channel *ch; | ||
642 | uint16_t size; | ||
643 | |||
644 | size = ntohs (message->header.size); | ||
645 | if (sizeof (*message) + sizeof (struct GNUNET_MessageHeader) > size) | ||
646 | { | ||
647 | GNUNET_break_op (0); | ||
648 | return GNUNET_SYSERR; | ||
649 | } | ||
650 | |||
651 | ch = retrieve_channel (h, ntohl (message->id)); | ||
652 | if (NULL == ch) | ||
653 | { | ||
654 | GNUNET_break_op (0); | ||
655 | return GNUNET_SYSERR; | ||
656 | } | ||
657 | |||
658 | return GNUNET_OK; | ||
659 | } | ||
660 | |||
661 | |||
662 | /** | ||
821 | * Process the incoming data packets, call appropriate handlers. | 663 | * Process the incoming data packets, call appropriate handlers. |
822 | * | 664 | * |
823 | * @param h The cadet handle | 665 | * @param h The cadet handle |
824 | * @param message A message encapsulating the data | 666 | * @param message A message encapsulating the data |
825 | */ | 667 | */ |
826 | static void | 668 | static void |
827 | process_incoming_data (struct GNUNET_CADET_Handle *h, | 669 | handle_local_data (void *cls, |
828 | const struct GNUNET_MessageHeader *message) | 670 | const struct GNUNET_CADET_LocalData *message) |
829 | { | 671 | { |
672 | struct GNUNET_CADET_Handle *h = cls; | ||
830 | const struct GNUNET_MessageHeader *payload; | 673 | const struct GNUNET_MessageHeader *payload; |
831 | const struct GNUNET_CADET_MessageHandler *handler; | 674 | const struct GNUNET_CADET_MessageHandler *handler; |
832 | struct GNUNET_CADET_LocalData *dmsg; | ||
833 | struct GNUNET_CADET_Channel *ch; | 675 | struct GNUNET_CADET_Channel *ch; |
834 | size_t size; | ||
835 | unsigned int i; | 676 | unsigned int i; |
836 | uint16_t type; | 677 | uint16_t type; |
837 | 678 | ||
838 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Got a data message!\n"); | 679 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Got a data message!\n"); |
839 | dmsg = (struct GNUNET_CADET_LocalData *) message; | 680 | ch = retrieve_channel (h, ntohl (message->id)); |
840 | ch = retrieve_channel (h, ntohl (dmsg->id)); | 681 | GNUNET_assert (NULL != ch); |
841 | if (NULL == ch) | ||
842 | { | ||
843 | GNUNET_break (0); | ||
844 | return; | ||
845 | } | ||
846 | 682 | ||
847 | payload = (struct GNUNET_MessageHeader *) &dmsg[1]; | 683 | payload = (struct GNUNET_MessageHeader *) &message[1]; |
848 | LOG (GNUNET_ERROR_TYPE_DEBUG, " %s data on channel %s [%X]\n", | 684 | LOG (GNUNET_ERROR_TYPE_DEBUG, " %s data on channel %s [%X]\n", |
849 | GC_f2s (ch->chid >= GNUNET_CADET_LOCAL_CHANNEL_ID_SERV), | 685 | GC_f2s (ch->chid >= GNUNET_CADET_LOCAL_CHANNEL_ID_SERV), |
850 | GNUNET_i2s (GNUNET_PEER_resolve2 (ch->peer)), ntohl (dmsg->id)); | 686 | GNUNET_i2s (GNUNET_PEER_resolve2 (ch->peer)), ntohl (message->id)); |
851 | |||
852 | size = ntohs (message->size); | ||
853 | LOG (GNUNET_ERROR_TYPE_DEBUG, " %u bytes\n", size); | ||
854 | 687 | ||
855 | type = ntohs (payload->type); | 688 | type = ntohs (payload->type); |
856 | size = ntohs (payload->size); | ||
857 | LOG (GNUNET_ERROR_TYPE_DEBUG, " payload type %s\n", GC_m2s (type)); | 689 | LOG (GNUNET_ERROR_TYPE_DEBUG, " payload type %s\n", GC_m2s (type)); |
858 | for (i = 0; i < h->n_handlers; i++) | 690 | for (i = 0; i < h->n_handlers; i++) |
859 | { | 691 | { |
@@ -867,13 +699,13 @@ process_incoming_data (struct GNUNET_CADET_Handle *h, | |||
867 | { | 699 | { |
868 | LOG (GNUNET_ERROR_TYPE_DEBUG, "callback caused disconnection\n"); | 700 | LOG (GNUNET_ERROR_TYPE_DEBUG, "callback caused disconnection\n"); |
869 | GNUNET_CADET_channel_destroy (ch); | 701 | GNUNET_CADET_channel_destroy (ch); |
870 | return; | 702 | break; |
871 | } | 703 | } |
872 | else | 704 | else |
873 | { | 705 | { |
874 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 706 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
875 | "callback completed successfully\n"); | 707 | "callback completed successfully\n"); |
876 | return; | 708 | break; |
877 | } | 709 | } |
878 | } | 710 | } |
879 | } | 711 | } |
@@ -888,16 +720,15 @@ process_incoming_data (struct GNUNET_CADET_Handle *h, | |||
888 | * @param message Message itself. | 720 | * @param message Message itself. |
889 | */ | 721 | */ |
890 | static void | 722 | static void |
891 | process_ack (struct GNUNET_CADET_Handle *h, | 723 | handle_local_ack (void *cls, |
892 | const struct GNUNET_MessageHeader *message) | 724 | const struct GNUNET_CADET_LocalAck *message) |
893 | { | 725 | { |
894 | struct GNUNET_CADET_LocalAck *msg; | 726 | struct GNUNET_CADET_Handle *h = cls; |
895 | struct GNUNET_CADET_Channel *ch; | 727 | struct GNUNET_CADET_Channel *ch; |
896 | CADET_ChannelNumber chid; | 728 | CADET_ChannelNumber chid; |
897 | 729 | ||
898 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Got an ACK!\n"); | 730 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Got an ACK!\n"); |
899 | msg = (struct GNUNET_CADET_LocalAck *) message; | 731 | chid = ntohl (message->channel_id); |
900 | chid = ntohl (msg->channel_id); | ||
901 | ch = retrieve_channel (h, chid); | 732 | ch = retrieve_channel (h, chid); |
902 | if (NULL == ch) | 733 | if (NULL == ch) |
903 | { | 734 | { |
@@ -906,13 +737,162 @@ process_ack (struct GNUNET_CADET_Handle *h, | |||
906 | } | 737 | } |
907 | LOG (GNUNET_ERROR_TYPE_DEBUG, " on channel %X!\n", ch->chid); | 738 | LOG (GNUNET_ERROR_TYPE_DEBUG, " on channel %X!\n", ch->chid); |
908 | ch->allow_send = GNUNET_YES; | 739 | ch->allow_send = GNUNET_YES; |
909 | if (NULL == h->th && 0 < ch->packet_size) | 740 | if (0 < ch->packet_size) |
741 | { | ||
742 | struct GNUNET_CADET_TransmitHandle *th; | ||
743 | struct GNUNET_CADET_TransmitHandle *next; | ||
744 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
745 | " pending data, sending %U bytes!\n", | ||
746 | ch->packet_size); | ||
747 | for (th = h->th_head; NULL != th; th = next) | ||
748 | { | ||
749 | next = th->next; | ||
750 | if (th->channel == ch) | ||
751 | { | ||
752 | th->request_data_task = GNUNET_SCHEDULER_add_now (&request_data, th); | ||
753 | GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th); | ||
754 | break; | ||
755 | } | ||
756 | } | ||
757 | /* Complain if we got thru all th without sending anything, ch was wrong */ | ||
758 | GNUNET_break (NULL != th); | ||
759 | } | ||
760 | } | ||
761 | |||
762 | /** | ||
763 | * Reconnect to the service, retransmit all infomation to try to restore the | ||
764 | * original state. | ||
765 | * | ||
766 | * @param h handle to the cadet | ||
767 | * | ||
768 | * @return #GNUNET_YES in case of sucess, #GNUNET_NO otherwise (service down...) | ||
769 | */ | ||
770 | static void | ||
771 | reconnect (struct GNUNET_CADET_Handle *h); | ||
772 | |||
773 | |||
774 | /** | ||
775 | * Reconnect callback: tries to reconnect again after a failer previous | ||
776 | * reconnection. | ||
777 | * | ||
778 | * @param cls closure (cadet handle) | ||
779 | */ | ||
780 | static void | ||
781 | reconnect_cbk (void *cls); | ||
782 | |||
783 | |||
784 | /** | ||
785 | * Generic error handler, called with the appropriate error code and | ||
786 | * the same closure specified at the creation of the message queue. | ||
787 | * Not every message queue implementation supports an error handler. | ||
788 | * | ||
789 | * @param cls closure, a `struct GNUNET_CORE_Handle *` | ||
790 | * @param error error code | ||
791 | */ | ||
792 | static void | ||
793 | handle_mq_error (void *cls, | ||
794 | enum GNUNET_MQ_Error error) | ||
795 | { | ||
796 | struct GNUNET_CADET_Handle *h = cls; | ||
797 | |||
798 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MQ ERROR: %u\n", error); | ||
799 | GNUNET_MQ_destroy (h->mq); | ||
800 | h->mq = NULL; | ||
801 | reconnect (h); | ||
802 | } | ||
803 | |||
804 | |||
805 | /** | ||
806 | * Reconnect to the service, retransmit all infomation to try to restore the | ||
807 | * original state. | ||
808 | * | ||
809 | * @param h handle to the cadet | ||
810 | * | ||
811 | * @return GNUNET_YES in case of sucess, GNUNET_NO otherwise (service down...) | ||
812 | */ | ||
813 | static int | ||
814 | do_reconnect (struct GNUNET_CADET_Handle *h) | ||
815 | { | ||
816 | GNUNET_MQ_hd_fixed_size (channel_created, | ||
817 | GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE, | ||
818 | struct GNUNET_CADET_ChannelCreateMessage); | ||
819 | GNUNET_MQ_hd_fixed_size (channel_destroy, | ||
820 | GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY, | ||
821 | struct GNUNET_CADET_ChannelDestroyMessage); | ||
822 | GNUNET_MQ_hd_var_size (local_data, | ||
823 | GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA, | ||
824 | struct GNUNET_CADET_LocalData); | ||
825 | GNUNET_MQ_hd_fixed_size (local_ack, | ||
826 | GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK, | ||
827 | struct GNUNET_CADET_LocalAck); | ||
828 | // FIXME | ||
829 | // GNUNET_MQ_hd_fixed_Y size (channel_destroyed, | ||
830 | // GNUNET_MESSAGE_TYPE_CADET_CHANNEL_NACK, | ||
831 | // struct GNUNET_CADET_ChannelDestroyMessage); | ||
832 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
833 | make_channel_created_handler (h), | ||
834 | make_channel_destroy_handler (h), | ||
835 | make_local_data_handler (h), | ||
836 | make_local_ack_handler (h), | ||
837 | GNUNET_MQ_handler_end () | ||
838 | }; | ||
839 | |||
840 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CADET\n"); | ||
841 | |||
842 | GNUNET_assert (NULL == h->mq); | ||
843 | h->mq = GNUNET_CLIENT_connecT (h->cfg, | ||
844 | "cadet", | ||
845 | handlers, | ||
846 | &handle_mq_error, | ||
847 | h); | ||
848 | if (NULL == h->mq) | ||
910 | { | 849 | { |
911 | LOG (GNUNET_ERROR_TYPE_DEBUG, " tmt rdy was NULL, requesting!\n"); | 850 | reconnect (h); |
912 | h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, ch->packet_size, | 851 | return GNUNET_NO; |
913 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
914 | GNUNET_YES, &send_callback, h); | ||
915 | } | 852 | } |
853 | else | ||
854 | { | ||
855 | h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS; | ||
856 | } | ||
857 | return GNUNET_YES; | ||
858 | } | ||
859 | |||
860 | /** | ||
861 | * Reconnect callback: tries to reconnect again after a failer previous | ||
862 | * reconnecttion | ||
863 | * | ||
864 | * @param cls closure (cadet handle) | ||
865 | */ | ||
866 | static void | ||
867 | reconnect_cbk (void *cls) | ||
868 | { | ||
869 | struct GNUNET_CADET_Handle *h = cls; | ||
870 | |||
871 | h->reconnect_task = NULL; | ||
872 | do_reconnect (h); | ||
873 | } | ||
874 | |||
875 | |||
876 | /** | ||
877 | * Reconnect to the service, retransmit all infomation to try to restore the | ||
878 | * original state. | ||
879 | * | ||
880 | * @param h handle to the cadet | ||
881 | * | ||
882 | * @return #GNUNET_YES in case of sucess, #GNUNET_NO otherwise (service down...) | ||
883 | */ | ||
884 | static void | ||
885 | reconnect (struct GNUNET_CADET_Handle *h) | ||
886 | { | ||
887 | struct GNUNET_CADET_Channel *ch; | ||
888 | |||
889 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
890 | "Requested RECONNECT, destroying all channels\n"); | ||
891 | for (ch = h->channels_head; NULL != ch; ch = h->channels_head) | ||
892 | destroy_channel (ch, GNUNET_YES); | ||
893 | if (NULL == h->reconnect_task) | ||
894 | h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time, | ||
895 | &reconnect_cbk, h); | ||
916 | } | 896 | } |
917 | 897 | ||
918 | 898 | ||
@@ -1237,253 +1217,74 @@ clean_cls: | |||
1237 | } | 1217 | } |
1238 | 1218 | ||
1239 | 1219 | ||
1240 | /** | 1220 | // FIXME: add monitor messages to mq |
1241 | * Function to process all messages received from the service | 1221 | // static void |
1242 | * | 1222 | // msg_received (void *cls, const struct GNUNET_MessageHeader *msg) |
1243 | * @param cls closure | 1223 | // { |
1244 | * @param msg message received, NULL on timeout or fatal error | 1224 | // struct GNUNET_CADET_Handle *h = cls; |
1245 | */ | 1225 | // uint16_t type; |
1246 | static void | 1226 | // |
1247 | msg_received (void *cls, const struct GNUNET_MessageHeader *msg) | 1227 | // if (msg == NULL) |
1248 | { | 1228 | // { |
1249 | struct GNUNET_CADET_Handle *h = cls; | 1229 | // LOG (GNUNET_ERROR_TYPE_DEBUG, |
1250 | uint16_t type; | 1230 | // "Cadet service disconnected, reconnecting\n", h); |
1251 | 1231 | // reconnect (h); | |
1252 | if (msg == NULL) | 1232 | // return; |
1253 | { | 1233 | // } |
1254 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1234 | // type = ntohs (msg->type); |
1255 | "Cadet service disconnected, reconnecting\n", h); | 1235 | // LOG (GNUNET_ERROR_TYPE_DEBUG, "\n"); |
1256 | reconnect (h); | 1236 | // LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a message: %s\n", |
1257 | return; | 1237 | // GC_m2s (type)); |
1258 | } | 1238 | // switch (type) |
1259 | type = ntohs (msg->type); | 1239 | // { |
1260 | LOG (GNUNET_ERROR_TYPE_DEBUG, "\n"); | 1240 | // /* Notify of a new incoming channel */ |
1261 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a message: %s\n", | 1241 | // case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE: |
1262 | GC_m2s (type)); | 1242 | // // process_channel_created (h, |
1263 | switch (type) | 1243 | // // (struct GNUNET_CADET_ChannelCreateMessage *) msg); |
1264 | { | ||
1265 | /* Notify of a new incoming channel */ | ||
1266 | case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE: | ||
1267 | process_channel_created (h, | ||
1268 | (struct GNUNET_CADET_ChannelCreateMessage *) msg); | ||
1269 | break; | ||
1270 | /* Notify of a channel disconnection */ | ||
1271 | case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY: /* TODO separate(gid problem)*/ | ||
1272 | case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_NACK: | ||
1273 | process_channel_destroy (h, | ||
1274 | (struct GNUNET_CADET_ChannelDestroyMessage *) msg); | ||
1275 | break; | ||
1276 | case GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA: | ||
1277 | process_incoming_data (h, msg); | ||
1278 | break; | ||
1279 | case GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK: | ||
1280 | process_ack (h, msg); | ||
1281 | break; | ||
1282 | // case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNELS: | ||
1283 | // process_get_channels (h, msg); | ||
1284 | // break; | 1244 | // break; |
1285 | // case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL: | 1245 | // /* Notify of a channel disconnection */ |
1286 | // process_show_channel (h, msg); | 1246 | // case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY: /* TODO separate(gid problem)*/ |
1247 | // case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_NACK: | ||
1248 | // // process_channel_destroy (h, | ||
1249 | // // (struct GNUNET_CADET_ChannelDestroyMessage *) msg); | ||
1287 | // break; | 1250 | // break; |
1288 | case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS: | 1251 | // case GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA: |
1289 | process_get_peers (h, msg); | 1252 | // // process_incoming_data (h, msg); |
1290 | break; | ||
1291 | case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER: | ||
1292 | process_get_peer (h, msg); | ||
1293 | break; | ||
1294 | case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS: | ||
1295 | process_get_tunnels (h, msg); | ||
1296 | break; | ||
1297 | case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL: | ||
1298 | process_get_tunnel (h, msg); | ||
1299 | break; | ||
1300 | // case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL: | ||
1301 | // process_show_channel (h, msg); | ||
1302 | // break; | 1253 | // break; |
1303 | default: | 1254 | // case GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK: |
1304 | /* We shouldn't get any other packages, log and ignore */ | 1255 | // // process_ack (h, msg); |
1305 | LOG (GNUNET_ERROR_TYPE_WARNING, | 1256 | // break; |
1306 | "unsolicited message form service (type %s)\n", | 1257 | // // case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNELS: |
1307 | GC_m2s (ntohs (msg->type))); | 1258 | // // process_get_channels (h, msg); |
1308 | } | 1259 | // // break; |
1309 | LOG (GNUNET_ERROR_TYPE_DEBUG, "message processed\n"); | 1260 | // // case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL: |
1310 | if (GNUNET_YES == h->in_receive) | 1261 | // // process_show_channel (h, msg); |
1311 | { | 1262 | // // break; |
1312 | GNUNET_CLIENT_receive (h->client, &msg_received, h, | 1263 | // case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS: |
1313 | GNUNET_TIME_UNIT_FOREVER_REL); | 1264 | // process_get_peers (h, msg); |
1314 | } | 1265 | // break; |
1315 | else | 1266 | // case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER: |
1316 | { | 1267 | // process_get_peer (h, msg); |
1317 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1268 | // break; |
1318 | "in receive off, not calling CLIENT_receive\n"); | 1269 | // case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS: |
1319 | } | 1270 | // process_get_tunnels (h, msg); |
1320 | } | 1271 | // break; |
1321 | 1272 | // case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL: | |
1322 | 1273 | // process_get_tunnel (h, msg); | |
1323 | /******************************************************************************/ | 1274 | // break; |
1324 | /************************ SEND FUNCTIONS ****************************/ | 1275 | // // case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL: |
1325 | /******************************************************************************/ | 1276 | // // process_show_channel (h, msg); |
1326 | 1277 | // // break; | |
1327 | /** | 1278 | // default: |
1328 | * Function called to send a message to the service. | 1279 | // /* We shouldn't get any other packages, log and ignore */ |
1329 | * "buf" will be NULL and "size" zero if the socket was closed for writing in | 1280 | // LOG (GNUNET_ERROR_TYPE_WARNING, |
1330 | * the meantime. | 1281 | // "unsolicited message form service (type %s)\n", |
1331 | * | 1282 | // GC_m2s (ntohs (msg->type))); |
1332 | * @param cls closure, the cadet handle | 1283 | // } |
1333 | * @param size number of bytes available in buf | 1284 | // LOG (GNUNET_ERROR_TYPE_DEBUG, "message processed\n"); |
1334 | * @param buf where the callee should write the connect message | 1285 | // GNUNET_CLIENT_receive (h->client, &msg_received, h, |
1335 | * @return number of bytes written to buf | 1286 | // GNUNET_TIME_UNIT_FOREVER_REL); |
1336 | */ | 1287 | // } |
1337 | static size_t | ||
1338 | send_callback (void *cls, size_t size, void *buf) | ||
1339 | { | ||
1340 | struct GNUNET_CADET_Handle *h = cls; | ||
1341 | struct GNUNET_CADET_TransmitHandle *th; | ||
1342 | struct GNUNET_CADET_TransmitHandle *next; | ||
1343 | struct GNUNET_CADET_Channel *ch; | ||
1344 | char *cbuf = buf; | ||
1345 | size_t tsize; | ||
1346 | size_t psize; | ||
1347 | size_t nsize; | ||
1348 | |||
1349 | LOG (GNUNET_ERROR_TYPE_DEBUG, "\n"); | ||
1350 | LOG (GNUNET_ERROR_TYPE_DEBUG, "# Send callback, buffer %u\n", size); | ||
1351 | if ((0 == size) || (NULL == buf)) | ||
1352 | { | ||
1353 | LOG (GNUNET_ERROR_TYPE_DEBUG, "# Received NULL send callback on %p\n", h); | ||
1354 | reconnect (h); | ||
1355 | h->th = NULL; | ||
1356 | return 0; | ||
1357 | } | ||
1358 | tsize = 0; | ||
1359 | next = h->th_head; | ||
1360 | nsize = message_ready_size (h); | ||
1361 | while ((NULL != (th = next)) && (0 < nsize) && (size >= nsize)) | ||
1362 | { | ||
1363 | ch = th->channel; | ||
1364 | if (GNUNET_YES == th_is_payload (th)) | ||
1365 | { | ||
1366 | struct GNUNET_CADET_LocalData *dmsg; | ||
1367 | struct GNUNET_MessageHeader *mh; | ||
1368 | |||
1369 | LOG (GNUNET_ERROR_TYPE_DEBUG, "# payload, %u bytes on %X (%p)\n", | ||
1370 | th->size, ch->chid, ch); | ||
1371 | if (GNUNET_NO == ch->allow_send) | ||
1372 | { | ||
1373 | /* This channel is not ready to transmit yet, Try the next message */ | ||
1374 | next = th->next; | ||
1375 | continue; | ||
1376 | } | ||
1377 | ch->packet_size = 0; | ||
1378 | GNUNET_assert (size >= th->size); | ||
1379 | dmsg = (struct GNUNET_CADET_LocalData *) cbuf; | ||
1380 | mh = (struct GNUNET_MessageHeader *) &dmsg[1]; | ||
1381 | psize = th->notify (th->notify_cls, size - DATA_OVERHEAD, mh); | ||
1382 | |||
1383 | if (psize > 0) | ||
1384 | { | ||
1385 | GNUNET_assert (sizeof (struct GNUNET_MessageHeader) <= psize); | ||
1386 | psize += DATA_OVERHEAD; | ||
1387 | GNUNET_assert (size >= psize); | ||
1388 | dmsg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA); | ||
1389 | dmsg->header.size = htons (psize); | ||
1390 | dmsg->id = htonl (ch->chid); | ||
1391 | LOG (GNUNET_ERROR_TYPE_DEBUG, "# sending, type %s\n", | ||
1392 | GC_m2s (ntohs (mh->type))); | ||
1393 | ch->allow_send = GNUNET_NO; | ||
1394 | } | ||
1395 | else | ||
1396 | { | ||
1397 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1398 | "# callback returned size 0, " | ||
1399 | "application canceled transmission\n"); | ||
1400 | } | ||
1401 | } | ||
1402 | else | ||
1403 | { | ||
1404 | const struct GNUNET_MessageHeader *mh; | ||
1405 | |||
1406 | mh = (const struct GNUNET_MessageHeader *) &th[1]; | ||
1407 | LOG (GNUNET_ERROR_TYPE_DEBUG, "# cadet internal traffic, type %s\n", | ||
1408 | GC_m2s (ntohs (mh->type))); | ||
1409 | GNUNET_memcpy (cbuf, &th[1], th->size); | ||
1410 | psize = th->size; | ||
1411 | } | ||
1412 | GNUNET_assert (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE >= psize); | ||
1413 | if (th->timeout_task != NULL) | ||
1414 | GNUNET_SCHEDULER_cancel (th->timeout_task); | ||
1415 | next = th->next; | ||
1416 | GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th); | ||
1417 | GNUNET_free (th); | ||
1418 | nsize = message_ready_size (h); | ||
1419 | cbuf += psize; | ||
1420 | size -= psize; | ||
1421 | tsize += psize; | ||
1422 | } | ||
1423 | LOG (GNUNET_ERROR_TYPE_DEBUG, "# total size: %u\n", tsize); | ||
1424 | h->th = NULL; | ||
1425 | size = message_ready_size (h); | ||
1426 | if (0 != size) | ||
1427 | { | ||
1428 | LOG (GNUNET_ERROR_TYPE_DEBUG, "# next size: %u\n", size); | ||
1429 | h->th = | ||
1430 | GNUNET_CLIENT_notify_transmit_ready (h->client, size, | ||
1431 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1432 | GNUNET_YES, &send_callback, h); | ||
1433 | } | ||
1434 | else | ||
1435 | { | ||
1436 | if (NULL != h->th_head) | ||
1437 | LOG (GNUNET_ERROR_TYPE_DEBUG, "# nothing ready to transmit\n"); | ||
1438 | else | ||
1439 | LOG (GNUNET_ERROR_TYPE_DEBUG, "# nothing left to transmit\n"); | ||
1440 | } | ||
1441 | if (GNUNET_NO == h->in_receive) | ||
1442 | { | ||
1443 | LOG (GNUNET_ERROR_TYPE_DEBUG, "# start receiving from service\n"); | ||
1444 | h->in_receive = GNUNET_YES; | ||
1445 | GNUNET_CLIENT_receive (h->client, &msg_received, h, | ||
1446 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
1447 | } | ||
1448 | LOG (GNUNET_ERROR_TYPE_DEBUG, "# Send callback() END\n"); | ||
1449 | return tsize; | ||
1450 | } | ||
1451 | |||
1452 | |||
1453 | /** | ||
1454 | * Auxiliary function to send an already constructed packet to the service. | ||
1455 | * Takes care of creating a new queue element, copying the message and | ||
1456 | * calling the tmt_rdy function if necessary. | ||
1457 | * | ||
1458 | * @param h cadet handle | ||
1459 | * @param msg message to transmit | ||
1460 | * @param channel channel this send is related to (NULL if N/A) | ||
1461 | */ | ||
1462 | static void | ||
1463 | send_packet (struct GNUNET_CADET_Handle *h, | ||
1464 | const struct GNUNET_MessageHeader *msg, | ||
1465 | struct GNUNET_CADET_Channel *channel) | ||
1466 | { | ||
1467 | struct GNUNET_CADET_TransmitHandle *th; | ||
1468 | size_t msize; | ||
1469 | |||
1470 | LOG (GNUNET_ERROR_TYPE_DEBUG, " Sending message to service: %s\n", | ||
1471 | GC_m2s(ntohs(msg->type))); | ||
1472 | msize = ntohs (msg->size); | ||
1473 | th = GNUNET_malloc (sizeof (struct GNUNET_CADET_TransmitHandle) + msize); | ||
1474 | th->timeout = GNUNET_TIME_UNIT_FOREVER_ABS; | ||
1475 | th->size = msize; | ||
1476 | th->channel = channel; | ||
1477 | GNUNET_memcpy (&th[1], msg, msize); | ||
1478 | add_to_queue (h, th); | ||
1479 | if (NULL != h->th) | ||
1480 | return; | ||
1481 | LOG (GNUNET_ERROR_TYPE_DEBUG, " calling ntfy tmt rdy for %u bytes\n", msize); | ||
1482 | h->th = | ||
1483 | GNUNET_CLIENT_notify_transmit_ready (h->client, msize, | ||
1484 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1485 | GNUNET_YES, &send_callback, h); | ||
1486 | } | ||
1487 | 1288 | ||
1488 | 1289 | ||
1489 | /******************************************************************************/ | 1290 | /******************************************************************************/ |
@@ -1503,11 +1304,11 @@ GNUNET_CADET_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, void *cls, | |||
1503 | h->cfg = cfg; | 1304 | h->cfg = cfg; |
1504 | h->cleaner = cleaner; | 1305 | h->cleaner = cleaner; |
1505 | h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES); | 1306 | h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES); |
1506 | h->client = GNUNET_CLIENT_connect ("cadet", cfg); | 1307 | do_reconnect (h); |
1507 | if (h->client == NULL) | 1308 | if (h->mq == NULL) |
1508 | { | 1309 | { |
1509 | GNUNET_break (0); | 1310 | GNUNET_break (0); |
1510 | GNUNET_free (h); | 1311 | GNUNET_CADET_disconnect (h); |
1511 | return NULL; | 1312 | return NULL; |
1512 | } | 1313 | } |
1513 | h->cls = cls; | 1314 | h->cls = cls; |
@@ -1574,19 +1375,13 @@ GNUNET_CADET_disconnect (struct GNUNET_CADET_Handle *handle) | |||
1574 | GC_m2s (ntohs(msg->type))); | 1375 | GC_m2s (ntohs(msg->type))); |
1575 | } | 1376 | } |
1576 | 1377 | ||
1577 | GNUNET_CONTAINER_DLL_remove (handle->th_head, handle->th_tail, th); | 1378 | GNUNET_CADET_notify_transmit_ready_cancel (th); |
1578 | GNUNET_free (th); | ||
1579 | } | 1379 | } |
1580 | 1380 | ||
1581 | if (NULL != handle->th) | 1381 | if (NULL != handle->mq) |
1582 | { | ||
1583 | GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th); | ||
1584 | handle->th = NULL; | ||
1585 | } | ||
1586 | if (NULL != handle->client) | ||
1587 | { | 1382 | { |
1588 | GNUNET_CLIENT_disconnect (handle->client); | 1383 | GNUNET_MQ_destroy (handle->mq); |
1589 | handle->client = NULL; | 1384 | handle->mq = NULL; |
1590 | } | 1385 | } |
1591 | if (NULL != handle->reconnect_task) | 1386 | if (NULL != handle->reconnect_task) |
1592 | { | 1387 | { |
@@ -1617,8 +1412,9 @@ GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h, | |||
1617 | new_channel, | 1412 | new_channel, |
1618 | void *new_channel_cls) | 1413 | void *new_channel_cls) |
1619 | { | 1414 | { |
1415 | struct GNUNET_CADET_PortMessage *msg; | ||
1416 | struct GNUNET_MQ_Envelope *env; | ||
1620 | struct GNUNET_CADET_Port *p; | 1417 | struct GNUNET_CADET_Port *p; |
1621 | struct GNUNET_CADET_PortMessage msg; | ||
1622 | 1418 | ||
1623 | GNUNET_assert (NULL != new_channel); | 1419 | GNUNET_assert (NULL != new_channel); |
1624 | p = GNUNET_new (struct GNUNET_CADET_Port); | 1420 | p = GNUNET_new (struct GNUNET_CADET_Port); |
@@ -1633,10 +1429,9 @@ GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h, | |||
1633 | p, | 1429 | p, |
1634 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | 1430 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); |
1635 | 1431 | ||
1636 | msg.header.size = htons (sizeof (msg)); | 1432 | env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN); |
1637 | msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN); | 1433 | msg->port = *p->hash; |
1638 | msg.port = *p->hash; | 1434 | GNUNET_MQ_send (h->mq, env); |
1639 | send_packet (p->cadet, &msg.header, NULL); | ||
1640 | 1435 | ||
1641 | return p; | 1436 | return p; |
1642 | } | 1437 | } |
@@ -1650,12 +1445,13 @@ GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h, | |||
1650 | void | 1445 | void |
1651 | GNUNET_CADET_close_port (struct GNUNET_CADET_Port *p) | 1446 | GNUNET_CADET_close_port (struct GNUNET_CADET_Port *p) |
1652 | { | 1447 | { |
1653 | struct GNUNET_CADET_PortMessage msg; | 1448 | struct GNUNET_CADET_PortMessage *msg; |
1449 | struct GNUNET_MQ_Envelope *env; | ||
1654 | 1450 | ||
1655 | msg.header.size = htons (sizeof (msg)); | 1451 | env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_CLOSE); |
1656 | msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_CLOSE); | 1452 | |
1657 | msg.port = *p->hash; | 1453 | msg->port = *p->hash; |
1658 | send_packet (p->cadet, &msg.header, NULL); | 1454 | GNUNET_MQ_send (p->cadet->mq, env); |
1659 | GNUNET_CONTAINER_multihashmap_remove (p->cadet->ports, p->hash, p); | 1455 | GNUNET_CONTAINER_multihashmap_remove (p->cadet->ports, p->hash, p); |
1660 | GNUNET_free (p->hash); | 1456 | GNUNET_free (p->hash); |
1661 | GNUNET_free (p); | 1457 | GNUNET_free (p); |
@@ -1684,8 +1480,9 @@ GNUNET_CADET_channel_create (struct GNUNET_CADET_Handle *h, | |||
1684 | const struct GNUNET_HashCode *port, | 1480 | const struct GNUNET_HashCode *port, |
1685 | enum GNUNET_CADET_ChannelOption options) | 1481 | enum GNUNET_CADET_ChannelOption options) |
1686 | { | 1482 | { |
1483 | struct GNUNET_CADET_ChannelCreateMessage *msg; | ||
1484 | struct GNUNET_MQ_Envelope *env; | ||
1687 | struct GNUNET_CADET_Channel *ch; | 1485 | struct GNUNET_CADET_Channel *ch; |
1688 | struct GNUNET_CADET_ChannelCreateMessage msg; | ||
1689 | 1486 | ||
1690 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1487 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1691 | "Creating new channel to %s:%u\n", | 1488 | "Creating new channel to %s:%u\n", |
@@ -1696,14 +1493,13 @@ GNUNET_CADET_channel_create (struct GNUNET_CADET_Handle *h, | |||
1696 | ch->ctx = channel_ctx; | 1493 | ch->ctx = channel_ctx; |
1697 | ch->peer = GNUNET_PEER_intern (peer); | 1494 | ch->peer = GNUNET_PEER_intern (peer); |
1698 | 1495 | ||
1699 | msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE); | 1496 | env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE); |
1700 | msg.header.size = htons (sizeof (struct GNUNET_CADET_ChannelCreateMessage)); | 1497 | msg->channel_id = htonl (ch->chid); |
1701 | msg.channel_id = htonl (ch->chid); | 1498 | msg->port = *port; |
1702 | msg.port = *port; | 1499 | msg->peer = *peer; |
1703 | msg.peer = *peer; | 1500 | msg->opt = htonl (options); |
1704 | msg.opt = htonl (options); | ||
1705 | ch->allow_send = GNUNET_NO; | 1501 | ch->allow_send = GNUNET_NO; |
1706 | send_packet (h, &msg.header, ch); | 1502 | GNUNET_MQ_send (h->mq, env); |
1707 | 1503 | ||
1708 | return ch; | 1504 | return ch; |
1709 | } | 1505 | } |
@@ -1713,39 +1509,40 @@ void | |||
1713 | GNUNET_CADET_channel_destroy (struct GNUNET_CADET_Channel *channel) | 1509 | GNUNET_CADET_channel_destroy (struct GNUNET_CADET_Channel *channel) |
1714 | { | 1510 | { |
1715 | struct GNUNET_CADET_Handle *h; | 1511 | struct GNUNET_CADET_Handle *h; |
1716 | struct GNUNET_CADET_ChannelDestroyMessage msg; | 1512 | struct GNUNET_CADET_ChannelDestroyMessage *msg; |
1513 | struct GNUNET_MQ_Envelope *env; | ||
1717 | struct GNUNET_CADET_TransmitHandle *th; | 1514 | struct GNUNET_CADET_TransmitHandle *th; |
1515 | struct GNUNET_CADET_TransmitHandle *next; | ||
1718 | 1516 | ||
1719 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Destroying channel\n"); | 1517 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Destroying channel\n"); |
1720 | h = channel->cadet; | 1518 | h = channel->cadet; |
1721 | 1519 | for (th = h->th_head; th != NULL; th = next) | |
1722 | msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY); | ||
1723 | msg.header.size = htons (sizeof (struct GNUNET_CADET_ChannelDestroyMessage)); | ||
1724 | msg.channel_id = htonl (channel->chid); | ||
1725 | th = h->th_head; | ||
1726 | while (th != NULL) | ||
1727 | { | 1520 | { |
1728 | struct GNUNET_CADET_TransmitHandle *aux; | 1521 | next = th->next; |
1729 | if (th->channel == channel) | 1522 | if (th->channel == channel) |
1730 | { | 1523 | { |
1731 | aux = th->next; | 1524 | GNUNET_break (0); |
1732 | if (GNUNET_YES == th_is_payload (th)) | 1525 | if (GNUNET_YES == th_is_payload (th)) |
1733 | { | 1526 | { |
1734 | /* applications should cancel before destroying channel */ | 1527 | /* applications should cancel before destroying channel */ |
1735 | GNUNET_break (0); | ||
1736 | LOG (GNUNET_ERROR_TYPE_WARNING, | 1528 | LOG (GNUNET_ERROR_TYPE_WARNING, |
1737 | "Channel destroyed without cancelling transmission requests\n"); | 1529 | "Channel destroyed without cancelling transmission requests\n"); |
1738 | th->notify (th->notify_cls, 0, NULL); | 1530 | th->notify (th->notify_cls, 0, NULL); |
1739 | } | 1531 | } |
1532 | else | ||
1533 | { | ||
1534 | LOG (GNUNET_ERROR_TYPE_WARNING, "no meta-traffic should be queued\n"); | ||
1535 | } | ||
1536 | GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th); | ||
1740 | GNUNET_CADET_notify_transmit_ready_cancel (th); | 1537 | GNUNET_CADET_notify_transmit_ready_cancel (th); |
1741 | th = aux; | ||
1742 | } | 1538 | } |
1743 | else | ||
1744 | th = th->next; | ||
1745 | } | 1539 | } |
1746 | 1540 | ||
1541 | env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY); | ||
1542 | msg->channel_id = htonl (channel->chid); | ||
1543 | GNUNET_MQ_send (h->mq, env); | ||
1544 | |||
1747 | destroy_channel (channel, GNUNET_YES); | 1545 | destroy_channel (channel, GNUNET_YES); |
1748 | send_packet (h, &msg.header, NULL); | ||
1749 | } | 1546 | } |
1750 | 1547 | ||
1751 | 1548 | ||
@@ -1787,8 +1584,10 @@ GNUNET_CADET_channel_get_info (struct GNUNET_CADET_Channel *channel, | |||
1787 | return ret; | 1584 | return ret; |
1788 | } | 1585 | } |
1789 | 1586 | ||
1587 | |||
1790 | struct GNUNET_CADET_TransmitHandle * | 1588 | struct GNUNET_CADET_TransmitHandle * |
1791 | GNUNET_CADET_notify_transmit_ready (struct GNUNET_CADET_Channel *channel, int cork, | 1589 | GNUNET_CADET_notify_transmit_ready (struct GNUNET_CADET_Channel *channel, |
1590 | int cork, | ||
1792 | struct GNUNET_TIME_Relative maxdelay, | 1591 | struct GNUNET_TIME_Relative maxdelay, |
1793 | size_t notify_size, | 1592 | size_t notify_size, |
1794 | GNUNET_CONNECTION_TransmitReadyNotify notify, | 1593 | GNUNET_CONNECTION_TransmitReadyNotify notify, |
@@ -1808,25 +1607,25 @@ GNUNET_CADET_notify_transmit_ready (struct GNUNET_CADET_Channel *channel, int co | |||
1808 | LOG (GNUNET_ERROR_TYPE_DEBUG, " payload size %u\n", notify_size); | 1607 | LOG (GNUNET_ERROR_TYPE_DEBUG, " payload size %u\n", notify_size); |
1809 | GNUNET_assert (NULL != notify); | 1608 | GNUNET_assert (NULL != notify); |
1810 | GNUNET_assert (0 == channel->packet_size); // Only one data packet allowed | 1609 | GNUNET_assert (0 == channel->packet_size); // Only one data packet allowed |
1610 | |||
1611 | if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us != maxdelay.rel_value_us) | ||
1612 | { | ||
1613 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
1614 | "CADET transmit ready timeout is deprected (has no effect)\n"); | ||
1615 | } | ||
1616 | |||
1811 | th = GNUNET_new (struct GNUNET_CADET_TransmitHandle); | 1617 | th = GNUNET_new (struct GNUNET_CADET_TransmitHandle); |
1812 | th->channel = channel; | 1618 | th->channel = channel; |
1813 | th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay); | 1619 | th->size = notify_size; |
1814 | th->size = notify_size + DATA_OVERHEAD; | ||
1815 | channel->packet_size = th->size; | 1620 | channel->packet_size = th->size; |
1816 | LOG (GNUNET_ERROR_TYPE_DEBUG, " total size %u\n", th->size); | 1621 | LOG (GNUNET_ERROR_TYPE_DEBUG, " total size %u\n", th->size); |
1817 | th->notify = notify; | 1622 | th->notify = notify; |
1818 | th->notify_cls = notify_cls; | 1623 | th->notify_cls = notify_cls; |
1819 | add_to_queue (channel->cadet, th); | 1624 | if (GNUNET_YES == channel->allow_send) |
1820 | if (NULL != channel->cadet->th) | 1625 | th->request_data_task = GNUNET_SCHEDULER_add_now (&request_data, th); |
1821 | return th; | 1626 | else |
1822 | if (GNUNET_NO == channel->allow_send) | 1627 | add_to_queue (channel->cadet, th); |
1823 | return th; | 1628 | |
1824 | LOG (GNUNET_ERROR_TYPE_DEBUG, " call client notify tmt rdy\n"); | ||
1825 | channel->cadet->th = | ||
1826 | GNUNET_CLIENT_notify_transmit_ready (channel->cadet->client, th->size, | ||
1827 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1828 | GNUNET_YES, &send_callback, | ||
1829 | channel->cadet); | ||
1830 | LOG (GNUNET_ERROR_TYPE_DEBUG, "CADET NOTIFY TRANSMIT READY END\n"); | 1629 | LOG (GNUNET_ERROR_TYPE_DEBUG, "CADET NOTIFY TRANSMIT READY END\n"); |
1831 | return th; | 1630 | return th; |
1832 | } | 1631 | } |
@@ -1835,25 +1634,20 @@ GNUNET_CADET_notify_transmit_ready (struct GNUNET_CADET_Channel *channel, int co | |||
1835 | void | 1634 | void |
1836 | GNUNET_CADET_notify_transmit_ready_cancel (struct GNUNET_CADET_TransmitHandle *th) | 1635 | GNUNET_CADET_notify_transmit_ready_cancel (struct GNUNET_CADET_TransmitHandle *th) |
1837 | { | 1636 | { |
1838 | struct GNUNET_CADET_Handle *cadet; | 1637 | if (NULL != th->request_data_task) |
1638 | { | ||
1639 | GNUNET_SCHEDULER_cancel (th->request_data_task); | ||
1640 | } | ||
1641 | th->request_data_task = NULL; | ||
1839 | 1642 | ||
1840 | LOG (GNUNET_ERROR_TYPE_DEBUG, "CADET NOTIFY TRANSMIT READY CANCEL\n"); | 1643 | /* It might or might not have been queued (rarely not), but check anyway. */ |
1841 | LOG (GNUNET_ERROR_TYPE_DEBUG, " on channel %X (%p)\n", | 1644 | if (NULL != th->next) |
1842 | th->channel->chid, th->channel); | ||
1843 | LOG (GNUNET_ERROR_TYPE_DEBUG, " size %u bytes\n", th->size); | ||
1844 | th->channel->packet_size = 0; | ||
1845 | cadet = th->channel->cadet; | ||
1846 | if (th->timeout_task != NULL) | ||
1847 | GNUNET_SCHEDULER_cancel (th->timeout_task); | ||
1848 | GNUNET_CONTAINER_DLL_remove (cadet->th_head, cadet->th_tail, th); | ||
1849 | GNUNET_free (th); | ||
1850 | if ((0 == message_ready_size (cadet)) && (NULL != cadet->th)) | ||
1851 | { | 1645 | { |
1852 | /* queue empty, no point in asking for transmission */ | 1646 | struct GNUNET_CADET_Handle *h; |
1853 | GNUNET_CLIENT_notify_transmit_ready_cancel (cadet->th); | 1647 | h = th->channel->cadet; |
1854 | cadet->th = NULL; | 1648 | GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th); |
1855 | } | 1649 | } |
1856 | LOG (GNUNET_ERROR_TYPE_DEBUG, "CADET NOTIFY TRANSMIT READY CANCEL END\n"); | 1650 | GNUNET_free (th); |
1857 | } | 1651 | } |
1858 | 1652 | ||
1859 | 1653 | ||
@@ -1867,11 +1661,15 @@ GNUNET_CADET_receive_done (struct GNUNET_CADET_Channel *channel) | |||
1867 | static void | 1661 | static void |
1868 | send_info_request (struct GNUNET_CADET_Handle *h, uint16_t type) | 1662 | send_info_request (struct GNUNET_CADET_Handle *h, uint16_t type) |
1869 | { | 1663 | { |
1870 | struct GNUNET_MessageHeader msg; | 1664 | struct GNUNET_MessageHeader *msg; |
1665 | struct GNUNET_MQ_Envelope *env; | ||
1666 | |||
1667 | env = GNUNET_MQ_msg (msg, type); | ||
1668 | GNUNET_MQ_send (h->mq, env); | ||
1871 | 1669 | ||
1872 | msg.size = htons (sizeof (msg)); | 1670 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1873 | msg.type = htons (type); | 1671 | " Sending %s message to service\n", |
1874 | send_packet (h, &msg, NULL); | 1672 | GC_m2s(type)); |
1875 | } | 1673 | } |
1876 | 1674 | ||
1877 | 1675 | ||
@@ -1958,11 +1756,12 @@ GNUNET_CADET_get_peers_cancel (struct GNUNET_CADET_Handle *h) | |||
1958 | */ | 1756 | */ |
1959 | int | 1757 | int |
1960 | GNUNET_CADET_get_peer (struct GNUNET_CADET_Handle *h, | 1758 | GNUNET_CADET_get_peer (struct GNUNET_CADET_Handle *h, |
1961 | const struct GNUNET_PeerIdentity *id, | 1759 | const struct GNUNET_PeerIdentity *id, |
1962 | GNUNET_CADET_PeerCB callback, | 1760 | GNUNET_CADET_PeerCB callback, |
1963 | void *callback_cls) | 1761 | void *callback_cls) |
1964 | { | 1762 | { |
1965 | struct GNUNET_CADET_LocalInfo msg; | 1763 | struct GNUNET_CADET_LocalInfo *msg; |
1764 | struct GNUNET_MQ_Envelope *env; | ||
1966 | 1765 | ||
1967 | if (NULL != h->info_cb.peer_cb) | 1766 | if (NULL != h->info_cb.peer_cb) |
1968 | { | 1767 | { |
@@ -1970,11 +1769,10 @@ GNUNET_CADET_get_peer (struct GNUNET_CADET_Handle *h, | |||
1970 | return GNUNET_SYSERR; | 1769 | return GNUNET_SYSERR; |
1971 | } | 1770 | } |
1972 | 1771 | ||
1973 | memset (&msg, 0, sizeof (msg)); | 1772 | env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER); |
1974 | msg.header.size = htons (sizeof (msg)); | 1773 | msg->peer = *id; |
1975 | msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER); | 1774 | GNUNET_MQ_send (h->mq, env); |
1976 | msg.peer = *id; | 1775 | |
1977 | send_packet (h, &msg.header, NULL); | ||
1978 | h->info_cb.peer_cb = callback; | 1776 | h->info_cb.peer_cb = callback; |
1979 | h->info_cls = callback_cls; | 1777 | h->info_cls = callback_cls; |
1980 | return GNUNET_OK; | 1778 | return GNUNET_OK; |
@@ -2052,7 +1850,8 @@ GNUNET_CADET_get_tunnel (struct GNUNET_CADET_Handle *h, | |||
2052 | GNUNET_CADET_TunnelCB callback, | 1850 | GNUNET_CADET_TunnelCB callback, |
2053 | void *callback_cls) | 1851 | void *callback_cls) |
2054 | { | 1852 | { |
2055 | struct GNUNET_CADET_LocalInfo msg; | 1853 | struct GNUNET_CADET_LocalInfo *msg; |
1854 | struct GNUNET_MQ_Envelope *env; | ||
2056 | 1855 | ||
2057 | if (NULL != h->info_cb.tunnel_cb) | 1856 | if (NULL != h->info_cb.tunnel_cb) |
2058 | { | 1857 | { |
@@ -2060,11 +1859,10 @@ GNUNET_CADET_get_tunnel (struct GNUNET_CADET_Handle *h, | |||
2060 | return GNUNET_SYSERR; | 1859 | return GNUNET_SYSERR; |
2061 | } | 1860 | } |
2062 | 1861 | ||
2063 | memset (&msg, 0, sizeof (msg)); | 1862 | env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL); |
2064 | msg.header.size = htons (sizeof (msg)); | 1863 | msg->peer = *id; |
2065 | msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL); | 1864 | GNUNET_MQ_send (h->mq, env); |
2066 | msg.peer = *id; | 1865 | |
2067 | send_packet (h, &msg.header, NULL); | ||
2068 | h->info_cb.tunnel_cb = callback; | 1866 | h->info_cb.tunnel_cb = callback; |
2069 | h->info_cls = callback_cls; | 1867 | h->info_cls = callback_cls; |
2070 | return GNUNET_OK; | 1868 | return GNUNET_OK; |
@@ -2087,12 +1885,13 @@ GNUNET_CADET_get_tunnel (struct GNUNET_CADET_Handle *h, | |||
2087 | */ | 1885 | */ |
2088 | int | 1886 | int |
2089 | GNUNET_CADET_show_channel (struct GNUNET_CADET_Handle *h, | 1887 | GNUNET_CADET_show_channel (struct GNUNET_CADET_Handle *h, |
2090 | struct GNUNET_PeerIdentity *initiator, | 1888 | struct GNUNET_PeerIdentity *initiator, |
2091 | unsigned int channel_number, | 1889 | unsigned int channel_number, |
2092 | GNUNET_CADET_ChannelCB callback, | 1890 | GNUNET_CADET_ChannelCB callback, |
2093 | void *callback_cls) | 1891 | void *callback_cls) |
2094 | { | 1892 | { |
2095 | struct GNUNET_CADET_LocalInfo msg; | 1893 | struct GNUNET_CADET_LocalInfo *msg; |
1894 | struct GNUNET_MQ_Envelope *env; | ||
2096 | 1895 | ||
2097 | if (NULL != h->info_cb.channel_cb) | 1896 | if (NULL != h->info_cb.channel_cb) |
2098 | { | 1897 | { |
@@ -2100,12 +1899,11 @@ GNUNET_CADET_show_channel (struct GNUNET_CADET_Handle *h, | |||
2100 | return GNUNET_SYSERR; | 1899 | return GNUNET_SYSERR; |
2101 | } | 1900 | } |
2102 | 1901 | ||
2103 | msg.header.size = htons (sizeof (msg)); | 1902 | env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL); |
2104 | msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL); | 1903 | msg->peer = *initiator; |
2105 | msg.peer = *initiator; | 1904 | msg->channel_id = htonl (channel_number); |
2106 | msg.channel_id = htonl (channel_number); | 1905 | GNUNET_MQ_send (h->mq, env); |
2107 | // msg.reserved = 0; | 1906 | |
2108 | send_packet (h, &msg.header, NULL); | ||
2109 | h->info_cb.channel_cb = callback; | 1907 | h->info_cb.channel_cb = callback; |
2110 | h->info_cls = callback_cls; | 1908 | h->info_cls = callback_cls; |
2111 | return GNUNET_OK; | 1909 | return GNUNET_OK; |
diff --git a/src/include/gnunet_cadet_service.h b/src/include/gnunet_cadet_service.h index 1c440fc46..c32311643 100644 --- a/src/include/gnunet_cadet_service.h +++ b/src/include/gnunet_cadet_service.h | |||
@@ -372,6 +372,10 @@ GNUNET_CADET_notify_transmit_ready (struct GNUNET_CADET_Channel *channel, | |||
372 | /** | 372 | /** |
373 | * Cancel the specified transmission-ready notification. | 373 | * Cancel the specified transmission-ready notification. |
374 | * | 374 | * |
375 | * #DEPRECATED | ||
376 | * Since soon we will send immediately with mq (via request_data), | ||
377 | * there will be time or need to cancel a "pending" transmission. | ||
378 | * | ||
375 | * @param th handle that was returned by "notify_transmit_ready". | 379 | * @param th handle that was returned by "notify_transmit_ready". |
376 | */ | 380 | */ |
377 | void | 381 | void |