diff options
author | Christian Grothoff <christian@grothoff.org> | 2018-06-28 10:24:09 +0200 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2018-06-28 10:24:21 +0200 |
commit | aab6c1174f7868000b21738142a8b16e222d1835 (patch) | |
tree | 2e5120973a31d64c954c3512112443fc5e9fb957 /src | |
parent | 6bba039b3447e128ae67eee9340d8eedd2409ec2 (diff) | |
download | gnunet-aab6c1174f7868000b21738142a8b16e222d1835.tar.gz gnunet-aab6c1174f7868000b21738142a8b16e222d1835.zip |
clean up MQ error handling in cadet_api
Diffstat (limited to 'src')
-rw-r--r-- | src/cadet/cadet_api.c | 173 |
1 files changed, 75 insertions, 98 deletions
diff --git a/src/cadet/cadet_api.c b/src/cadet/cadet_api.c index b019424f9..85a8be522 100644 --- a/src/cadet/cadet_api.c +++ b/src/cadet/cadet_api.c | |||
@@ -357,67 +357,52 @@ reconnect (struct GNUNET_CADET_Handle *h); | |||
357 | 357 | ||
358 | 358 | ||
359 | /** | 359 | /** |
360 | * Reconnect callback: tries to reconnect again after a failer previous | 360 | * Function called during #reconnect_cbk() to (re)open |
361 | * reconnecttion | 361 | * all ports that are still open. |
362 | * | ||
363 | * @param cls closure (cadet handle) | ||
364 | */ | ||
365 | static void | ||
366 | reconnect_cbk (void *cls) | ||
367 | { | ||
368 | struct GNUNET_CADET_Handle *h = cls; | ||
369 | |||
370 | h->reconnect_task = NULL; | ||
371 | reconnect (h); | ||
372 | } | ||
373 | |||
374 | |||
375 | /** | ||
376 | * Function called during #reconnect() to destroy | ||
377 | * all channels that are still open. | ||
378 | * | 362 | * |
379 | * @param cls the `struct GNUNET_CADET_Handle` | 363 | * @param cls the `struct GNUNET_CADET_Handle` |
380 | * @param cid chanenl ID | 364 | * @param id port ID |
381 | * @param value a `struct GNUNET_CADET_Channel` to destroy | 365 | * @param value a `struct GNUNET_CADET_Channel` to open |
382 | * @return #GNUNET_OK (continue to iterate) | 366 | * @return #GNUNET_OK (continue to iterate) |
383 | */ | 367 | */ |
384 | static int | 368 | static int |
385 | destroy_channel_on_reconnect_cb (void *cls, | 369 | open_port_cb (void *cls, |
386 | uint32_t cid, | 370 | const struct GNUNET_HashCode *id, |
387 | void *value) | 371 | void *value) |
388 | { | 372 | { |
389 | /* struct GNUNET_CADET_Handle *handle = cls; */ | 373 | struct GNUNET_CADET_Handle *h = cls; |
390 | struct GNUNET_CADET_Channel *ch = value; | 374 | struct GNUNET_CADET_Port *port = value; |
375 | struct GNUNET_CADET_PortMessage *msg; | ||
376 | struct GNUNET_MQ_Envelope *env; | ||
391 | 377 | ||
392 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | 378 | (void) id; |
393 | "Destroying channel due to reconnect\n"); | 379 | env = GNUNET_MQ_msg (msg, |
394 | destroy_channel (ch); | 380 | GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN); |
381 | msg->port = port->id; | ||
382 | GNUNET_MQ_send (h->mq, | ||
383 | env); | ||
395 | return GNUNET_OK; | 384 | return GNUNET_OK; |
396 | } | 385 | } |
397 | 386 | ||
398 | 387 | ||
399 | /** | 388 | /** |
400 | * Reconnect to the service, retransmit all infomation to try to restore the | 389 | * Reconnect callback: tries to reconnect again after a failer previous |
401 | * original state. | 390 | * reconnecttion |
402 | * | ||
403 | * @param h handle to the cadet | ||
404 | * | 391 | * |
405 | * @return #GNUNET_YES in case of sucess, #GNUNET_NO otherwise (service down...) | 392 | * @param cls closure (cadet handle) |
406 | */ | 393 | */ |
407 | static void | 394 | static void |
408 | schedule_reconnect (struct GNUNET_CADET_Handle *h) | 395 | reconnect_cbk (void *cls) |
409 | { | 396 | { |
410 | if (NULL != h->reconnect_task) | 397 | struct GNUNET_CADET_Handle *h = cls; |
411 | return; | 398 | |
412 | GNUNET_CONTAINER_multihashmap32_iterate (h->channels, | 399 | h->reconnect_task = NULL; |
413 | &destroy_channel_on_reconnect_cb, | ||
414 | h); | ||
415 | h->reconnect_task | ||
416 | = GNUNET_SCHEDULER_add_delayed (h->reconnect_time, | ||
417 | &reconnect_cbk, | ||
418 | h); | ||
419 | h->reconnect_time | 400 | h->reconnect_time |
420 | = GNUNET_TIME_STD_BACKOFF (h->reconnect_time); | 401 | = GNUNET_TIME_STD_BACKOFF (h->reconnect_time); |
402 | reconnect (h); | ||
403 | GNUNET_CONTAINER_multihashmap_iterate (h->ports, | ||
404 | &open_port_cb, | ||
405 | h); | ||
421 | } | 406 | } |
422 | 407 | ||
423 | 408 | ||
@@ -555,15 +540,16 @@ cadet_mq_error_handler (void *cls, | |||
555 | { | 540 | { |
556 | struct GNUNET_CADET_Channel *ch = cls; | 541 | struct GNUNET_CADET_Channel *ch = cls; |
557 | 542 | ||
558 | GNUNET_break (0); | ||
559 | if (GNUNET_MQ_ERROR_NO_MATCH == error) | 543 | if (GNUNET_MQ_ERROR_NO_MATCH == error) |
560 | { | 544 | { |
561 | /* Got a message we did not understand, still try to continue! */ | 545 | /* Got a message we did not understand, still try to continue! */ |
546 | GNUNET_break_op (0); | ||
562 | GNUNET_CADET_receive_done (ch); | 547 | GNUNET_CADET_receive_done (ch); |
563 | } | 548 | } |
564 | else | 549 | else |
565 | { | 550 | { |
566 | schedule_reconnect (ch->cadet); | 551 | GNUNET_break (0); |
552 | GNUNET_CADET_channel_destroy (ch); | ||
567 | } | 553 | } |
568 | } | 554 | } |
569 | 555 | ||
@@ -581,6 +567,7 @@ cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq, | |||
581 | { | 567 | { |
582 | struct GNUNET_CADET_Channel *ch = impl_state; | 568 | struct GNUNET_CADET_Channel *ch = impl_state; |
583 | 569 | ||
570 | (void) mq; | ||
584 | GNUNET_assert (NULL != ch->pending_env); | 571 | GNUNET_assert (NULL != ch->pending_env); |
585 | GNUNET_MQ_discard (ch->pending_env); | 572 | GNUNET_MQ_discard (ch->pending_env); |
586 | ch->pending_env = NULL; | 573 | ch->pending_env = NULL; |
@@ -709,6 +696,7 @@ check_local_data (void *cls, | |||
709 | { | 696 | { |
710 | uint16_t size; | 697 | uint16_t size; |
711 | 698 | ||
699 | (void) cls; | ||
712 | size = ntohs (message->header.size); | 700 | size = ntohs (message->header.size); |
713 | if (sizeof (*message) + sizeof (struct GNUNET_MessageHeader) > size) | 701 | if (sizeof (*message) + sizeof (struct GNUNET_MessageHeader) > size) |
714 | { | 702 | { |
@@ -806,6 +794,32 @@ handle_local_ack (void *cls, | |||
806 | 794 | ||
807 | 795 | ||
808 | /** | 796 | /** |
797 | * Function called during #GNUNET_CADET_disconnect() to destroy | ||
798 | * all channels that are still open. | ||
799 | * | ||
800 | * @param cls the `struct GNUNET_CADET_Handle` | ||
801 | * @param cid chanenl ID | ||
802 | * @param value a `struct GNUNET_CADET_Channel` to destroy | ||
803 | * @return #GNUNET_OK (continue to iterate) | ||
804 | */ | ||
805 | static int | ||
806 | destroy_channel_cb (void *cls, | ||
807 | uint32_t cid, | ||
808 | void *value) | ||
809 | { | ||
810 | /* struct GNUNET_CADET_Handle *handle = cls; */ | ||
811 | struct GNUNET_CADET_Channel *ch = value; | ||
812 | |||
813 | (void) cls; | ||
814 | (void) cid; | ||
815 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
816 | "Destroying channel due to GNUNET_CADET_disconnect()\n"); | ||
817 | destroy_channel (ch); | ||
818 | return GNUNET_OK; | ||
819 | } | ||
820 | |||
821 | |||
822 | /** | ||
809 | * Generic error handler, called with the appropriate error code and | 823 | * Generic error handler, called with the appropriate error code and |
810 | * the same closure specified at the creation of the message queue. | 824 | * the same closure specified at the creation of the message queue. |
811 | * Not every message queue implementation supports an error handler. | 825 | * Not every message queue implementation supports an error handler. |
@@ -822,9 +836,14 @@ handle_mq_error (void *cls, | |||
822 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 836 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
823 | "MQ ERROR: %u\n", | 837 | "MQ ERROR: %u\n", |
824 | error); | 838 | error); |
839 | GNUNET_CONTAINER_multihashmap32_iterate (h->channels, | ||
840 | &destroy_channel_cb, | ||
841 | h); | ||
825 | GNUNET_MQ_destroy (h->mq); | 842 | GNUNET_MQ_destroy (h->mq); |
826 | h->mq = NULL; | 843 | h->mq = NULL; |
827 | reconnect (h); | 844 | h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time, |
845 | &reconnect_cbk, | ||
846 | h); | ||
828 | } | 847 | } |
829 | 848 | ||
830 | 849 | ||
@@ -842,6 +861,7 @@ check_get_peers (void *cls, | |||
842 | { | 861 | { |
843 | size_t esize; | 862 | size_t esize; |
844 | 863 | ||
864 | (void) cls; | ||
845 | esize = ntohs (message->size); | 865 | esize = ntohs (message->size); |
846 | if (sizeof (struct GNUNET_CADET_LocalInfoPeer) == esize) | 866 | if (sizeof (struct GNUNET_CADET_LocalInfoPeer) == esize) |
847 | return GNUNET_OK; | 867 | return GNUNET_OK; |
@@ -895,11 +915,9 @@ check_get_peer (void *cls, | |||
895 | const struct GNUNET_CADET_LocalInfoPeer *message) | 915 | const struct GNUNET_CADET_LocalInfoPeer *message) |
896 | { | 916 | { |
897 | size_t msize = sizeof (struct GNUNET_CADET_LocalInfoPeer); | 917 | size_t msize = sizeof (struct GNUNET_CADET_LocalInfoPeer); |
898 | const struct GNUNET_PeerIdentity *paths_array; | ||
899 | size_t esize; | 918 | size_t esize; |
900 | unsigned int epaths; | ||
901 | unsigned int peers; | ||
902 | 919 | ||
920 | (void) cls; | ||
903 | esize = ntohs (message->header.size); | 921 | esize = ntohs (message->header.size); |
904 | if (esize < msize) | 922 | if (esize < msize) |
905 | { | 923 | { |
@@ -911,10 +929,6 @@ check_get_peer (void *cls, | |||
911 | GNUNET_break (0); | 929 | GNUNET_break (0); |
912 | return GNUNET_SYSERR; | 930 | return GNUNET_SYSERR; |
913 | } | 931 | } |
914 | peers = (esize - msize) / sizeof (struct GNUNET_PeerIdentity); | ||
915 | epaths = ntohs (message->paths); | ||
916 | paths_array = (const struct GNUNET_PeerIdentity *) &message[1]; | ||
917 | |||
918 | return GNUNET_OK; | 932 | return GNUNET_OK; |
919 | } | 933 | } |
920 | 934 | ||
@@ -1166,38 +1180,6 @@ reconnect (struct GNUNET_CADET_Handle *h) | |||
1166 | handlers, | 1180 | handlers, |
1167 | &handle_mq_error, | 1181 | &handle_mq_error, |
1168 | h); | 1182 | h); |
1169 | if (NULL == h->mq) | ||
1170 | { | ||
1171 | schedule_reconnect (h); | ||
1172 | return; | ||
1173 | } | ||
1174 | h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS; | ||
1175 | } | ||
1176 | |||
1177 | |||
1178 | /** | ||
1179 | * Function called during #GNUNET_CADET_disconnect() to destroy | ||
1180 | * all channels that are still open. | ||
1181 | * | ||
1182 | * @param cls the `struct GNUNET_CADET_Handle` | ||
1183 | * @param cid chanenl ID | ||
1184 | * @param value a `struct GNUNET_CADET_Channel` to destroy | ||
1185 | * @return #GNUNET_OK (continue to iterate) | ||
1186 | */ | ||
1187 | static int | ||
1188 | destroy_channel_cb (void *cls, | ||
1189 | uint32_t cid, | ||
1190 | void *value) | ||
1191 | { | ||
1192 | /* struct GNUNET_CADET_Handle *handle = cls; */ | ||
1193 | struct GNUNET_CADET_Channel *ch = value; | ||
1194 | |||
1195 | (void) cls; | ||
1196 | (void) cid; | ||
1197 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
1198 | "Destroying channel due to GNUNET_CADET_disconnect()\n"); | ||
1199 | destroy_channel (ch); | ||
1200 | return GNUNET_OK; | ||
1201 | } | 1183 | } |
1202 | 1184 | ||
1203 | 1185 | ||
@@ -1219,6 +1201,7 @@ destroy_port_cb (void *cls, | |||
1219 | struct GNUNET_CADET_Port *port = value; | 1201 | struct GNUNET_CADET_Port *port = value; |
1220 | 1202 | ||
1221 | (void) cls; | 1203 | (void) cls; |
1204 | (void) id; | ||
1222 | /* This is a warning, the app should have cleanly closed all open ports */ | 1205 | /* This is a warning, the app should have cleanly closed all open ports */ |
1223 | GNUNET_break (0); | 1206 | GNUNET_break (0); |
1224 | GNUNET_CADET_close_port (port); | 1207 | GNUNET_CADET_close_port (port); |
@@ -1633,9 +1616,6 @@ GNUNET_CADET_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) | |||
1633 | return NULL; | 1616 | return NULL; |
1634 | } | 1617 | } |
1635 | h->next_ccn.channel_of_client = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI); | 1618 | h->next_ccn.channel_of_client = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI); |
1636 | h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS; | ||
1637 | h->reconnect_task = NULL; | ||
1638 | |||
1639 | return h; | 1619 | return h; |
1640 | } | 1620 | } |
1641 | 1621 | ||
@@ -1661,8 +1641,6 @@ GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h, | |||
1661 | GNUNET_CADET_DisconnectEventHandler disconnects, | 1641 | GNUNET_CADET_DisconnectEventHandler disconnects, |
1662 | const struct GNUNET_MQ_MessageHandler *handlers) | 1642 | const struct GNUNET_MQ_MessageHandler *handlers) |
1663 | { | 1643 | { |
1664 | struct GNUNET_CADET_PortMessage *msg; | ||
1665 | struct GNUNET_MQ_Envelope *env; | ||
1666 | struct GNUNET_CADET_Port *p; | 1644 | struct GNUNET_CADET_Port *p; |
1667 | 1645 | ||
1668 | GNUNET_assert (NULL != connects); | 1646 | GNUNET_assert (NULL != connects); |
@@ -1688,13 +1666,11 @@ GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h, | |||
1688 | p->window_changes = window_changes; | 1666 | p->window_changes = window_changes; |
1689 | p->disconnects = disconnects; | 1667 | p->disconnects = disconnects; |
1690 | p->handlers = GNUNET_MQ_copy_handlers (handlers); | 1668 | p->handlers = GNUNET_MQ_copy_handlers (handlers); |
1691 | 1669 | ||
1692 | 1670 | GNUNET_assert (GNUNET_OK == | |
1693 | env = GNUNET_MQ_msg (msg, | 1671 | open_port_cb (h, |
1694 | GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN); | 1672 | &p->id, |
1695 | msg->port = p->id; | 1673 | p)); |
1696 | GNUNET_MQ_send (h->mq, | ||
1697 | env); | ||
1698 | return p; | 1674 | return p; |
1699 | } | 1675 | } |
1700 | 1676 | ||
@@ -1753,7 +1729,8 @@ GNUNET_CADET_channel_create (struct GNUNET_CADET_Handle *h, | |||
1753 | handlers, | 1729 | handlers, |
1754 | &cadet_mq_error_handler, | 1730 | &cadet_mq_error_handler, |
1755 | ch); | 1731 | ch); |
1756 | GNUNET_MQ_set_handlers_closure (ch->mq, channel_cls); | 1732 | GNUNET_MQ_set_handlers_closure (ch->mq, |
1733 | channel_cls); | ||
1757 | 1734 | ||
1758 | /* Request channel creation to service */ | 1735 | /* Request channel creation to service */ |
1759 | env = GNUNET_MQ_msg (msg, | 1736 | env = GNUNET_MQ_msg (msg, |