aboutsummaryrefslogtreecommitdiff
path: root/src/multicast
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2015-09-27 21:04:34 +0000
committerGabor X Toth <*@tg-x.net>2015-09-27 21:04:34 +0000
commitcab1b047ddcac497e14515fc11f097c4aac8ee27 (patch)
tree7f4e14a8c77d76bef07cb4bbf6b94adcce44d53c /src/multicast
parent51f530b98232f7a9947f29008d161ed0d8e23af4 (diff)
downloadgnunet-cab1b047ddcac497e14515fc11f097c4aac8ee27.tar.gz
gnunet-cab1b047ddcac497e14515fc11f097c4aac8ee27.zip
multicast/psyc/social: message acks & scheduling
Diffstat (limited to 'src/multicast')
-rw-r--r--src/multicast/gnunet-service-multicast.c77
-rw-r--r--src/multicast/multicast_api.c88
-rw-r--r--src/multicast/test_multicast.c23
3 files changed, 163 insertions, 25 deletions
diff --git a/src/multicast/gnunet-service-multicast.c b/src/multicast/gnunet-service-multicast.c
index 9ebfa66da..4d2868669 100644
--- a/src/multicast/gnunet-service-multicast.c
+++ b/src/multicast/gnunet-service-multicast.c
@@ -181,6 +181,11 @@ struct Channel
181 int8_t join_status; 181 int8_t join_status;
182 182
183 /** 183 /**
184 * Number of messages waiting to be sent to CADET.
185 */
186 uint8_t msgs_pending;
187
188 /**
184 * Channel direction. 189 * Channel direction.
185 * @see enum ChannelDirection 190 * @see enum ChannelDirection
186 */ 191 */
@@ -619,8 +624,10 @@ client_send_member_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
619/** 624/**
620 * Send message to all origin and member clients connected to the group. 625 * Send message to all origin and member clients connected to the group.
621 * 626 *
622 * @param grp The group to send @a msg to. 627 * @param pub_key_hash
623 * @param msg Message to send. 628 * H(key_pub) of the group.
629 * @param msg
630 * Message to send.
624 */ 631 */
625static int 632static int
626client_send_all (struct GNUNET_HashCode *pub_key_hash, 633client_send_all (struct GNUNET_HashCode *pub_key_hash,
@@ -660,8 +667,10 @@ client_send_random (struct GNUNET_HashCode *pub_key_hash,
660/** 667/**
661 * Send message to all origin clients connected to the group. 668 * Send message to all origin clients connected to the group.
662 * 669 *
663 * @param grp The group to send @a msg to. 670 * @param pub_key_hash
664 * @param msg Message to send. 671 * H(key_pub) of the group.
672 * @param msg
673 * Message to send.
665 */ 674 */
666static int 675static int
667client_send_origin (struct GNUNET_HashCode *pub_key_hash, 676client_send_origin (struct GNUNET_HashCode *pub_key_hash,
@@ -676,6 +685,33 @@ client_send_origin (struct GNUNET_HashCode *pub_key_hash,
676 685
677 686
678/** 687/**
688 * Send fragment acknowledgement to all clients of the channel.
689 *
690 * @param pub_key_hash
691 * H(key_pub) of the group.
692 */
693static void
694client_send_ack (struct GNUNET_HashCode *pub_key_hash)
695{
696 static struct GNUNET_MessageHeader *msg = NULL;
697 if (NULL == msg)
698 {
699 msg = GNUNET_malloc (sizeof (*msg));
700 msg->type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK);
701 msg->size = htons (sizeof (*msg));
702 }
703 client_send_all (pub_key_hash, msg);
704}
705
706
707struct CadetTransmitClosure
708{
709 struct Channel *chn;
710 const struct GNUNET_MessageHeader *msg;
711};
712
713
714/**
679 * CADET is ready to transmit a message. 715 * CADET is ready to transmit a message.
680 */ 716 */
681size_t 717size_t
@@ -686,10 +722,21 @@ cadet_notify_transmit_ready (void *cls, size_t buf_size, void *buf)
686 /* FIXME: connection closed */ 722 /* FIXME: connection closed */
687 return 0; 723 return 0;
688 } 724 }
689 const struct GNUNET_MessageHeader *msg = cls; 725 struct CadetTransmitClosure *tcls = cls;
690 uint16_t msg_size = ntohs (msg->size); 726 struct Channel *chn = tcls->chn;
727 uint16_t msg_size = ntohs (tcls->msg->size);
691 GNUNET_assert (msg_size <= buf_size); 728 GNUNET_assert (msg_size <= buf_size);
692 memcpy (buf, msg, msg_size); 729 memcpy (buf, tcls->msg, msg_size);
730 GNUNET_free (tcls);
731
732 if (0 == chn->msgs_pending)
733 {
734 GNUNET_break (0);
735 }
736 else if (0 == --chn->msgs_pending)
737 {
738 client_send_ack (&chn->group_key_hash);
739 }
693 return msg_size; 740 return msg_size;
694} 741}
695 742
@@ -703,6 +750,11 @@ cadet_notify_transmit_ready (void *cls, size_t buf_size, void *buf)
703static void 750static void
704cadet_send_channel (struct Channel *chn, const struct GNUNET_MessageHeader *msg) 751cadet_send_channel (struct Channel *chn, const struct GNUNET_MessageHeader *msg)
705{ 752{
753 struct CadetTransmitClosure *tcls = GNUNET_malloc (sizeof (*tcls));
754 tcls->chn = chn;
755 tcls->msg = msg;
756
757 chn->msgs_pending++;
706 chn->tmit_handle 758 chn->tmit_handle
707 = GNUNET_CADET_notify_transmit_ready (chn->channel, GNUNET_NO, 759 = GNUNET_CADET_notify_transmit_ready (chn->channel, GNUNET_NO,
708 GNUNET_TIME_UNIT_FOREVER_REL, 760 GNUNET_TIME_UNIT_FOREVER_REL,
@@ -1132,7 +1184,10 @@ client_recv_multicast_message (void *cls, struct GNUNET_SERVER_Client *client,
1132 } 1184 }
1133 1185
1134 client_send_all (&grp->pub_key_hash, &out->header); 1186 client_send_all (&grp->pub_key_hash, &out->header);
1135 cadet_send_children (&grp->pub_key_hash, &out->header); 1187 if (0 == cadet_send_children (&grp->pub_key_hash, &out->header))
1188 {
1189 client_send_ack (&grp->pub_key_hash);
1190 }
1136 GNUNET_free (out); 1191 GNUNET_free (out);
1137 1192
1138 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1193 GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -1174,11 +1229,13 @@ client_recv_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
1174 GNUNET_assert (0); 1229 GNUNET_assert (0);
1175 } 1230 }
1176 1231
1232 uint8_t send_ack = GNUNET_YES;
1177 if (0 == client_send_origin (&grp->pub_key_hash, &out->header)) 1233 if (0 == client_send_origin (&grp->pub_key_hash, &out->header))
1178 { /* No local origins, send to remote origin */ 1234 { /* No local origins, send to remote origin */
1179 if (NULL != mem->origin_channel) 1235 if (NULL != mem->origin_channel)
1180 { 1236 {
1181 cadet_send_channel (mem->origin_channel, &out->header); 1237 cadet_send_channel (mem->origin_channel, &out->header);
1238 send_ack = GNUNET_NO;
1182 } 1239 }
1183 else 1240 else
1184 { 1241 {
@@ -1188,6 +1245,10 @@ client_recv_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
1188 return; 1245 return;
1189 } 1246 }
1190 } 1247 }
1248 if (GNUNET_YES == send_ack)
1249 {
1250 client_send_ack (&grp->pub_key_hash);
1251 }
1191 GNUNET_free (out); 1252 GNUNET_free (out);
1192 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1253 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1193} 1254}
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c
index 117a0efe2..774a8bf70 100644
--- a/src/multicast/multicast_api.c
+++ b/src/multicast/multicast_api.c
@@ -102,6 +102,11 @@ struct GNUNET_MULTICAST_Group
102 uint8_t in_transmit; 102 uint8_t in_transmit;
103 103
104 /** 104 /**
105 * Number of MULTICAST_FRAGMENT_ACK messages we are still waiting for.
106 */
107 uint8_t acks_pending;
108
109 /**
105 * Is this the origin or a member? 110 * Is this the origin or a member?
106 */ 111 */
107 uint8_t is_origin; 112 uint8_t is_origin;
@@ -185,6 +190,13 @@ struct GNUNET_MULTICAST_MemberReplayHandle
185}; 190};
186 191
187 192
193static void
194origin_to_all (struct GNUNET_MULTICAST_Origin *orig);
195
196static void
197member_to_origin (struct GNUNET_MULTICAST_Member *mem);
198
199
188/** 200/**
189 * Send first message to the service after connecting. 201 * Send first message to the service after connecting.
190 */ 202 */
@@ -274,6 +286,38 @@ group_recv_message (void *cls,
274 286
275 287
276/** 288/**
289 * Receive message/request fragment acknowledgement from service.
290 */
291static void
292group_recv_fragment_ack (void *cls,
293 struct GNUNET_CLIENT_MANAGER_Connection *client,
294 const struct GNUNET_MessageHeader *msg)
295{
296 struct GNUNET_MULTICAST_Group *
297 grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
298
299 LOG (GNUNET_ERROR_TYPE_DEBUG,
300 "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n",
301 grp, grp->in_transmit, grp->acks_pending);
302
303 if (0 == grp->acks_pending)
304 {
305 LOG (GNUNET_ERROR_TYPE_DEBUG,
306 "%p Ignoring extraneous fragment ACK.\n", grp);
307 return;
308 }
309 grp->acks_pending--;
310
311 if (GNUNET_YES != grp->in_transmit)
312 return;
313
314 if (GNUNET_YES == grp->is_origin)
315 origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp);
316 else
317 member_to_origin ((struct GNUNET_MULTICAST_Member *) grp);
318}
319
320/**
277 * Origin receives uniquest request from a member. 321 * Origin receives uniquest request from a member.
278 */ 322 */
279static void 323static void
@@ -447,6 +491,10 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] =
447 GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 491 GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
448 sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES }, 492 sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES },
449 493
494 { group_recv_fragment_ack, NULL,
495 GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
496 sizeof (struct GNUNET_MessageHeader), GNUNET_YES },
497
450 { group_recv_join_request, NULL, 498 { group_recv_join_request, NULL,
451 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, 499 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
452 sizeof (struct MulticastJoinRequestMessage), GNUNET_YES }, 500 sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
@@ -470,6 +518,10 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler member_handlers[] =
470 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 518 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
471 sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES }, 519 sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
472 520
521 { group_recv_fragment_ack, NULL,
522 GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
523 sizeof (struct GNUNET_MessageHeader), GNUNET_YES },
524
473 { group_recv_join_request, NULL, 525 { group_recv_join_request, NULL,
474 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, 526 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
475 sizeof (struct MulticastJoinRequestMessage), GNUNET_YES }, 527 sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
@@ -577,6 +629,7 @@ GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
577 memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size); 629 memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
578 630
579 GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header); 631 GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header);
632 GNUNET_free (hdcsn);
580 GNUNET_free (join); 633 GNUNET_free (join);
581 return NULL; 634 return NULL;
582} 635}
@@ -774,9 +827,10 @@ GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig,
774static void 827static void
775origin_to_all (struct GNUNET_MULTICAST_Origin *orig) 828origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
776{ 829{
777 LOG (GNUNET_ERROR_TYPE_DEBUG, "origin_to_all()\n"); 830 LOG (GNUNET_ERROR_TYPE_DEBUG, "%p origin_to_all()\n", orig);
778 struct GNUNET_MULTICAST_Group *grp = &orig->grp; 831 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
779 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; 832 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
833 GNUNET_assert (GNUNET_YES == grp->in_transmit);
780 834
781 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; 835 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
782 struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size); 836 struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size);
@@ -786,7 +840,8 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
786 || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size) 840 || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
787 { 841 {
788 LOG (GNUNET_ERROR_TYPE_ERROR, 842 LOG (GNUNET_ERROR_TYPE_ERROR,
789 "OriginTransmitNotify() returned error or invalid message size.\n"); 843 "%p OriginTransmitNotify() returned error or invalid message size.\n",
844 orig);
790 /* FIXME: handle error */ 845 /* FIXME: handle error */
791 GNUNET_free (msg); 846 GNUNET_free (msg);
792 return; 847 return;
@@ -794,6 +849,8 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
794 849
795 if (GNUNET_NO == ret && 0 == buf_size) 850 if (GNUNET_NO == ret && 0 == buf_size)
796 { 851 {
852 LOG (GNUNET_ERROR_TYPE_DEBUG,
853 "%p OriginTransmitNotify() - transmission paused.\n", orig);
797 GNUNET_free (msg); 854 GNUNET_free (msg);
798 return; /* Transmission paused. */ 855 return; /* Transmission paused. */
799 } 856 }
@@ -805,7 +862,12 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
805 msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset); 862 msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
806 tmit->fragment_offset += sizeof (*msg) + buf_size; 863 tmit->fragment_offset += sizeof (*msg) + buf_size;
807 864
865 grp->acks_pending++;
808 GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header); 866 GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header);
867 GNUNET_free (msg);
868
869 if (GNUNET_YES == ret)
870 grp->in_transmit = GNUNET_NO;
809} 871}
810 872
811 873
@@ -834,11 +896,10 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
834 GNUNET_MULTICAST_OriginTransmitNotify notify, 896 GNUNET_MULTICAST_OriginTransmitNotify notify,
835 void *notify_cls) 897 void *notify_cls)
836{ 898{
837/* FIXME 899 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
838 if (GNUNET_YES == orig->grp.in_transmit) 900 if (GNUNET_YES == grp->in_transmit)
839 return NULL; 901 return NULL;
840 orig->grp.in_transmit = GNUNET_YES; 902 grp->in_transmit = GNUNET_YES;
841*/
842 903
843 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; 904 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
844 tmit->origin = orig; 905 tmit->origin = orig;
@@ -861,6 +922,9 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
861void 922void
862GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th) 923GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
863{ 924{
925 struct GNUNET_MULTICAST_Group *grp = &th->origin->grp;
926 if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
927 return;
864 origin_to_all (th->origin); 928 origin_to_all (th->origin);
865} 929}
866 930
@@ -874,6 +938,7 @@ GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHan
874void 938void
875GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th) 939GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
876{ 940{
941 th->origin->grp.in_transmit = GNUNET_NO;
877} 942}
878 943
879 944
@@ -1094,6 +1159,7 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem)
1094 LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n"); 1159 LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n");
1095 struct GNUNET_MULTICAST_Group *grp = &mem->grp; 1160 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1096 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; 1161 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1162 GNUNET_assert (GNUNET_YES == grp->in_transmit);
1097 1163
1098 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; 1164 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
1099 struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size); 1165 struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size);
@@ -1124,6 +1190,10 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem)
1124 tmit->fragment_offset += sizeof (*req) + buf_size; 1190 tmit->fragment_offset += sizeof (*req) + buf_size;
1125 1191
1126 GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header); 1192 GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header);
1193 GNUNET_free (req);
1194
1195 if (GNUNET_YES == ret)
1196 grp->in_transmit = GNUNET_NO;
1127} 1197}
1128 1198
1129 1199
@@ -1147,11 +1217,9 @@ GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
1147 GNUNET_MULTICAST_MemberTransmitNotify notify, 1217 GNUNET_MULTICAST_MemberTransmitNotify notify,
1148 void *notify_cls) 1218 void *notify_cls)
1149{ 1219{
1150/* FIXME
1151 if (GNUNET_YES == mem->grp.in_transmit) 1220 if (GNUNET_YES == mem->grp.in_transmit)
1152 return NULL; 1221 return NULL;
1153 mem->grp.in_transmit = GNUNET_YES; 1222 mem->grp.in_transmit = GNUNET_YES;
1154*/
1155 1223
1156 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; 1224 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1157 tmit->member = mem; 1225 tmit->member = mem;
@@ -1173,6 +1241,9 @@ GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
1173void 1241void
1174GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th) 1242GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
1175{ 1243{
1244 struct GNUNET_MULTICAST_Group *grp = &th->member->grp;
1245 if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
1246 return;
1176 member_to_origin (th->member); 1247 member_to_origin (th->member);
1177} 1248}
1178 1249
@@ -1186,6 +1257,7 @@ GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmit
1186void 1257void
1187GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th) 1258GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
1188{ 1259{
1260 th->member->grp.in_transmit = GNUNET_NO;
1189} 1261}
1190 1262
1191 1263
diff --git a/src/multicast/test_multicast.c b/src/multicast/test_multicast.c
index a5946f3fd..f35c2885a 100644
--- a/src/multicast/test_multicast.c
+++ b/src/multicast/test_multicast.c
@@ -183,7 +183,7 @@ tmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
183 struct TransmitClosure *tmit = cls; 183 struct TransmitClosure *tmit = cls;
184 if (NULL != tmit->orig_tmit) 184 if (NULL != tmit->orig_tmit)
185 GNUNET_MULTICAST_origin_to_all_resume (tmit->orig_tmit); 185 GNUNET_MULTICAST_origin_to_all_resume (tmit->orig_tmit);
186 else 186 else if (NULL != tmit->mem_tmit)
187 GNUNET_MULTICAST_member_to_origin_resume (tmit->mem_tmit); 187 GNUNET_MULTICAST_member_to_origin_resume (tmit->mem_tmit);
188} 188}
189 189
@@ -453,14 +453,15 @@ member_to_origin ()
453 *tmit = (struct TransmitClosure) {}; 453 *tmit = (struct TransmitClosure) {};
454 tmit->data[0] = "abc def"; 454 tmit->data[0] = "abc def";
455 tmit->data[1] = "ghi jkl mno"; 455 tmit->data[1] = "ghi jkl mno";
456 tmit->data_delay[1] = 1; 456 tmit->data_delay[1] = 2;
457 tmit->data[2] = "pqr stuw xyz"; 457 tmit->data[2] = "pqr stuw xyz";
458 tmit->data_count = 3; 458 tmit->data_count = 3;
459 459
460 origin_cls.n = 0; 460 origin_cls.n = 0;
461 origin_cls.msgs_expected = 1; 461 origin_cls.msgs_expected = 1;
462 462
463 GNUNET_MULTICAST_member_to_origin (member, 1, tmit_notify, tmit); 463 tmit->mem_tmit = GNUNET_MULTICAST_member_to_origin (member, 1,
464 tmit_notify, tmit);
464} 465}
465 466
466 467
@@ -533,15 +534,19 @@ origin_to_all ()
533 struct TransmitClosure *tmit = &tmit_cls; 534 struct TransmitClosure *tmit = &tmit_cls;
534 *tmit = (struct TransmitClosure) {}; 535 *tmit = (struct TransmitClosure) {};
535 tmit->data[0] = "ABC DEF"; 536 tmit->data[0] = "ABC DEF";
536 tmit->data[1] = "GHI JKL MNO"; 537 tmit->data[1] = GNUNET_malloc (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD + 1);
537 tmit->data_delay[1] = 1; 538 for (uint16_t i = 0; i < GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD; i++)
538 tmit->data[2] = "PQR STUW XYZ"; 539 tmit->data[1][i] = (0 == i % 10000) ? '0' + i / 10000 : '_';
539 tmit->data_count = 3; 540 tmit->data[2] = "GHI JKL MNO";
541 tmit->data_delay[2] = 2;
542 tmit->data[3] = "PQR STUW XYZ";
543 tmit->data_count = 4;
540 544
541 origin_cls.n = member_cls.n = 0; 545 origin_cls.n = member_cls.n = 0;
542 origin_cls.msgs_expected = member_cls.msgs_expected = 1; 546 origin_cls.msgs_expected = member_cls.msgs_expected = tmit->data_count;
543 547
544 GNUNET_MULTICAST_origin_to_all (origin, 1, 1, tmit_notify, tmit); 548 tmit->orig_tmit = GNUNET_MULTICAST_origin_to_all (origin, 1, 1,
549 tmit_notify, tmit);
545} 550}
546 551
547 552