aboutsummaryrefslogtreecommitdiff
path: root/src/cadet/cadet_api.c
diff options
context:
space:
mode:
authorBart Polot <bart.polot+voyager@gmail.com>2017-01-31 04:17:58 +0100
committerBart Polot <bart.polot+voyager@gmail.com>2017-01-31 04:17:58 +0100
commit18197ec851b3a4f23b96e8ea9a6ba58ae3982bac (patch)
tree5f503093327381cb0b4ba12668d612c7bf47782a /src/cadet/cadet_api.c
parentae5354d85a43830da6921a53071edfe3859e21a9 (diff)
downloadgnunet-18197ec851b3a4f23b96e8ea9a6ba58ae3982bac.tar.gz
gnunet-18197ec851b3a4f23b96e8ea9a6ba58ae3982bac.zip
Implementation of port opening and handling in MQ
Diffstat (limited to 'src/cadet/cadet_api.c')
-rw-r--r--src/cadet/cadet_api.c301
1 files changed, 191 insertions, 110 deletions
diff --git a/src/cadet/cadet_api.c b/src/cadet/cadet_api.c
index 3491bd75f..3eaa78af8 100644
--- a/src/cadet/cadet_api.c
+++ b/src/cadet/cadet_api.c
@@ -280,14 +280,11 @@ struct GNUNET_CADET_Channel
280 /** 280 /**
281 * Are we allowed to send to the service? 281 * Are we allowed to send to the service?
282 * 282 *
283 * @deprecated 283 * @deprecated?
284 */ 284 */
285 unsigned int allow_send; 285 unsigned int allow_send;
286 286
287 /****************************************************************************/
288 /***************************** MQ ************************************/ 287 /***************************** MQ ************************************/
289 /****************************************************************************/
290
291 /** 288 /**
292 * Message Queue for the channel. 289 * Message Queue for the channel.
293 */ 290 */
@@ -330,6 +327,38 @@ struct GNUNET_CADET_Port
330 * Closure for @a handler. 327 * Closure for @a handler.
331 */ 328 */
332 void *cls; 329 void *cls;
330
331 /***************************** MQ ************************************/
332
333 /**
334 * Port "number"
335 */
336 struct GNUNET_HashCode id;
337
338 /**
339 * Handler for incoming channels on this port
340 */
341 GNUNET_CADET_ConnectEventHandler connects;
342
343 /**
344 * Closure for @ref connects
345 */
346 void * connects_cls;
347
348 /**
349 * Window size change handler.
350 */
351 GNUNET_CADET_WindowSizeEventHandler window_changes;
352
353 /**
354 * Handler called when an incoming channel is destroyed..
355 */
356 GNUNET_CADET_DisconnectEventHandler disconnects;
357
358 /**
359 * Payload handlers for incoming channels.
360 */
361 const struct GNUNET_MQ_MessageHandler *handlers;
333}; 362};
334 363
335 364
@@ -553,6 +582,114 @@ remove_from_queue (struct GNUNET_CADET_TransmitHandle *th)
553} 582}
554 583
555 584
585/******************************************************************************/
586/*********************** MQ API CALLBACKS ****************************/
587/******************************************************************************/
588
589
590/**
591 * Implement sending functionality of a message queue for
592 * us sending messages to a peer.
593 *
594 * Encapsulates the payload message in a #GNUNET_CADET_LocalData message
595 * in order to label the message with the channel ID and send the
596 * encapsulated message to the service.
597 *
598 * @param mq the message queue
599 * @param msg the message to send
600 * @param impl_state state of the implementation
601 */
602static void
603cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
604 const struct GNUNET_MessageHeader *msg,
605 void *impl_state)
606{
607 struct GNUNET_CADET_Channel *ch = impl_state;
608 struct GNUNET_CADET_Handle *h = ch->cadet;
609 uint16_t msize;
610 struct GNUNET_MQ_Envelope *env;
611 struct GNUNET_CADET_LocalData *cadet_msg;
612
613
614 if (NULL == h->mq)
615 {
616 /* We're currently reconnecting, pretend this worked */
617 GNUNET_MQ_impl_send_continue (mq);
618 return;
619 }
620
621 /* check message size for sanity */
622 msize = ntohs (msg->size);
623 if (msize > GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE)
624 {
625 GNUNET_break (0);
626 GNUNET_MQ_impl_send_continue (mq);
627 return;
628 }
629
630 env = GNUNET_MQ_msg_nested_mh (cadet_msg,
631 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
632 msg);
633 cadet_msg->ccn = ch->ccn;
634 GNUNET_MQ_send (h->mq, env);
635 GNUNET_MQ_impl_send_continue (mq);
636}
637
638
639/**
640 * Handle destruction of a message queue. Implementations must not
641 * free @a mq, but should take care of @a impl_state.
642 *
643 * @param mq the message queue to destroy
644 * @param impl_state state of the implementation
645 */
646static void
647cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
648 void *impl_state)
649{
650 struct GNUNET_CADET_Channel *ch = impl_state;
651
652 GNUNET_assert (mq == ch->mq);
653 ch->mq = NULL;
654}
655
656
657/**
658 * We had an error processing a message we forwarded from a peer to
659 * the CADET service. We should just complain about it but otherwise
660 * continue processing.
661 *
662 * @param cls closure
663 * @param error error code
664 */
665static void
666cadet_mq_error_handler (void *cls,
667 enum GNUNET_MQ_Error error)
668{
669 GNUNET_break_op (0);
670}
671
672
673/**
674 * Implementation function that cancels the currently sent message.
675 * Should basically undo whatever #mq_send_impl() did.
676 *
677 * @param mq message queue
678 * @param impl_state state specific to the implementation
679 */
680static void
681cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
682 void *impl_state)
683{
684 struct GNUNET_CADET_Channel *ch = impl_state;
685
686 LOG (GNUNET_ERROR_TYPE_WARNING,
687 "Cannot cancel mq message on channel %X of %p\n",
688 ch->ccn.channel_of_client, ch->cadet);
689
690 GNUNET_break (0);
691}
692
556 693
557/******************************************************************************/ 694/******************************************************************************/
558/*********************** RECEIVE HANDLERS ****************************/ 695/*********************** RECEIVE HANDLERS ****************************/
@@ -627,15 +764,17 @@ handle_channel_created (void *cls,
627 port = find_port (h, port_number); 764 port = find_port (h, port_number);
628 if (NULL == port) 765 if (NULL == port)
629 { 766 {
767 /* We could have closed the port but the service didn't know about it yet
768 * This is not an error.
769 */
630 struct GNUNET_CADET_LocalChannelDestroyMessage *d_msg; 770 struct GNUNET_CADET_LocalChannelDestroyMessage *d_msg;
631 struct GNUNET_MQ_Envelope *env; 771 struct GNUNET_MQ_Envelope *env;
632 772
633 GNUNET_break (0); 773 GNUNET_break (0);
634 LOG (GNUNET_ERROR_TYPE_DEBUG, 774 LOG (GNUNET_ERROR_TYPE_DEBUG,
635 "No handler for incoming channel %X [%s]\n", 775 "No handler for incoming channel %X (on port %s, recently closed?)\n",
636 ntohl (ccn.channel_of_client), 776 ntohl (ccn.channel_of_client),
637 GNUNET_h2s (port_number)); 777 GNUNET_h2s (port_number));
638 /* FIXME: should disconnect instead, this is a serious error! */
639 env = GNUNET_MQ_msg (d_msg, 778 env = GNUNET_MQ_msg (d_msg,
640 GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY); 779 GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
641 d_msg->ccn = msg->ccn; 780 d_msg->ccn = msg->ccn;
@@ -651,17 +790,38 @@ handle_channel_created (void *cls,
651 ch->ccn = ccn; 790 ch->ccn = ccn;
652 ch->incoming_port = port; 791 ch->incoming_port = port;
653 ch->options = ntohl (msg->opt); 792 ch->options = ntohl (msg->opt);
654
655 LOG (GNUNET_ERROR_TYPE_DEBUG, 793 LOG (GNUNET_ERROR_TYPE_DEBUG,
656 "Creating incoming channel %X [%s] %p\n", 794 "Creating incoming channel %X [%s] %p\n",
657 ntohl (ccn.channel_of_client), 795 ntohl (ccn.channel_of_client),
658 GNUNET_h2s (port_number), 796 GNUNET_h2s (port_number),
659 ch); 797 ch);
660 ch->ctx = port->handler (port->cls, 798
661 ch, 799 if (NULL != port->handler)
662 &msg->peer, 800 {
663 port->hash, 801 /** @deprecated */
664 ch->options); 802 /* Old style API */
803 ch->ctx = port->handler (port->cls,
804 ch,
805 &msg->peer,
806 port->hash,
807 ch->options);
808 } else {
809 /* MQ API */
810 GNUNET_assert (NULL != port->connects);
811 ch->window_changes = port->window_changes;
812 ch->disconnects = port->disconnects;
813 ch->mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl,
814 &cadet_mq_destroy_impl,
815 &cadet_mq_cancel_impl,
816 ch,
817 port->handlers,
818 &cadet_mq_error_handler,
819 ch);
820 ch->ctx = port->connects (port->cadet->cls,
821 ch,
822 &msg->peer);
823 GNUNET_MQ_set_handlers_closure (ch->mq, ch->ctx);
824 }
665} 825}
666 826
667 827
@@ -2236,111 +2396,32 @@ GNUNET_CADET_open_porT (struct GNUNET_CADET_Handle *h,
2236 GNUNET_CADET_DisconnectEventHandler disconnects, 2396 GNUNET_CADET_DisconnectEventHandler disconnects,
2237 const struct GNUNET_MQ_MessageHandler *handlers) 2397 const struct GNUNET_MQ_MessageHandler *handlers)
2238{ 2398{
2239 return NULL; 2399 struct GNUNET_CADET_PortMessage *msg;
2240}
2241
2242
2243/**
2244 * Implement sending functionality of a message queue for
2245 * us sending messages to a peer.
2246 *
2247 * Encapsulates the payload message in a #GNUNET_CADET_LocalData message
2248 * in order to label the message with the channel ID and send the
2249 * encapsulated message to the service.
2250 *
2251 * @param mq the message queue
2252 * @param msg the message to send
2253 * @param impl_state state of the implementation
2254 */
2255static void
2256cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
2257 const struct GNUNET_MessageHeader *msg,
2258 void *impl_state)
2259{
2260 struct GNUNET_CADET_Channel *ch = impl_state;
2261 struct GNUNET_CADET_Handle *h = ch->cadet;
2262 uint16_t msize;
2263 struct GNUNET_MQ_Envelope *env; 2400 struct GNUNET_MQ_Envelope *env;
2264 struct GNUNET_CADET_LocalData *cadet_msg; 2401 struct GNUNET_CADET_Port *p;
2265 2402
2403 GNUNET_assert (NULL != connects);
2266 2404
2267 if (NULL == h->mq) 2405 p = GNUNET_new (struct GNUNET_CADET_Port);
2268 { 2406 p->cadet = h;
2269 /* We're currently reconnecting, pretend this worked */ 2407 p->id = *port;
2270 GNUNET_MQ_impl_send_continue (mq); 2408 p->connects = connects;
2271 return; 2409 p->cls = connects_cls;
2272 } 2410 p->window_changes = window_changes;
2411 p->disconnects = disconnects;
2412 p->handlers = handlers;
2273 2413
2274 /* check message size for sanity */ 2414 GNUNET_assert (GNUNET_OK ==
2275 msize = ntohs (msg->size); 2415 GNUNET_CONTAINER_multihashmap_put (h->ports,
2276 if (msize > GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE) 2416 p->hash,
2277 { 2417 p,
2278 GNUNET_break (0); 2418 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2279 GNUNET_MQ_impl_send_continue (mq);
2280 return;
2281 }
2282 2419
2283 env = GNUNET_MQ_msg_nested_mh (cadet_msg, 2420 env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN);
2284 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA, 2421 msg->port = p->id;
2285 msg);
2286 cadet_msg->ccn = ch->ccn;
2287 GNUNET_MQ_send (h->mq, env); 2422 GNUNET_MQ_send (h->mq, env);
2288 GNUNET_MQ_impl_send_continue (mq);
2289}
2290
2291 2423
2292/** 2424 return p;
2293 * Handle destruction of a message queue. Implementations must not
2294 * free @a mq, but should take care of @a impl_state.
2295 *
2296 * @param mq the message queue to destroy
2297 * @param impl_state state of the implementation
2298 */
2299static void
2300cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
2301 void *impl_state)
2302{
2303 struct GNUNET_CADET_Channel *ch = impl_state;
2304
2305 GNUNET_assert (mq == ch->mq);
2306 ch->mq = NULL;
2307}
2308
2309
2310/**
2311 * We had an error processing a message we forwarded from a peer to
2312 * the CADET service. We should just complain about it but otherwise
2313 * continue processing.
2314 *
2315 * @param cls closure
2316 * @param error error code
2317 */
2318static void
2319cadet_mq_error_handler (void *cls,
2320 enum GNUNET_MQ_Error error)
2321{
2322 GNUNET_break_op (0);
2323}
2324
2325
2326/**
2327 * Implementation function that cancels the currently sent message.
2328 * Should basically undo whatever #mq_send_impl() did.
2329 *
2330 * @param mq message queue
2331 * @param impl_state state specific to the implementation
2332 */
2333static void
2334cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
2335 void *impl_state)
2336{
2337 struct GNUNET_CADET_Channel *ch = impl_state;
2338
2339 LOG (GNUNET_ERROR_TYPE_WARNING,
2340 "Cannot cancel mq message on channel %X of %p\n",
2341 ch->ccn.channel_of_client, ch->cadet);
2342
2343 GNUNET_break (0);
2344} 2425}
2345 2426
2346 2427