diff options
author | Bart Polot <bart.polot+voyager@gmail.com> | 2017-01-31 04:17:58 +0100 |
---|---|---|
committer | Bart Polot <bart.polot+voyager@gmail.com> | 2017-01-31 04:17:58 +0100 |
commit | 18197ec851b3a4f23b96e8ea9a6ba58ae3982bac (patch) | |
tree | 5f503093327381cb0b4ba12668d612c7bf47782a /src/cadet/cadet_api.c | |
parent | ae5354d85a43830da6921a53071edfe3859e21a9 (diff) | |
download | gnunet-18197ec851b3a4f23b96e8ea9a6ba58ae3982bac.tar.gz gnunet-18197ec851b3a4f23b96e8ea9a6ba58ae3982bac.zip |
Implementation of port opening and handling in MQ
Diffstat (limited to 'src/cadet/cadet_api.c')
-rw-r--r-- | src/cadet/cadet_api.c | 301 |
1 files changed, 191 insertions, 110 deletions
diff --git a/src/cadet/cadet_api.c b/src/cadet/cadet_api.c index 3491bd75f..3eaa78af8 100644 --- a/src/cadet/cadet_api.c +++ b/src/cadet/cadet_api.c | |||
@@ -280,14 +280,11 @@ struct GNUNET_CADET_Channel | |||
280 | /** | 280 | /** |
281 | * Are we allowed to send to the service? | 281 | * Are we allowed to send to the service? |
282 | * | 282 | * |
283 | * @deprecated | 283 | * @deprecated? |
284 | */ | 284 | */ |
285 | unsigned int allow_send; | 285 | unsigned int allow_send; |
286 | 286 | ||
287 | /****************************************************************************/ | ||
288 | /***************************** MQ ************************************/ | 287 | /***************************** MQ ************************************/ |
289 | /****************************************************************************/ | ||
290 | |||
291 | /** | 288 | /** |
292 | * Message Queue for the channel. | 289 | * Message Queue for the channel. |
293 | */ | 290 | */ |
@@ -330,6 +327,38 @@ struct GNUNET_CADET_Port | |||
330 | * Closure for @a handler. | 327 | * Closure for @a handler. |
331 | */ | 328 | */ |
332 | void *cls; | 329 | void *cls; |
330 | |||
331 | /***************************** MQ ************************************/ | ||
332 | |||
333 | /** | ||
334 | * Port "number" | ||
335 | */ | ||
336 | struct GNUNET_HashCode id; | ||
337 | |||
338 | /** | ||
339 | * Handler for incoming channels on this port | ||
340 | */ | ||
341 | GNUNET_CADET_ConnectEventHandler connects; | ||
342 | |||
343 | /** | ||
344 | * Closure for @ref connects | ||
345 | */ | ||
346 | void * connects_cls; | ||
347 | |||
348 | /** | ||
349 | * Window size change handler. | ||
350 | */ | ||
351 | GNUNET_CADET_WindowSizeEventHandler window_changes; | ||
352 | |||
353 | /** | ||
354 | * Handler called when an incoming channel is destroyed.. | ||
355 | */ | ||
356 | GNUNET_CADET_DisconnectEventHandler disconnects; | ||
357 | |||
358 | /** | ||
359 | * Payload handlers for incoming channels. | ||
360 | */ | ||
361 | const struct GNUNET_MQ_MessageHandler *handlers; | ||
333 | }; | 362 | }; |
334 | 363 | ||
335 | 364 | ||
@@ -553,6 +582,114 @@ remove_from_queue (struct GNUNET_CADET_TransmitHandle *th) | |||
553 | } | 582 | } |
554 | 583 | ||
555 | 584 | ||
585 | /******************************************************************************/ | ||
586 | /*********************** MQ API CALLBACKS ****************************/ | ||
587 | /******************************************************************************/ | ||
588 | |||
589 | |||
590 | /** | ||
591 | * Implement sending functionality of a message queue for | ||
592 | * us sending messages to a peer. | ||
593 | * | ||
594 | * Encapsulates the payload message in a #GNUNET_CADET_LocalData message | ||
595 | * in order to label the message with the channel ID and send the | ||
596 | * encapsulated message to the service. | ||
597 | * | ||
598 | * @param mq the message queue | ||
599 | * @param msg the message to send | ||
600 | * @param impl_state state of the implementation | ||
601 | */ | ||
602 | static void | ||
603 | cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq, | ||
604 | const struct GNUNET_MessageHeader *msg, | ||
605 | void *impl_state) | ||
606 | { | ||
607 | struct GNUNET_CADET_Channel *ch = impl_state; | ||
608 | struct GNUNET_CADET_Handle *h = ch->cadet; | ||
609 | uint16_t msize; | ||
610 | struct GNUNET_MQ_Envelope *env; | ||
611 | struct GNUNET_CADET_LocalData *cadet_msg; | ||
612 | |||
613 | |||
614 | if (NULL == h->mq) | ||
615 | { | ||
616 | /* We're currently reconnecting, pretend this worked */ | ||
617 | GNUNET_MQ_impl_send_continue (mq); | ||
618 | return; | ||
619 | } | ||
620 | |||
621 | /* check message size for sanity */ | ||
622 | msize = ntohs (msg->size); | ||
623 | if (msize > GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE) | ||
624 | { | ||
625 | GNUNET_break (0); | ||
626 | GNUNET_MQ_impl_send_continue (mq); | ||
627 | return; | ||
628 | } | ||
629 | |||
630 | env = GNUNET_MQ_msg_nested_mh (cadet_msg, | ||
631 | GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA, | ||
632 | msg); | ||
633 | cadet_msg->ccn = ch->ccn; | ||
634 | GNUNET_MQ_send (h->mq, env); | ||
635 | GNUNET_MQ_impl_send_continue (mq); | ||
636 | } | ||
637 | |||
638 | |||
639 | /** | ||
640 | * Handle destruction of a message queue. Implementations must not | ||
641 | * free @a mq, but should take care of @a impl_state. | ||
642 | * | ||
643 | * @param mq the message queue to destroy | ||
644 | * @param impl_state state of the implementation | ||
645 | */ | ||
646 | static void | ||
647 | cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, | ||
648 | void *impl_state) | ||
649 | { | ||
650 | struct GNUNET_CADET_Channel *ch = impl_state; | ||
651 | |||
652 | GNUNET_assert (mq == ch->mq); | ||
653 | ch->mq = NULL; | ||
654 | } | ||
655 | |||
656 | |||
657 | /** | ||
658 | * We had an error processing a message we forwarded from a peer to | ||
659 | * the CADET service. We should just complain about it but otherwise | ||
660 | * continue processing. | ||
661 | * | ||
662 | * @param cls closure | ||
663 | * @param error error code | ||
664 | */ | ||
665 | static void | ||
666 | cadet_mq_error_handler (void *cls, | ||
667 | enum GNUNET_MQ_Error error) | ||
668 | { | ||
669 | GNUNET_break_op (0); | ||
670 | } | ||
671 | |||
672 | |||
673 | /** | ||
674 | * Implementation function that cancels the currently sent message. | ||
675 | * Should basically undo whatever #mq_send_impl() did. | ||
676 | * | ||
677 | * @param mq message queue | ||
678 | * @param impl_state state specific to the implementation | ||
679 | */ | ||
680 | static void | ||
681 | cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq, | ||
682 | void *impl_state) | ||
683 | { | ||
684 | struct GNUNET_CADET_Channel *ch = impl_state; | ||
685 | |||
686 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
687 | "Cannot cancel mq message on channel %X of %p\n", | ||
688 | ch->ccn.channel_of_client, ch->cadet); | ||
689 | |||
690 | GNUNET_break (0); | ||
691 | } | ||
692 | |||
556 | 693 | ||
557 | /******************************************************************************/ | 694 | /******************************************************************************/ |
558 | /*********************** RECEIVE HANDLERS ****************************/ | 695 | /*********************** RECEIVE HANDLERS ****************************/ |
@@ -627,15 +764,17 @@ handle_channel_created (void *cls, | |||
627 | port = find_port (h, port_number); | 764 | port = find_port (h, port_number); |
628 | if (NULL == port) | 765 | if (NULL == port) |
629 | { | 766 | { |
767 | /* We could have closed the port but the service didn't know about it yet | ||
768 | * This is not an error. | ||
769 | */ | ||
630 | struct GNUNET_CADET_LocalChannelDestroyMessage *d_msg; | 770 | struct GNUNET_CADET_LocalChannelDestroyMessage *d_msg; |
631 | struct GNUNET_MQ_Envelope *env; | 771 | struct GNUNET_MQ_Envelope *env; |
632 | 772 | ||
633 | GNUNET_break (0); | 773 | GNUNET_break (0); |
634 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 774 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
635 | "No handler for incoming channel %X [%s]\n", | 775 | "No handler for incoming channel %X (on port %s, recently closed?)\n", |
636 | ntohl (ccn.channel_of_client), | 776 | ntohl (ccn.channel_of_client), |
637 | GNUNET_h2s (port_number)); | 777 | GNUNET_h2s (port_number)); |
638 | /* FIXME: should disconnect instead, this is a serious error! */ | ||
639 | env = GNUNET_MQ_msg (d_msg, | 778 | env = GNUNET_MQ_msg (d_msg, |
640 | GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY); | 779 | GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY); |
641 | d_msg->ccn = msg->ccn; | 780 | d_msg->ccn = msg->ccn; |
@@ -651,17 +790,38 @@ handle_channel_created (void *cls, | |||
651 | ch->ccn = ccn; | 790 | ch->ccn = ccn; |
652 | ch->incoming_port = port; | 791 | ch->incoming_port = port; |
653 | ch->options = ntohl (msg->opt); | 792 | ch->options = ntohl (msg->opt); |
654 | |||
655 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 793 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
656 | "Creating incoming channel %X [%s] %p\n", | 794 | "Creating incoming channel %X [%s] %p\n", |
657 | ntohl (ccn.channel_of_client), | 795 | ntohl (ccn.channel_of_client), |
658 | GNUNET_h2s (port_number), | 796 | GNUNET_h2s (port_number), |
659 | ch); | 797 | ch); |
660 | ch->ctx = port->handler (port->cls, | 798 | |
661 | ch, | 799 | if (NULL != port->handler) |
662 | &msg->peer, | 800 | { |
663 | port->hash, | 801 | /** @deprecated */ |
664 | ch->options); | 802 | /* Old style API */ |
803 | ch->ctx = port->handler (port->cls, | ||
804 | ch, | ||
805 | &msg->peer, | ||
806 | port->hash, | ||
807 | ch->options); | ||
808 | } else { | ||
809 | /* MQ API */ | ||
810 | GNUNET_assert (NULL != port->connects); | ||
811 | ch->window_changes = port->window_changes; | ||
812 | ch->disconnects = port->disconnects; | ||
813 | ch->mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl, | ||
814 | &cadet_mq_destroy_impl, | ||
815 | &cadet_mq_cancel_impl, | ||
816 | ch, | ||
817 | port->handlers, | ||
818 | &cadet_mq_error_handler, | ||
819 | ch); | ||
820 | ch->ctx = port->connects (port->cadet->cls, | ||
821 | ch, | ||
822 | &msg->peer); | ||
823 | GNUNET_MQ_set_handlers_closure (ch->mq, ch->ctx); | ||
824 | } | ||
665 | } | 825 | } |
666 | 826 | ||
667 | 827 | ||
@@ -2236,111 +2396,32 @@ GNUNET_CADET_open_porT (struct GNUNET_CADET_Handle *h, | |||
2236 | GNUNET_CADET_DisconnectEventHandler disconnects, | 2396 | GNUNET_CADET_DisconnectEventHandler disconnects, |
2237 | const struct GNUNET_MQ_MessageHandler *handlers) | 2397 | const struct GNUNET_MQ_MessageHandler *handlers) |
2238 | { | 2398 | { |
2239 | return NULL; | 2399 | struct GNUNET_CADET_PortMessage *msg; |
2240 | } | ||
2241 | |||
2242 | |||
2243 | /** | ||
2244 | * Implement sending functionality of a message queue for | ||
2245 | * us sending messages to a peer. | ||
2246 | * | ||
2247 | * Encapsulates the payload message in a #GNUNET_CADET_LocalData message | ||
2248 | * in order to label the message with the channel ID and send the | ||
2249 | * encapsulated message to the service. | ||
2250 | * | ||
2251 | * @param mq the message queue | ||
2252 | * @param msg the message to send | ||
2253 | * @param impl_state state of the implementation | ||
2254 | */ | ||
2255 | static void | ||
2256 | cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq, | ||
2257 | const struct GNUNET_MessageHeader *msg, | ||
2258 | void *impl_state) | ||
2259 | { | ||
2260 | struct GNUNET_CADET_Channel *ch = impl_state; | ||
2261 | struct GNUNET_CADET_Handle *h = ch->cadet; | ||
2262 | uint16_t msize; | ||
2263 | struct GNUNET_MQ_Envelope *env; | 2400 | struct GNUNET_MQ_Envelope *env; |
2264 | struct GNUNET_CADET_LocalData *cadet_msg; | 2401 | struct GNUNET_CADET_Port *p; |
2265 | 2402 | ||
2403 | GNUNET_assert (NULL != connects); | ||
2266 | 2404 | ||
2267 | if (NULL == h->mq) | 2405 | p = GNUNET_new (struct GNUNET_CADET_Port); |
2268 | { | 2406 | p->cadet = h; |
2269 | /* We're currently reconnecting, pretend this worked */ | 2407 | p->id = *port; |
2270 | GNUNET_MQ_impl_send_continue (mq); | 2408 | p->connects = connects; |
2271 | return; | 2409 | p->cls = connects_cls; |
2272 | } | 2410 | p->window_changes = window_changes; |
2411 | p->disconnects = disconnects; | ||
2412 | p->handlers = handlers; | ||
2273 | 2413 | ||
2274 | /* check message size for sanity */ | 2414 | GNUNET_assert (GNUNET_OK == |
2275 | msize = ntohs (msg->size); | 2415 | GNUNET_CONTAINER_multihashmap_put (h->ports, |
2276 | if (msize > GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE) | 2416 | p->hash, |
2277 | { | 2417 | p, |
2278 | GNUNET_break (0); | 2418 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); |
2279 | GNUNET_MQ_impl_send_continue (mq); | ||
2280 | return; | ||
2281 | } | ||
2282 | 2419 | ||
2283 | env = GNUNET_MQ_msg_nested_mh (cadet_msg, | 2420 | env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN); |
2284 | GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA, | 2421 | msg->port = p->id; |
2285 | msg); | ||
2286 | cadet_msg->ccn = ch->ccn; | ||
2287 | GNUNET_MQ_send (h->mq, env); | 2422 | GNUNET_MQ_send (h->mq, env); |
2288 | GNUNET_MQ_impl_send_continue (mq); | ||
2289 | } | ||
2290 | |||
2291 | 2423 | ||
2292 | /** | 2424 | return p; |
2293 | * Handle destruction of a message queue. Implementations must not | ||
2294 | * free @a mq, but should take care of @a impl_state. | ||
2295 | * | ||
2296 | * @param mq the message queue to destroy | ||
2297 | * @param impl_state state of the implementation | ||
2298 | */ | ||
2299 | static void | ||
2300 | cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, | ||
2301 | void *impl_state) | ||
2302 | { | ||
2303 | struct GNUNET_CADET_Channel *ch = impl_state; | ||
2304 | |||
2305 | GNUNET_assert (mq == ch->mq); | ||
2306 | ch->mq = NULL; | ||
2307 | } | ||
2308 | |||
2309 | |||
2310 | /** | ||
2311 | * We had an error processing a message we forwarded from a peer to | ||
2312 | * the CADET service. We should just complain about it but otherwise | ||
2313 | * continue processing. | ||
2314 | * | ||
2315 | * @param cls closure | ||
2316 | * @param error error code | ||
2317 | */ | ||
2318 | static void | ||
2319 | cadet_mq_error_handler (void *cls, | ||
2320 | enum GNUNET_MQ_Error error) | ||
2321 | { | ||
2322 | GNUNET_break_op (0); | ||
2323 | } | ||
2324 | |||
2325 | |||
2326 | /** | ||
2327 | * Implementation function that cancels the currently sent message. | ||
2328 | * Should basically undo whatever #mq_send_impl() did. | ||
2329 | * | ||
2330 | * @param mq message queue | ||
2331 | * @param impl_state state specific to the implementation | ||
2332 | */ | ||
2333 | static void | ||
2334 | cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq, | ||
2335 | void *impl_state) | ||
2336 | { | ||
2337 | struct GNUNET_CADET_Channel *ch = impl_state; | ||
2338 | |||
2339 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
2340 | "Cannot cancel mq message on channel %X of %p\n", | ||
2341 | ch->ccn.channel_of_client, ch->cadet); | ||
2342 | |||
2343 | GNUNET_break (0); | ||
2344 | } | 2425 | } |
2345 | 2426 | ||
2346 | 2427 | ||