diff options
author | Gabor X Toth <*@tg-x.net> | 2016-08-04 20:10:22 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2016-08-04 20:10:22 +0000 |
commit | 319422c2d2ae7f88f931fae0bd0e7b1efe2ad68d (patch) | |
tree | 1b6b2322fe2602f9fda825945053abe405a0ae26 /src/multicast | |
parent | cb073fc0a243ccd29cb0dc7eb5c7e7cb33b6e97f (diff) | |
download | gnunet-319422c2d2ae7f88f931fae0bd0e7b1efe2ad68d.tar.gz gnunet-319422c2d2ae7f88f931fae0bd0e7b1efe2ad68d.zip |
multicast: switch to MQ
Diffstat (limited to 'src/multicast')
-rw-r--r-- | src/multicast/multicast_api.c | 59 |
1 files changed, 25 insertions, 34 deletions
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c index db0f0e759..6fb45d722 100644 --- a/src/multicast/multicast_api.c +++ b/src/multicast/multicast_api.c | |||
@@ -27,7 +27,6 @@ | |||
27 | 27 | ||
28 | #include "platform.h" | 28 | #include "platform.h" |
29 | #include "gnunet_util_lib.h" | 29 | #include "gnunet_util_lib.h" |
30 | #include "gnunet_mq_lib.h" | ||
31 | #include "gnunet_multicast_service.h" | 30 | #include "gnunet_multicast_service.h" |
32 | #include "multicast.h" | 31 | #include "multicast.h" |
33 | 32 | ||
@@ -79,7 +78,7 @@ struct GNUNET_MULTICAST_Group | |||
79 | /** | 78 | /** |
80 | * Time to wait until we try to reconnect on failure. | 79 | * Time to wait until we try to reconnect on failure. |
81 | */ | 80 | */ |
82 | struct GNUNET_TIME_Relative reconnect_backoff; | 81 | struct GNUNET_TIME_Relative reconnect_delay; |
83 | 82 | ||
84 | /** | 83 | /** |
85 | * Task for reconnecting when the listener fails. | 84 | * Task for reconnecting when the listener fails. |
@@ -255,7 +254,7 @@ handle_group_join_request (void *cls, | |||
255 | jh->peer = jreq->peer; | 254 | jh->peer = jreq->peer; |
256 | grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh); | 255 | grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh); |
257 | 256 | ||
258 | grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; | 257 | grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; |
259 | } | 258 | } |
260 | 259 | ||
261 | 260 | ||
@@ -289,7 +288,7 @@ handle_group_message (void *cls, | |||
289 | if (NULL != grp->message_cb) | 288 | if (NULL != grp->message_cb) |
290 | grp->message_cb (grp->cb_cls, mmsg); | 289 | grp->message_cb (grp->cb_cls, mmsg); |
291 | 290 | ||
292 | grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; | 291 | grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; |
293 | } | 292 | } |
294 | 293 | ||
295 | 294 | ||
@@ -322,7 +321,7 @@ handle_group_fragment_ack (void *cls, | |||
322 | else | 321 | else |
323 | member_to_origin ((struct GNUNET_MULTICAST_Member *) grp); | 322 | member_to_origin ((struct GNUNET_MULTICAST_Member *) grp); |
324 | 323 | ||
325 | grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; | 324 | grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; |
326 | } | 325 | } |
327 | 326 | ||
328 | 327 | ||
@@ -355,7 +354,7 @@ handle_origin_request (void *cls, | |||
355 | if (NULL != orig->request_cb) | 354 | if (NULL != orig->request_cb) |
356 | orig->request_cb (grp->cb_cls, req); | 355 | orig->request_cb (grp->cb_cls, req); |
357 | 356 | ||
358 | grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; | 357 | grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; |
359 | } | 358 | } |
360 | 359 | ||
361 | 360 | ||
@@ -400,7 +399,7 @@ handle_group_replay_request (void *cls, | |||
400 | } | 399 | } |
401 | } | 400 | } |
402 | 401 | ||
403 | grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; | 402 | grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; |
404 | } | 403 | } |
405 | 404 | ||
406 | 405 | ||
@@ -516,7 +515,7 @@ handle_member_join_decision (void *cls, | |||
516 | //if (GNUNET_YES != is_admitted) | 515 | //if (GNUNET_YES != is_admitted) |
517 | // GNUNET_MULTICAST_member_part (mem); | 516 | // GNUNET_MULTICAST_member_part (mem); |
518 | 517 | ||
519 | grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; | 518 | grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; |
520 | } | 519 | } |
521 | 520 | ||
522 | 521 | ||
@@ -693,7 +692,7 @@ GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh, | |||
693 | } | 692 | } |
694 | 693 | ||
695 | 694 | ||
696 | void | 695 | static void |
697 | origin_connect (struct GNUNET_MULTICAST_Origin *orig); | 696 | origin_connect (struct GNUNET_MULTICAST_Origin *orig); |
698 | 697 | ||
699 | 698 | ||
@@ -707,9 +706,9 @@ origin_reconnect (void *cls) | |||
707 | /** | 706 | /** |
708 | * Origin client disconnected from service. | 707 | * Origin client disconnected from service. |
709 | * | 708 | * |
710 | * Reconnect after backoff period.= | 709 | * Reconnect after backoff period. |
711 | */ | 710 | */ |
712 | void | 711 | static void |
713 | origin_disconnected (void *cls, enum GNUNET_MQ_Error error) | 712 | origin_disconnected (void *cls, enum GNUNET_MQ_Error error) |
714 | { | 713 | { |
715 | struct GNUNET_MULTICAST_Origin *orig = cls; | 714 | struct GNUNET_MULTICAST_Origin *orig = cls; |
@@ -724,17 +723,17 @@ origin_disconnected (void *cls, enum GNUNET_MQ_Error error) | |||
724 | grp->mq = NULL; | 723 | grp->mq = NULL; |
725 | } | 724 | } |
726 | 725 | ||
727 | grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_backoff, | 726 | grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, |
728 | &origin_reconnect, | 727 | &origin_reconnect, |
729 | orig); | 728 | orig); |
730 | grp->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (grp->reconnect_backoff); | 729 | grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay); |
731 | } | 730 | } |
732 | 731 | ||
733 | 732 | ||
734 | /** | 733 | /** |
735 | * Connect to service as origin. | 734 | * Connect to service as origin. |
736 | */ | 735 | */ |
737 | void | 736 | static void |
738 | origin_connect (struct GNUNET_MULTICAST_Origin *orig) | 737 | origin_connect (struct GNUNET_MULTICAST_Origin *orig) |
739 | { | 738 | { |
740 | struct GNUNET_MULTICAST_Group *grp = &orig->grp; | 739 | struct GNUNET_MULTICAST_Group *grp = &orig->grp; |
@@ -770,11 +769,7 @@ origin_connect (struct GNUNET_MULTICAST_Origin *orig) | |||
770 | 769 | ||
771 | grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast", | 770 | grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast", |
772 | handlers, origin_disconnected, orig); | 771 | handlers, origin_disconnected, orig); |
773 | if (NULL == grp->mq) | 772 | GNUNET_assert (NULL != grp->mq); |
774 | { | ||
775 | GNUNET_break (0); | ||
776 | return; | ||
777 | } | ||
778 | GNUNET_MQ_send_copy (grp->mq, grp->connect_env); | 773 | GNUNET_MQ_send_copy (grp->mq, grp->connect_env); |
779 | } | 774 | } |
780 | 775 | ||
@@ -902,7 +897,7 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig) | |||
902 | "%p OriginTransmitNotify() returned error or invalid message size.\n", | 897 | "%p OriginTransmitNotify() returned error or invalid message size.\n", |
903 | orig); | 898 | orig); |
904 | /* FIXME: handle error */ | 899 | /* FIXME: handle error */ |
905 | GNUNET_free (env); | 900 | GNUNET_MQ_discard (env); |
906 | return; | 901 | return; |
907 | } | 902 | } |
908 | 903 | ||
@@ -910,7 +905,7 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig) | |||
910 | { | 905 | { |
911 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 906 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
912 | "%p OriginTransmitNotify() - transmission paused.\n", orig); | 907 | "%p OriginTransmitNotify() - transmission paused.\n", orig); |
913 | GNUNET_free (env); | 908 | GNUNET_MQ_discard (env); |
914 | return; /* Transmission paused. */ | 909 | return; /* Transmission paused. */ |
915 | } | 910 | } |
916 | 911 | ||
@@ -1000,7 +995,7 @@ GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHan | |||
1000 | } | 995 | } |
1001 | 996 | ||
1002 | 997 | ||
1003 | void | 998 | static void |
1004 | member_connect (struct GNUNET_MULTICAST_Member *mem); | 999 | member_connect (struct GNUNET_MULTICAST_Member *mem); |
1005 | 1000 | ||
1006 | 1001 | ||
@@ -1016,7 +1011,7 @@ member_reconnect (void *cls) | |||
1016 | * | 1011 | * |
1017 | * Reconnect after backoff period. | 1012 | * Reconnect after backoff period. |
1018 | */ | 1013 | */ |
1019 | void | 1014 | static void |
1020 | member_disconnected (void *cls, enum GNUNET_MQ_Error error) | 1015 | member_disconnected (void *cls, enum GNUNET_MQ_Error error) |
1021 | { | 1016 | { |
1022 | struct GNUNET_MULTICAST_Member *mem = cls; | 1017 | struct GNUNET_MULTICAST_Member *mem = cls; |
@@ -1028,17 +1023,17 @@ member_disconnected (void *cls, enum GNUNET_MQ_Error error) | |||
1028 | GNUNET_MQ_destroy (grp->mq); | 1023 | GNUNET_MQ_destroy (grp->mq); |
1029 | grp->mq = NULL; | 1024 | grp->mq = NULL; |
1030 | 1025 | ||
1031 | grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_backoff, | 1026 | grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, |
1032 | &member_reconnect, | 1027 | &member_reconnect, |
1033 | mem); | 1028 | mem); |
1034 | grp->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (grp->reconnect_backoff); | 1029 | grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay); |
1035 | } | 1030 | } |
1036 | 1031 | ||
1037 | 1032 | ||
1038 | /** | 1033 | /** |
1039 | * Connect to service as member. | 1034 | * Connect to service as member. |
1040 | */ | 1035 | */ |
1041 | void | 1036 | static void |
1042 | member_connect (struct GNUNET_MULTICAST_Member *mem) | 1037 | member_connect (struct GNUNET_MULTICAST_Member *mem) |
1043 | { | 1038 | { |
1044 | struct GNUNET_MULTICAST_Group *grp = &mem->grp; | 1039 | struct GNUNET_MULTICAST_Group *grp = &mem->grp; |
@@ -1079,11 +1074,7 @@ member_connect (struct GNUNET_MULTICAST_Member *mem) | |||
1079 | 1074 | ||
1080 | grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast", | 1075 | grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast", |
1081 | handlers, member_disconnected, mem); | 1076 | handlers, member_disconnected, mem); |
1082 | if (NULL == grp->mq) | 1077 | GNUNET_assert (NULL != grp->mq); |
1083 | { | ||
1084 | GNUNET_break (0); | ||
1085 | return; | ||
1086 | } | ||
1087 | GNUNET_MQ_send_copy (grp->mq, grp->connect_env); | 1078 | GNUNET_MQ_send_copy (grp->mq, grp->connect_env); |
1088 | } | 1079 | } |
1089 | 1080 | ||
@@ -1171,7 +1162,7 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1171 | if (0 < join_msg_size) | 1162 | if (0 < join_msg_size) |
1172 | GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size); | 1163 | GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size); |
1173 | 1164 | ||
1174 | grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; | 1165 | grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; |
1175 | grp->is_origin = GNUNET_NO; | 1166 | grp->is_origin = GNUNET_NO; |
1176 | grp->cfg = cfg; | 1167 | grp->cfg = cfg; |
1177 | 1168 | ||
@@ -1326,14 +1317,14 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem) | |||
1326 | "MemberTransmitNotify() returned error or invalid message size. " | 1317 | "MemberTransmitNotify() returned error or invalid message size. " |
1327 | "ret=%d, buf_size=%u\n", ret, buf_size); | 1318 | "ret=%d, buf_size=%u\n", ret, buf_size); |
1328 | /* FIXME: handle error */ | 1319 | /* FIXME: handle error */ |
1329 | GNUNET_free (req); | 1320 | GNUNET_MQ_discard (env); |
1330 | return; | 1321 | return; |
1331 | } | 1322 | } |
1332 | 1323 | ||
1333 | if (GNUNET_NO == ret && 0 == buf_size) | 1324 | if (GNUNET_NO == ret && 0 == buf_size) |
1334 | { | 1325 | { |
1335 | /* Transmission paused. */ | 1326 | /* Transmission paused. */ |
1336 | GNUNET_free (req); | 1327 | GNUNET_MQ_discard (env); |
1337 | return; | 1328 | return; |
1338 | } | 1329 | } |
1339 | 1330 | ||