aboutsummaryrefslogtreecommitdiff
path: root/src/multicast/multicast_api.c
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2015-09-27 21:04:34 +0000
committerGabor X Toth <*@tg-x.net>2015-09-27 21:04:34 +0000
commitcab1b047ddcac497e14515fc11f097c4aac8ee27 (patch)
tree7f4e14a8c77d76bef07cb4bbf6b94adcce44d53c /src/multicast/multicast_api.c
parent51f530b98232f7a9947f29008d161ed0d8e23af4 (diff)
downloadgnunet-cab1b047ddcac497e14515fc11f097c4aac8ee27.tar.gz
gnunet-cab1b047ddcac497e14515fc11f097c4aac8ee27.zip
multicast/psyc/social: message acks & scheduling
Diffstat (limited to 'src/multicast/multicast_api.c')
-rw-r--r--src/multicast/multicast_api.c88
1 files changed, 80 insertions, 8 deletions
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c
index 117a0efe2..774a8bf70 100644
--- a/src/multicast/multicast_api.c
+++ b/src/multicast/multicast_api.c
@@ -102,6 +102,11 @@ struct GNUNET_MULTICAST_Group
102 uint8_t in_transmit; 102 uint8_t in_transmit;
103 103
104 /** 104 /**
105 * Number of MULTICAST_FRAGMENT_ACK messages we are still waiting for.
106 */
107 uint8_t acks_pending;
108
109 /**
105 * Is this the origin or a member? 110 * Is this the origin or a member?
106 */ 111 */
107 uint8_t is_origin; 112 uint8_t is_origin;
@@ -185,6 +190,13 @@ struct GNUNET_MULTICAST_MemberReplayHandle
185}; 190};
186 191
187 192
193static void
194origin_to_all (struct GNUNET_MULTICAST_Origin *orig);
195
196static void
197member_to_origin (struct GNUNET_MULTICAST_Member *mem);
198
199
188/** 200/**
189 * Send first message to the service after connecting. 201 * Send first message to the service after connecting.
190 */ 202 */
@@ -274,6 +286,38 @@ group_recv_message (void *cls,
274 286
275 287
276/** 288/**
289 * Receive message/request fragment acknowledgement from service.
290 */
291static void
292group_recv_fragment_ack (void *cls,
293 struct GNUNET_CLIENT_MANAGER_Connection *client,
294 const struct GNUNET_MessageHeader *msg)
295{
296 struct GNUNET_MULTICAST_Group *
297 grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
298
299 LOG (GNUNET_ERROR_TYPE_DEBUG,
300 "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n",
301 grp, grp->in_transmit, grp->acks_pending);
302
303 if (0 == grp->acks_pending)
304 {
305 LOG (GNUNET_ERROR_TYPE_DEBUG,
306 "%p Ignoring extraneous fragment ACK.\n", grp);
307 return;
308 }
309 grp->acks_pending--;
310
311 if (GNUNET_YES != grp->in_transmit)
312 return;
313
314 if (GNUNET_YES == grp->is_origin)
315 origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp);
316 else
317 member_to_origin ((struct GNUNET_MULTICAST_Member *) grp);
318}
319
320/**
277 * Origin receives uniquest request from a member. 321 * Origin receives uniquest request from a member.
278 */ 322 */
279static void 323static void
@@ -447,6 +491,10 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] =
447 GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 491 GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
448 sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES }, 492 sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES },
449 493
494 { group_recv_fragment_ack, NULL,
495 GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
496 sizeof (struct GNUNET_MessageHeader), GNUNET_YES },
497
450 { group_recv_join_request, NULL, 498 { group_recv_join_request, NULL,
451 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, 499 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
452 sizeof (struct MulticastJoinRequestMessage), GNUNET_YES }, 500 sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
@@ -470,6 +518,10 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler member_handlers[] =
470 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 518 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
471 sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES }, 519 sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
472 520
521 { group_recv_fragment_ack, NULL,
522 GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
523 sizeof (struct GNUNET_MessageHeader), GNUNET_YES },
524
473 { group_recv_join_request, NULL, 525 { group_recv_join_request, NULL,
474 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, 526 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
475 sizeof (struct MulticastJoinRequestMessage), GNUNET_YES }, 527 sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
@@ -577,6 +629,7 @@ GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
577 memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size); 629 memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
578 630
579 GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header); 631 GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header);
632 GNUNET_free (hdcsn);
580 GNUNET_free (join); 633 GNUNET_free (join);
581 return NULL; 634 return NULL;
582} 635}
@@ -774,9 +827,10 @@ GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig,
774static void 827static void
775origin_to_all (struct GNUNET_MULTICAST_Origin *orig) 828origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
776{ 829{
777 LOG (GNUNET_ERROR_TYPE_DEBUG, "origin_to_all()\n"); 830 LOG (GNUNET_ERROR_TYPE_DEBUG, "%p origin_to_all()\n", orig);
778 struct GNUNET_MULTICAST_Group *grp = &orig->grp; 831 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
779 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; 832 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
833 GNUNET_assert (GNUNET_YES == grp->in_transmit);
780 834
781 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; 835 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
782 struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size); 836 struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size);
@@ -786,7 +840,8 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
786 || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size) 840 || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
787 { 841 {
788 LOG (GNUNET_ERROR_TYPE_ERROR, 842 LOG (GNUNET_ERROR_TYPE_ERROR,
789 "OriginTransmitNotify() returned error or invalid message size.\n"); 843 "%p OriginTransmitNotify() returned error or invalid message size.\n",
844 orig);
790 /* FIXME: handle error */ 845 /* FIXME: handle error */
791 GNUNET_free (msg); 846 GNUNET_free (msg);
792 return; 847 return;
@@ -794,6 +849,8 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
794 849
795 if (GNUNET_NO == ret && 0 == buf_size) 850 if (GNUNET_NO == ret && 0 == buf_size)
796 { 851 {
852 LOG (GNUNET_ERROR_TYPE_DEBUG,
853 "%p OriginTransmitNotify() - transmission paused.\n", orig);
797 GNUNET_free (msg); 854 GNUNET_free (msg);
798 return; /* Transmission paused. */ 855 return; /* Transmission paused. */
799 } 856 }
@@ -805,7 +862,12 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
805 msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset); 862 msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
806 tmit->fragment_offset += sizeof (*msg) + buf_size; 863 tmit->fragment_offset += sizeof (*msg) + buf_size;
807 864
865 grp->acks_pending++;
808 GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header); 866 GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header);
867 GNUNET_free (msg);
868
869 if (GNUNET_YES == ret)
870 grp->in_transmit = GNUNET_NO;
809} 871}
810 872
811 873
@@ -834,11 +896,10 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
834 GNUNET_MULTICAST_OriginTransmitNotify notify, 896 GNUNET_MULTICAST_OriginTransmitNotify notify,
835 void *notify_cls) 897 void *notify_cls)
836{ 898{
837/* FIXME 899 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
838 if (GNUNET_YES == orig->grp.in_transmit) 900 if (GNUNET_YES == grp->in_transmit)
839 return NULL; 901 return NULL;
840 orig->grp.in_transmit = GNUNET_YES; 902 grp->in_transmit = GNUNET_YES;
841*/
842 903
843 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; 904 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
844 tmit->origin = orig; 905 tmit->origin = orig;
@@ -861,6 +922,9 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
861void 922void
862GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th) 923GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
863{ 924{
925 struct GNUNET_MULTICAST_Group *grp = &th->origin->grp;
926 if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
927 return;
864 origin_to_all (th->origin); 928 origin_to_all (th->origin);
865} 929}
866 930
@@ -874,6 +938,7 @@ GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHan
874void 938void
875GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th) 939GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
876{ 940{
941 th->origin->grp.in_transmit = GNUNET_NO;
877} 942}
878 943
879 944
@@ -1094,6 +1159,7 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem)
1094 LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n"); 1159 LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n");
1095 struct GNUNET_MULTICAST_Group *grp = &mem->grp; 1160 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1096 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; 1161 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1162 GNUNET_assert (GNUNET_YES == grp->in_transmit);
1097 1163
1098 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; 1164 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
1099 struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size); 1165 struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size);
@@ -1124,6 +1190,10 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem)
1124 tmit->fragment_offset += sizeof (*req) + buf_size; 1190 tmit->fragment_offset += sizeof (*req) + buf_size;
1125 1191
1126 GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header); 1192 GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header);
1193 GNUNET_free (req);
1194
1195 if (GNUNET_YES == ret)
1196 grp->in_transmit = GNUNET_NO;
1127} 1197}
1128 1198
1129 1199
@@ -1147,11 +1217,9 @@ GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
1147 GNUNET_MULTICAST_MemberTransmitNotify notify, 1217 GNUNET_MULTICAST_MemberTransmitNotify notify,
1148 void *notify_cls) 1218 void *notify_cls)
1149{ 1219{
1150/* FIXME
1151 if (GNUNET_YES == mem->grp.in_transmit) 1220 if (GNUNET_YES == mem->grp.in_transmit)
1152 return NULL; 1221 return NULL;
1153 mem->grp.in_transmit = GNUNET_YES; 1222 mem->grp.in_transmit = GNUNET_YES;
1154*/
1155 1223
1156 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; 1224 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1157 tmit->member = mem; 1225 tmit->member = mem;
@@ -1173,6 +1241,9 @@ GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
1173void 1241void
1174GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th) 1242GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
1175{ 1243{
1244 struct GNUNET_MULTICAST_Group *grp = &th->member->grp;
1245 if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
1246 return;
1176 member_to_origin (th->member); 1247 member_to_origin (th->member);
1177} 1248}
1178 1249
@@ -1186,6 +1257,7 @@ GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmit
1186void 1257void
1187GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th) 1258GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
1188{ 1259{
1260 th->member->grp.in_transmit = GNUNET_NO;
1189} 1261}
1190 1262
1191 1263