aboutsummaryrefslogtreecommitdiff
path: root/src/cadet
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2018-06-28 10:24:09 +0200
committerChristian Grothoff <christian@grothoff.org>2018-06-28 10:24:21 +0200
commitaab6c1174f7868000b21738142a8b16e222d1835 (patch)
tree2e5120973a31d64c954c3512112443fc5e9fb957 /src/cadet
parent6bba039b3447e128ae67eee9340d8eedd2409ec2 (diff)
downloadgnunet-aab6c1174f7868000b21738142a8b16e222d1835.tar.gz
gnunet-aab6c1174f7868000b21738142a8b16e222d1835.zip
clean up MQ error handling in cadet_api
Diffstat (limited to 'src/cadet')
-rw-r--r--src/cadet/cadet_api.c173
1 files changed, 75 insertions, 98 deletions
diff --git a/src/cadet/cadet_api.c b/src/cadet/cadet_api.c
index b019424f9..85a8be522 100644
--- a/src/cadet/cadet_api.c
+++ b/src/cadet/cadet_api.c
@@ -357,67 +357,52 @@ reconnect (struct GNUNET_CADET_Handle *h);
357 357
358 358
359/** 359/**
360 * Reconnect callback: tries to reconnect again after a failer previous 360 * Function called during #reconnect_cbk() to (re)open
361 * reconnecttion 361 * all ports that are still open.
362 *
363 * @param cls closure (cadet handle)
364 */
365static void
366reconnect_cbk (void *cls)
367{
368 struct GNUNET_CADET_Handle *h = cls;
369
370 h->reconnect_task = NULL;
371 reconnect (h);
372}
373
374
375/**
376 * Function called during #reconnect() to destroy
377 * all channels that are still open.
378 * 362 *
379 * @param cls the `struct GNUNET_CADET_Handle` 363 * @param cls the `struct GNUNET_CADET_Handle`
380 * @param cid chanenl ID 364 * @param id port ID
381 * @param value a `struct GNUNET_CADET_Channel` to destroy 365 * @param value a `struct GNUNET_CADET_Channel` to open
382 * @return #GNUNET_OK (continue to iterate) 366 * @return #GNUNET_OK (continue to iterate)
383 */ 367 */
384static int 368static int
385destroy_channel_on_reconnect_cb (void *cls, 369open_port_cb (void *cls,
386 uint32_t cid, 370 const struct GNUNET_HashCode *id,
387 void *value) 371 void *value)
388{ 372{
389 /* struct GNUNET_CADET_Handle *handle = cls; */ 373 struct GNUNET_CADET_Handle *h = cls;
390 struct GNUNET_CADET_Channel *ch = value; 374 struct GNUNET_CADET_Port *port = value;
375 struct GNUNET_CADET_PortMessage *msg;
376 struct GNUNET_MQ_Envelope *env;
391 377
392 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 378 (void) id;
393 "Destroying channel due to reconnect\n"); 379 env = GNUNET_MQ_msg (msg,
394 destroy_channel (ch); 380 GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN);
381 msg->port = port->id;
382 GNUNET_MQ_send (h->mq,
383 env);
395 return GNUNET_OK; 384 return GNUNET_OK;
396} 385}
397 386
398 387
399/** 388/**
400 * Reconnect to the service, retransmit all infomation to try to restore the 389 * Reconnect callback: tries to reconnect again after a failer previous
401 * original state. 390 * reconnecttion
402 *
403 * @param h handle to the cadet
404 * 391 *
405 * @return #GNUNET_YES in case of sucess, #GNUNET_NO otherwise (service down...) 392 * @param cls closure (cadet handle)
406 */ 393 */
407static void 394static void
408schedule_reconnect (struct GNUNET_CADET_Handle *h) 395reconnect_cbk (void *cls)
409{ 396{
410 if (NULL != h->reconnect_task) 397 struct GNUNET_CADET_Handle *h = cls;
411 return; 398
412 GNUNET_CONTAINER_multihashmap32_iterate (h->channels, 399 h->reconnect_task = NULL;
413 &destroy_channel_on_reconnect_cb,
414 h);
415 h->reconnect_task
416 = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
417 &reconnect_cbk,
418 h);
419 h->reconnect_time 400 h->reconnect_time
420 = GNUNET_TIME_STD_BACKOFF (h->reconnect_time); 401 = GNUNET_TIME_STD_BACKOFF (h->reconnect_time);
402 reconnect (h);
403 GNUNET_CONTAINER_multihashmap_iterate (h->ports,
404 &open_port_cb,
405 h);
421} 406}
422 407
423 408
@@ -555,15 +540,16 @@ cadet_mq_error_handler (void *cls,
555{ 540{
556 struct GNUNET_CADET_Channel *ch = cls; 541 struct GNUNET_CADET_Channel *ch = cls;
557 542
558 GNUNET_break (0);
559 if (GNUNET_MQ_ERROR_NO_MATCH == error) 543 if (GNUNET_MQ_ERROR_NO_MATCH == error)
560 { 544 {
561 /* Got a message we did not understand, still try to continue! */ 545 /* Got a message we did not understand, still try to continue! */
546 GNUNET_break_op (0);
562 GNUNET_CADET_receive_done (ch); 547 GNUNET_CADET_receive_done (ch);
563 } 548 }
564 else 549 else
565 { 550 {
566 schedule_reconnect (ch->cadet); 551 GNUNET_break (0);
552 GNUNET_CADET_channel_destroy (ch);
567 } 553 }
568} 554}
569 555
@@ -581,6 +567,7 @@ cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
581{ 567{
582 struct GNUNET_CADET_Channel *ch = impl_state; 568 struct GNUNET_CADET_Channel *ch = impl_state;
583 569
570 (void) mq;
584 GNUNET_assert (NULL != ch->pending_env); 571 GNUNET_assert (NULL != ch->pending_env);
585 GNUNET_MQ_discard (ch->pending_env); 572 GNUNET_MQ_discard (ch->pending_env);
586 ch->pending_env = NULL; 573 ch->pending_env = NULL;
@@ -709,6 +696,7 @@ check_local_data (void *cls,
709{ 696{
710 uint16_t size; 697 uint16_t size;
711 698
699 (void) cls;
712 size = ntohs (message->header.size); 700 size = ntohs (message->header.size);
713 if (sizeof (*message) + sizeof (struct GNUNET_MessageHeader) > size) 701 if (sizeof (*message) + sizeof (struct GNUNET_MessageHeader) > size)
714 { 702 {
@@ -806,6 +794,32 @@ handle_local_ack (void *cls,
806 794
807 795
808/** 796/**
797 * Function called during #GNUNET_CADET_disconnect() to destroy
798 * all channels that are still open.
799 *
800 * @param cls the `struct GNUNET_CADET_Handle`
801 * @param cid chanenl ID
802 * @param value a `struct GNUNET_CADET_Channel` to destroy
803 * @return #GNUNET_OK (continue to iterate)
804 */
805static int
806destroy_channel_cb (void *cls,
807 uint32_t cid,
808 void *value)
809{
810 /* struct GNUNET_CADET_Handle *handle = cls; */
811 struct GNUNET_CADET_Channel *ch = value;
812
813 (void) cls;
814 (void) cid;
815 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
816 "Destroying channel due to GNUNET_CADET_disconnect()\n");
817 destroy_channel (ch);
818 return GNUNET_OK;
819}
820
821
822/**
809 * Generic error handler, called with the appropriate error code and 823 * Generic error handler, called with the appropriate error code and
810 * the same closure specified at the creation of the message queue. 824 * the same closure specified at the creation of the message queue.
811 * Not every message queue implementation supports an error handler. 825 * Not every message queue implementation supports an error handler.
@@ -822,9 +836,14 @@ handle_mq_error (void *cls,
822 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 836 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
823 "MQ ERROR: %u\n", 837 "MQ ERROR: %u\n",
824 error); 838 error);
839 GNUNET_CONTAINER_multihashmap32_iterate (h->channels,
840 &destroy_channel_cb,
841 h);
825 GNUNET_MQ_destroy (h->mq); 842 GNUNET_MQ_destroy (h->mq);
826 h->mq = NULL; 843 h->mq = NULL;
827 reconnect (h); 844 h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
845 &reconnect_cbk,
846 h);
828} 847}
829 848
830 849
@@ -842,6 +861,7 @@ check_get_peers (void *cls,
842{ 861{
843 size_t esize; 862 size_t esize;
844 863
864 (void) cls;
845 esize = ntohs (message->size); 865 esize = ntohs (message->size);
846 if (sizeof (struct GNUNET_CADET_LocalInfoPeer) == esize) 866 if (sizeof (struct GNUNET_CADET_LocalInfoPeer) == esize)
847 return GNUNET_OK; 867 return GNUNET_OK;
@@ -895,11 +915,9 @@ check_get_peer (void *cls,
895 const struct GNUNET_CADET_LocalInfoPeer *message) 915 const struct GNUNET_CADET_LocalInfoPeer *message)
896{ 916{
897 size_t msize = sizeof (struct GNUNET_CADET_LocalInfoPeer); 917 size_t msize = sizeof (struct GNUNET_CADET_LocalInfoPeer);
898 const struct GNUNET_PeerIdentity *paths_array;
899 size_t esize; 918 size_t esize;
900 unsigned int epaths;
901 unsigned int peers;
902 919
920 (void) cls;
903 esize = ntohs (message->header.size); 921 esize = ntohs (message->header.size);
904 if (esize < msize) 922 if (esize < msize)
905 { 923 {
@@ -911,10 +929,6 @@ check_get_peer (void *cls,
911 GNUNET_break (0); 929 GNUNET_break (0);
912 return GNUNET_SYSERR; 930 return GNUNET_SYSERR;
913 } 931 }
914 peers = (esize - msize) / sizeof (struct GNUNET_PeerIdentity);
915 epaths = ntohs (message->paths);
916 paths_array = (const struct GNUNET_PeerIdentity *) &message[1];
917
918 return GNUNET_OK; 932 return GNUNET_OK;
919} 933}
920 934
@@ -1166,38 +1180,6 @@ reconnect (struct GNUNET_CADET_Handle *h)
1166 handlers, 1180 handlers,
1167 &handle_mq_error, 1181 &handle_mq_error,
1168 h); 1182 h);
1169 if (NULL == h->mq)
1170 {
1171 schedule_reconnect (h);
1172 return;
1173 }
1174 h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
1175}
1176
1177
1178/**
1179 * Function called during #GNUNET_CADET_disconnect() to destroy
1180 * all channels that are still open.
1181 *
1182 * @param cls the `struct GNUNET_CADET_Handle`
1183 * @param cid chanenl ID
1184 * @param value a `struct GNUNET_CADET_Channel` to destroy
1185 * @return #GNUNET_OK (continue to iterate)
1186 */
1187static int
1188destroy_channel_cb (void *cls,
1189 uint32_t cid,
1190 void *value)
1191{
1192 /* struct GNUNET_CADET_Handle *handle = cls; */
1193 struct GNUNET_CADET_Channel *ch = value;
1194
1195 (void) cls;
1196 (void) cid;
1197 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1198 "Destroying channel due to GNUNET_CADET_disconnect()\n");
1199 destroy_channel (ch);
1200 return GNUNET_OK;
1201} 1183}
1202 1184
1203 1185
@@ -1219,6 +1201,7 @@ destroy_port_cb (void *cls,
1219 struct GNUNET_CADET_Port *port = value; 1201 struct GNUNET_CADET_Port *port = value;
1220 1202
1221 (void) cls; 1203 (void) cls;
1204 (void) id;
1222 /* This is a warning, the app should have cleanly closed all open ports */ 1205 /* This is a warning, the app should have cleanly closed all open ports */
1223 GNUNET_break (0); 1206 GNUNET_break (0);
1224 GNUNET_CADET_close_port (port); 1207 GNUNET_CADET_close_port (port);
@@ -1633,9 +1616,6 @@ GNUNET_CADET_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
1633 return NULL; 1616 return NULL;
1634 } 1617 }
1635 h->next_ccn.channel_of_client = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI); 1618 h->next_ccn.channel_of_client = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
1636 h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
1637 h->reconnect_task = NULL;
1638
1639 return h; 1619 return h;
1640} 1620}
1641 1621
@@ -1661,8 +1641,6 @@ GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h,
1661 GNUNET_CADET_DisconnectEventHandler disconnects, 1641 GNUNET_CADET_DisconnectEventHandler disconnects,
1662 const struct GNUNET_MQ_MessageHandler *handlers) 1642 const struct GNUNET_MQ_MessageHandler *handlers)
1663{ 1643{
1664 struct GNUNET_CADET_PortMessage *msg;
1665 struct GNUNET_MQ_Envelope *env;
1666 struct GNUNET_CADET_Port *p; 1644 struct GNUNET_CADET_Port *p;
1667 1645
1668 GNUNET_assert (NULL != connects); 1646 GNUNET_assert (NULL != connects);
@@ -1688,13 +1666,11 @@ GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h,
1688 p->window_changes = window_changes; 1666 p->window_changes = window_changes;
1689 p->disconnects = disconnects; 1667 p->disconnects = disconnects;
1690 p->handlers = GNUNET_MQ_copy_handlers (handlers); 1668 p->handlers = GNUNET_MQ_copy_handlers (handlers);
1691 1669
1692 1670 GNUNET_assert (GNUNET_OK ==
1693 env = GNUNET_MQ_msg (msg, 1671 open_port_cb (h,
1694 GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN); 1672 &p->id,
1695 msg->port = p->id; 1673 p));
1696 GNUNET_MQ_send (h->mq,
1697 env);
1698 return p; 1674 return p;
1699} 1675}
1700 1676
@@ -1753,7 +1729,8 @@ GNUNET_CADET_channel_create (struct GNUNET_CADET_Handle *h,
1753 handlers, 1729 handlers,
1754 &cadet_mq_error_handler, 1730 &cadet_mq_error_handler,
1755 ch); 1731 ch);
1756 GNUNET_MQ_set_handlers_closure (ch->mq, channel_cls); 1732 GNUNET_MQ_set_handlers_closure (ch->mq,
1733 channel_cls);
1757 1734
1758 /* Request channel creation to service */ 1735 /* Request channel creation to service */
1759 env = GNUNET_MQ_msg (msg, 1736 env = GNUNET_MQ_msg (msg,