aboutsummaryrefslogtreecommitdiff
path: root/src/multicast
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2016-08-04 20:10:22 +0000
committerGabor X Toth <*@tg-x.net>2016-08-04 20:10:22 +0000
commit319422c2d2ae7f88f931fae0bd0e7b1efe2ad68d (patch)
tree1b6b2322fe2602f9fda825945053abe405a0ae26 /src/multicast
parentcb073fc0a243ccd29cb0dc7eb5c7e7cb33b6e97f (diff)
downloadgnunet-319422c2d2ae7f88f931fae0bd0e7b1efe2ad68d.tar.gz
gnunet-319422c2d2ae7f88f931fae0bd0e7b1efe2ad68d.zip
multicast: switch to MQ
Diffstat (limited to 'src/multicast')
-rw-r--r--src/multicast/multicast_api.c59
1 files changed, 25 insertions, 34 deletions
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c
index db0f0e759..6fb45d722 100644
--- a/src/multicast/multicast_api.c
+++ b/src/multicast/multicast_api.c
@@ -27,7 +27,6 @@
27 27
28#include "platform.h" 28#include "platform.h"
29#include "gnunet_util_lib.h" 29#include "gnunet_util_lib.h"
30#include "gnunet_mq_lib.h"
31#include "gnunet_multicast_service.h" 30#include "gnunet_multicast_service.h"
32#include "multicast.h" 31#include "multicast.h"
33 32
@@ -79,7 +78,7 @@ struct GNUNET_MULTICAST_Group
79 /** 78 /**
80 * Time to wait until we try to reconnect on failure. 79 * Time to wait until we try to reconnect on failure.
81 */ 80 */
82 struct GNUNET_TIME_Relative reconnect_backoff; 81 struct GNUNET_TIME_Relative reconnect_delay;
83 82
84 /** 83 /**
85 * Task for reconnecting when the listener fails. 84 * Task for reconnecting when the listener fails.
@@ -255,7 +254,7 @@ handle_group_join_request (void *cls,
255 jh->peer = jreq->peer; 254 jh->peer = jreq->peer;
256 grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh); 255 grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh);
257 256
258 grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; 257 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
259} 258}
260 259
261 260
@@ -289,7 +288,7 @@ handle_group_message (void *cls,
289 if (NULL != grp->message_cb) 288 if (NULL != grp->message_cb)
290 grp->message_cb (grp->cb_cls, mmsg); 289 grp->message_cb (grp->cb_cls, mmsg);
291 290
292 grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; 291 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
293} 292}
294 293
295 294
@@ -322,7 +321,7 @@ handle_group_fragment_ack (void *cls,
322 else 321 else
323 member_to_origin ((struct GNUNET_MULTICAST_Member *) grp); 322 member_to_origin ((struct GNUNET_MULTICAST_Member *) grp);
324 323
325 grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; 324 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
326} 325}
327 326
328 327
@@ -355,7 +354,7 @@ handle_origin_request (void *cls,
355 if (NULL != orig->request_cb) 354 if (NULL != orig->request_cb)
356 orig->request_cb (grp->cb_cls, req); 355 orig->request_cb (grp->cb_cls, req);
357 356
358 grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; 357 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
359} 358}
360 359
361 360
@@ -400,7 +399,7 @@ handle_group_replay_request (void *cls,
400 } 399 }
401 } 400 }
402 401
403 grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; 402 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
404} 403}
405 404
406 405
@@ -516,7 +515,7 @@ handle_member_join_decision (void *cls,
516 //if (GNUNET_YES != is_admitted) 515 //if (GNUNET_YES != is_admitted)
517 // GNUNET_MULTICAST_member_part (mem); 516 // GNUNET_MULTICAST_member_part (mem);
518 517
519 grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; 518 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
520} 519}
521 520
522 521
@@ -693,7 +692,7 @@ GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh,
693} 692}
694 693
695 694
696void 695static void
697origin_connect (struct GNUNET_MULTICAST_Origin *orig); 696origin_connect (struct GNUNET_MULTICAST_Origin *orig);
698 697
699 698
@@ -707,9 +706,9 @@ origin_reconnect (void *cls)
707/** 706/**
708 * Origin client disconnected from service. 707 * Origin client disconnected from service.
709 * 708 *
710 * Reconnect after backoff period.= 709 * Reconnect after backoff period.
711 */ 710 */
712void 711static void
713origin_disconnected (void *cls, enum GNUNET_MQ_Error error) 712origin_disconnected (void *cls, enum GNUNET_MQ_Error error)
714{ 713{
715 struct GNUNET_MULTICAST_Origin *orig = cls; 714 struct GNUNET_MULTICAST_Origin *orig = cls;
@@ -724,17 +723,17 @@ origin_disconnected (void *cls, enum GNUNET_MQ_Error error)
724 grp->mq = NULL; 723 grp->mq = NULL;
725 } 724 }
726 725
727 grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_backoff, 726 grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay,
728 &origin_reconnect, 727 &origin_reconnect,
729 orig); 728 orig);
730 grp->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (grp->reconnect_backoff); 729 grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
731} 730}
732 731
733 732
734/** 733/**
735 * Connect to service as origin. 734 * Connect to service as origin.
736 */ 735 */
737void 736static void
738origin_connect (struct GNUNET_MULTICAST_Origin *orig) 737origin_connect (struct GNUNET_MULTICAST_Origin *orig)
739{ 738{
740 struct GNUNET_MULTICAST_Group *grp = &orig->grp; 739 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
@@ -770,11 +769,7 @@ origin_connect (struct GNUNET_MULTICAST_Origin *orig)
770 769
771 grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast", 770 grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast",
772 handlers, origin_disconnected, orig); 771 handlers, origin_disconnected, orig);
773 if (NULL == grp->mq) 772 GNUNET_assert (NULL != grp->mq);
774 {
775 GNUNET_break (0);
776 return;
777 }
778 GNUNET_MQ_send_copy (grp->mq, grp->connect_env); 773 GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
779} 774}
780 775
@@ -902,7 +897,7 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
902 "%p OriginTransmitNotify() returned error or invalid message size.\n", 897 "%p OriginTransmitNotify() returned error or invalid message size.\n",
903 orig); 898 orig);
904 /* FIXME: handle error */ 899 /* FIXME: handle error */
905 GNUNET_free (env); 900 GNUNET_MQ_discard (env);
906 return; 901 return;
907 } 902 }
908 903
@@ -910,7 +905,7 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
910 { 905 {
911 LOG (GNUNET_ERROR_TYPE_DEBUG, 906 LOG (GNUNET_ERROR_TYPE_DEBUG,
912 "%p OriginTransmitNotify() - transmission paused.\n", orig); 907 "%p OriginTransmitNotify() - transmission paused.\n", orig);
913 GNUNET_free (env); 908 GNUNET_MQ_discard (env);
914 return; /* Transmission paused. */ 909 return; /* Transmission paused. */
915 } 910 }
916 911
@@ -1000,7 +995,7 @@ GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHan
1000} 995}
1001 996
1002 997
1003 void 998static void
1004member_connect (struct GNUNET_MULTICAST_Member *mem); 999member_connect (struct GNUNET_MULTICAST_Member *mem);
1005 1000
1006 1001
@@ -1016,7 +1011,7 @@ member_reconnect (void *cls)
1016 * 1011 *
1017 * Reconnect after backoff period. 1012 * Reconnect after backoff period.
1018 */ 1013 */
1019void 1014static void
1020member_disconnected (void *cls, enum GNUNET_MQ_Error error) 1015member_disconnected (void *cls, enum GNUNET_MQ_Error error)
1021{ 1016{
1022 struct GNUNET_MULTICAST_Member *mem = cls; 1017 struct GNUNET_MULTICAST_Member *mem = cls;
@@ -1028,17 +1023,17 @@ member_disconnected (void *cls, enum GNUNET_MQ_Error error)
1028 GNUNET_MQ_destroy (grp->mq); 1023 GNUNET_MQ_destroy (grp->mq);
1029 grp->mq = NULL; 1024 grp->mq = NULL;
1030 1025
1031 grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_backoff, 1026 grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay,
1032 &member_reconnect, 1027 &member_reconnect,
1033 mem); 1028 mem);
1034 grp->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (grp->reconnect_backoff); 1029 grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
1035} 1030}
1036 1031
1037 1032
1038/** 1033/**
1039 * Connect to service as member. 1034 * Connect to service as member.
1040 */ 1035 */
1041void 1036static void
1042member_connect (struct GNUNET_MULTICAST_Member *mem) 1037member_connect (struct GNUNET_MULTICAST_Member *mem)
1043{ 1038{
1044 struct GNUNET_MULTICAST_Group *grp = &mem->grp; 1039 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
@@ -1079,11 +1074,7 @@ member_connect (struct GNUNET_MULTICAST_Member *mem)
1079 1074
1080 grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast", 1075 grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast",
1081 handlers, member_disconnected, mem); 1076 handlers, member_disconnected, mem);
1082 if (NULL == grp->mq) 1077 GNUNET_assert (NULL != grp->mq);
1083 {
1084 GNUNET_break (0);
1085 return;
1086 }
1087 GNUNET_MQ_send_copy (grp->mq, grp->connect_env); 1078 GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
1088} 1079}
1089 1080
@@ -1171,7 +1162,7 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1171 if (0 < join_msg_size) 1162 if (0 < join_msg_size)
1172 GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size); 1163 GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size);
1173 1164
1174 grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; 1165 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
1175 grp->is_origin = GNUNET_NO; 1166 grp->is_origin = GNUNET_NO;
1176 grp->cfg = cfg; 1167 grp->cfg = cfg;
1177 1168
@@ -1326,14 +1317,14 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem)
1326 "MemberTransmitNotify() returned error or invalid message size. " 1317 "MemberTransmitNotify() returned error or invalid message size. "
1327 "ret=%d, buf_size=%u\n", ret, buf_size); 1318 "ret=%d, buf_size=%u\n", ret, buf_size);
1328 /* FIXME: handle error */ 1319 /* FIXME: handle error */
1329 GNUNET_free (req); 1320 GNUNET_MQ_discard (env);
1330 return; 1321 return;
1331 } 1322 }
1332 1323
1333 if (GNUNET_NO == ret && 0 == buf_size) 1324 if (GNUNET_NO == ret && 0 == buf_size)
1334 { 1325 {
1335 /* Transmission paused. */ 1326 /* Transmission paused. */
1336 GNUNET_free (req); 1327 GNUNET_MQ_discard (env);
1337 return; 1328 return;
1338 } 1329 }
1339 1330