aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2015-09-27 21:04:34 +0000
committerGabor X Toth <*@tg-x.net>2015-09-27 21:04:34 +0000
commitcab1b047ddcac497e14515fc11f097c4aac8ee27 (patch)
tree7f4e14a8c77d76bef07cb4bbf6b94adcce44d53c
parent51f530b98232f7a9947f29008d161ed0d8e23af4 (diff)
downloadgnunet-cab1b047ddcac497e14515fc11f097c4aac8ee27.tar.gz
gnunet-cab1b047ddcac497e14515fc11f097c4aac8ee27.zip
multicast/psyc/social: message acks & scheduling
-rw-r--r--src/include/gnunet_protocols.h11
-rw-r--r--src/multicast/gnunet-service-multicast.c77
-rw-r--r--src/multicast/multicast_api.c88
-rw-r--r--src/multicast/test_multicast.c23
-rw-r--r--src/psyc/gnunet-service-psyc.c82
-rw-r--r--src/psyc/test_psyc.c47
-rw-r--r--src/psycstore/psyc_util_lib.c41
-rw-r--r--src/social/social_api.c12
-rw-r--r--src/util/client_manager.c2
9 files changed, 279 insertions, 104 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 052bd5d03..a06c8ad67 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -2439,19 +2439,24 @@ extern "C"
2439#define GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST 758 2439#define GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST 758
2440 2440
2441/** 2441/**
2442 * C->S: Acknowledgement of a message or request fragment for the client.
2443 */
2444#define GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK 759
2445
2446/**
2442 * C<->S<->T: Replay request from a group member to another member. 2447 * C<->S<->T: Replay request from a group member to another member.
2443 */ 2448 */
2444#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST 759 2449#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST 760
2445 2450
2446/** 2451/**
2447 * C<->S<->T: Replay response from a group member to another member. 2452 * C<->S<->T: Replay response from a group member to another member.
2448 */ 2453 */
2449#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE 763 2454#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE 761
2450 2455
2451/** 2456/**
2452 * C<->S: End of replay response. 2457 * C<->S: End of replay response.
2453 */ 2458 */
2454#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END 764 2459#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END 762
2455 2460
2456 2461
2457 2462
diff --git a/src/multicast/gnunet-service-multicast.c b/src/multicast/gnunet-service-multicast.c
index 9ebfa66da..4d2868669 100644
--- a/src/multicast/gnunet-service-multicast.c
+++ b/src/multicast/gnunet-service-multicast.c
@@ -181,6 +181,11 @@ struct Channel
181 int8_t join_status; 181 int8_t join_status;
182 182
183 /** 183 /**
184 * Number of messages waiting to be sent to CADET.
185 */
186 uint8_t msgs_pending;
187
188 /**
184 * Channel direction. 189 * Channel direction.
185 * @see enum ChannelDirection 190 * @see enum ChannelDirection
186 */ 191 */
@@ -619,8 +624,10 @@ client_send_member_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
619/** 624/**
620 * Send message to all origin and member clients connected to the group. 625 * Send message to all origin and member clients connected to the group.
621 * 626 *
622 * @param grp The group to send @a msg to. 627 * @param pub_key_hash
623 * @param msg Message to send. 628 * H(key_pub) of the group.
629 * @param msg
630 * Message to send.
624 */ 631 */
625static int 632static int
626client_send_all (struct GNUNET_HashCode *pub_key_hash, 633client_send_all (struct GNUNET_HashCode *pub_key_hash,
@@ -660,8 +667,10 @@ client_send_random (struct GNUNET_HashCode *pub_key_hash,
660/** 667/**
661 * Send message to all origin clients connected to the group. 668 * Send message to all origin clients connected to the group.
662 * 669 *
663 * @param grp The group to send @a msg to. 670 * @param pub_key_hash
664 * @param msg Message to send. 671 * H(key_pub) of the group.
672 * @param msg
673 * Message to send.
665 */ 674 */
666static int 675static int
667client_send_origin (struct GNUNET_HashCode *pub_key_hash, 676client_send_origin (struct GNUNET_HashCode *pub_key_hash,
@@ -676,6 +685,33 @@ client_send_origin (struct GNUNET_HashCode *pub_key_hash,
676 685
677 686
678/** 687/**
688 * Send fragment acknowledgement to all clients of the channel.
689 *
690 * @param pub_key_hash
691 * H(key_pub) of the group.
692 */
693static void
694client_send_ack (struct GNUNET_HashCode *pub_key_hash)
695{
696 static struct GNUNET_MessageHeader *msg = NULL;
697 if (NULL == msg)
698 {
699 msg = GNUNET_malloc (sizeof (*msg));
700 msg->type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK);
701 msg->size = htons (sizeof (*msg));
702 }
703 client_send_all (pub_key_hash, msg);
704}
705
706
707struct CadetTransmitClosure
708{
709 struct Channel *chn;
710 const struct GNUNET_MessageHeader *msg;
711};
712
713
714/**
679 * CADET is ready to transmit a message. 715 * CADET is ready to transmit a message.
680 */ 716 */
681size_t 717size_t
@@ -686,10 +722,21 @@ cadet_notify_transmit_ready (void *cls, size_t buf_size, void *buf)
686 /* FIXME: connection closed */ 722 /* FIXME: connection closed */
687 return 0; 723 return 0;
688 } 724 }
689 const struct GNUNET_MessageHeader *msg = cls; 725 struct CadetTransmitClosure *tcls = cls;
690 uint16_t msg_size = ntohs (msg->size); 726 struct Channel *chn = tcls->chn;
727 uint16_t msg_size = ntohs (tcls->msg->size);
691 GNUNET_assert (msg_size <= buf_size); 728 GNUNET_assert (msg_size <= buf_size);
692 memcpy (buf, msg, msg_size); 729 memcpy (buf, tcls->msg, msg_size);
730 GNUNET_free (tcls);
731
732 if (0 == chn->msgs_pending)
733 {
734 GNUNET_break (0);
735 }
736 else if (0 == --chn->msgs_pending)
737 {
738 client_send_ack (&chn->group_key_hash);
739 }
693 return msg_size; 740 return msg_size;
694} 741}
695 742
@@ -703,6 +750,11 @@ cadet_notify_transmit_ready (void *cls, size_t buf_size, void *buf)
703static void 750static void
704cadet_send_channel (struct Channel *chn, const struct GNUNET_MessageHeader *msg) 751cadet_send_channel (struct Channel *chn, const struct GNUNET_MessageHeader *msg)
705{ 752{
753 struct CadetTransmitClosure *tcls = GNUNET_malloc (sizeof (*tcls));
754 tcls->chn = chn;
755 tcls->msg = msg;
756
757 chn->msgs_pending++;
706 chn->tmit_handle 758 chn->tmit_handle
707 = GNUNET_CADET_notify_transmit_ready (chn->channel, GNUNET_NO, 759 = GNUNET_CADET_notify_transmit_ready (chn->channel, GNUNET_NO,
708 GNUNET_TIME_UNIT_FOREVER_REL, 760 GNUNET_TIME_UNIT_FOREVER_REL,
@@ -1132,7 +1184,10 @@ client_recv_multicast_message (void *cls, struct GNUNET_SERVER_Client *client,
1132 } 1184 }
1133 1185
1134 client_send_all (&grp->pub_key_hash, &out->header); 1186 client_send_all (&grp->pub_key_hash, &out->header);
1135 cadet_send_children (&grp->pub_key_hash, &out->header); 1187 if (0 == cadet_send_children (&grp->pub_key_hash, &out->header))
1188 {
1189 client_send_ack (&grp->pub_key_hash);
1190 }
1136 GNUNET_free (out); 1191 GNUNET_free (out);
1137 1192
1138 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1193 GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -1174,11 +1229,13 @@ client_recv_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
1174 GNUNET_assert (0); 1229 GNUNET_assert (0);
1175 } 1230 }
1176 1231
1232 uint8_t send_ack = GNUNET_YES;
1177 if (0 == client_send_origin (&grp->pub_key_hash, &out->header)) 1233 if (0 == client_send_origin (&grp->pub_key_hash, &out->header))
1178 { /* No local origins, send to remote origin */ 1234 { /* No local origins, send to remote origin */
1179 if (NULL != mem->origin_channel) 1235 if (NULL != mem->origin_channel)
1180 { 1236 {
1181 cadet_send_channel (mem->origin_channel, &out->header); 1237 cadet_send_channel (mem->origin_channel, &out->header);
1238 send_ack = GNUNET_NO;
1182 } 1239 }
1183 else 1240 else
1184 { 1241 {
@@ -1188,6 +1245,10 @@ client_recv_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
1188 return; 1245 return;
1189 } 1246 }
1190 } 1247 }
1248 if (GNUNET_YES == send_ack)
1249 {
1250 client_send_ack (&grp->pub_key_hash);
1251 }
1191 GNUNET_free (out); 1252 GNUNET_free (out);
1192 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1253 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1193} 1254}
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c
index 117a0efe2..774a8bf70 100644
--- a/src/multicast/multicast_api.c
+++ b/src/multicast/multicast_api.c
@@ -102,6 +102,11 @@ struct GNUNET_MULTICAST_Group
102 uint8_t in_transmit; 102 uint8_t in_transmit;
103 103
104 /** 104 /**
105 * Number of MULTICAST_FRAGMENT_ACK messages we are still waiting for.
106 */
107 uint8_t acks_pending;
108
109 /**
105 * Is this the origin or a member? 110 * Is this the origin or a member?
106 */ 111 */
107 uint8_t is_origin; 112 uint8_t is_origin;
@@ -185,6 +190,13 @@ struct GNUNET_MULTICAST_MemberReplayHandle
185}; 190};
186 191
187 192
193static void
194origin_to_all (struct GNUNET_MULTICAST_Origin *orig);
195
196static void
197member_to_origin (struct GNUNET_MULTICAST_Member *mem);
198
199
188/** 200/**
189 * Send first message to the service after connecting. 201 * Send first message to the service after connecting.
190 */ 202 */
@@ -274,6 +286,38 @@ group_recv_message (void *cls,
274 286
275 287
276/** 288/**
289 * Receive message/request fragment acknowledgement from service.
290 */
291static void
292group_recv_fragment_ack (void *cls,
293 struct GNUNET_CLIENT_MANAGER_Connection *client,
294 const struct GNUNET_MessageHeader *msg)
295{
296 struct GNUNET_MULTICAST_Group *
297 grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
298
299 LOG (GNUNET_ERROR_TYPE_DEBUG,
300 "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n",
301 grp, grp->in_transmit, grp->acks_pending);
302
303 if (0 == grp->acks_pending)
304 {
305 LOG (GNUNET_ERROR_TYPE_DEBUG,
306 "%p Ignoring extraneous fragment ACK.\n", grp);
307 return;
308 }
309 grp->acks_pending--;
310
311 if (GNUNET_YES != grp->in_transmit)
312 return;
313
314 if (GNUNET_YES == grp->is_origin)
315 origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp);
316 else
317 member_to_origin ((struct GNUNET_MULTICAST_Member *) grp);
318}
319
320/**
277 * Origin receives uniquest request from a member. 321 * Origin receives uniquest request from a member.
278 */ 322 */
279static void 323static void
@@ -447,6 +491,10 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] =
447 GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 491 GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
448 sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES }, 492 sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES },
449 493
494 { group_recv_fragment_ack, NULL,
495 GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
496 sizeof (struct GNUNET_MessageHeader), GNUNET_YES },
497
450 { group_recv_join_request, NULL, 498 { group_recv_join_request, NULL,
451 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, 499 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
452 sizeof (struct MulticastJoinRequestMessage), GNUNET_YES }, 500 sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
@@ -470,6 +518,10 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler member_handlers[] =
470 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 518 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
471 sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES }, 519 sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
472 520
521 { group_recv_fragment_ack, NULL,
522 GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
523 sizeof (struct GNUNET_MessageHeader), GNUNET_YES },
524
473 { group_recv_join_request, NULL, 525 { group_recv_join_request, NULL,
474 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, 526 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
475 sizeof (struct MulticastJoinRequestMessage), GNUNET_YES }, 527 sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
@@ -577,6 +629,7 @@ GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
577 memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size); 629 memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
578 630
579 GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header); 631 GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header);
632 GNUNET_free (hdcsn);
580 GNUNET_free (join); 633 GNUNET_free (join);
581 return NULL; 634 return NULL;
582} 635}
@@ -774,9 +827,10 @@ GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig,
774static void 827static void
775origin_to_all (struct GNUNET_MULTICAST_Origin *orig) 828origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
776{ 829{
777 LOG (GNUNET_ERROR_TYPE_DEBUG, "origin_to_all()\n"); 830 LOG (GNUNET_ERROR_TYPE_DEBUG, "%p origin_to_all()\n", orig);
778 struct GNUNET_MULTICAST_Group *grp = &orig->grp; 831 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
779 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; 832 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
833 GNUNET_assert (GNUNET_YES == grp->in_transmit);
780 834
781 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; 835 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
782 struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size); 836 struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size);
@@ -786,7 +840,8 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
786 || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size) 840 || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
787 { 841 {
788 LOG (GNUNET_ERROR_TYPE_ERROR, 842 LOG (GNUNET_ERROR_TYPE_ERROR,
789 "OriginTransmitNotify() returned error or invalid message size.\n"); 843 "%p OriginTransmitNotify() returned error or invalid message size.\n",
844 orig);
790 /* FIXME: handle error */ 845 /* FIXME: handle error */
791 GNUNET_free (msg); 846 GNUNET_free (msg);
792 return; 847 return;
@@ -794,6 +849,8 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
794 849
795 if (GNUNET_NO == ret && 0 == buf_size) 850 if (GNUNET_NO == ret && 0 == buf_size)
796 { 851 {
852 LOG (GNUNET_ERROR_TYPE_DEBUG,
853 "%p OriginTransmitNotify() - transmission paused.\n", orig);
797 GNUNET_free (msg); 854 GNUNET_free (msg);
798 return; /* Transmission paused. */ 855 return; /* Transmission paused. */
799 } 856 }
@@ -805,7 +862,12 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
805 msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset); 862 msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
806 tmit->fragment_offset += sizeof (*msg) + buf_size; 863 tmit->fragment_offset += sizeof (*msg) + buf_size;
807 864
865 grp->acks_pending++;
808 GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header); 866 GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header);
867 GNUNET_free (msg);
868
869 if (GNUNET_YES == ret)
870 grp->in_transmit = GNUNET_NO;
809} 871}
810 872
811 873
@@ -834,11 +896,10 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
834 GNUNET_MULTICAST_OriginTransmitNotify notify, 896 GNUNET_MULTICAST_OriginTransmitNotify notify,
835 void *notify_cls) 897 void *notify_cls)
836{ 898{
837/* FIXME 899 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
838 if (GNUNET_YES == orig->grp.in_transmit) 900 if (GNUNET_YES == grp->in_transmit)
839 return NULL; 901 return NULL;
840 orig->grp.in_transmit = GNUNET_YES; 902 grp->in_transmit = GNUNET_YES;
841*/
842 903
843 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; 904 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
844 tmit->origin = orig; 905 tmit->origin = orig;
@@ -861,6 +922,9 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
861void 922void
862GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th) 923GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
863{ 924{
925 struct GNUNET_MULTICAST_Group *grp = &th->origin->grp;
926 if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
927 return;
864 origin_to_all (th->origin); 928 origin_to_all (th->origin);
865} 929}
866 930
@@ -874,6 +938,7 @@ GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHan
874void 938void
875GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th) 939GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
876{ 940{
941 th->origin->grp.in_transmit = GNUNET_NO;
877} 942}
878 943
879 944
@@ -1094,6 +1159,7 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem)
1094 LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n"); 1159 LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n");
1095 struct GNUNET_MULTICAST_Group *grp = &mem->grp; 1160 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1096 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; 1161 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1162 GNUNET_assert (GNUNET_YES == grp->in_transmit);
1097 1163
1098 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; 1164 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
1099 struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size); 1165 struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size);
@@ -1124,6 +1190,10 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem)
1124 tmit->fragment_offset += sizeof (*req) + buf_size; 1190 tmit->fragment_offset += sizeof (*req) + buf_size;
1125 1191
1126 GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header); 1192 GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header);
1193 GNUNET_free (req);
1194
1195 if (GNUNET_YES == ret)
1196 grp->in_transmit = GNUNET_NO;
1127} 1197}
1128 1198
1129 1199
@@ -1147,11 +1217,9 @@ GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
1147 GNUNET_MULTICAST_MemberTransmitNotify notify, 1217 GNUNET_MULTICAST_MemberTransmitNotify notify,
1148 void *notify_cls) 1218 void *notify_cls)
1149{ 1219{
1150/* FIXME
1151 if (GNUNET_YES == mem->grp.in_transmit) 1220 if (GNUNET_YES == mem->grp.in_transmit)
1152 return NULL; 1221 return NULL;
1153 mem->grp.in_transmit = GNUNET_YES; 1222 mem->grp.in_transmit = GNUNET_YES;
1154*/
1155 1223
1156 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; 1224 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1157 tmit->member = mem; 1225 tmit->member = mem;
@@ -1173,6 +1241,9 @@ GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
1173void 1241void
1174GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th) 1242GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
1175{ 1243{
1244 struct GNUNET_MULTICAST_Group *grp = &th->member->grp;
1245 if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
1246 return;
1176 member_to_origin (th->member); 1247 member_to_origin (th->member);
1177} 1248}
1178 1249
@@ -1186,6 +1257,7 @@ GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmit
1186void 1257void
1187GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th) 1258GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
1188{ 1259{
1260 th->member->grp.in_transmit = GNUNET_NO;
1189} 1261}
1190 1262
1191 1263
diff --git a/src/multicast/test_multicast.c b/src/multicast/test_multicast.c
index a5946f3fd..f35c2885a 100644
--- a/src/multicast/test_multicast.c
+++ b/src/multicast/test_multicast.c
@@ -183,7 +183,7 @@ tmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
183 struct TransmitClosure *tmit = cls; 183 struct TransmitClosure *tmit = cls;
184 if (NULL != tmit->orig_tmit) 184 if (NULL != tmit->orig_tmit)
185 GNUNET_MULTICAST_origin_to_all_resume (tmit->orig_tmit); 185 GNUNET_MULTICAST_origin_to_all_resume (tmit->orig_tmit);
186 else 186 else if (NULL != tmit->mem_tmit)
187 GNUNET_MULTICAST_member_to_origin_resume (tmit->mem_tmit); 187 GNUNET_MULTICAST_member_to_origin_resume (tmit->mem_tmit);
188} 188}
189 189
@@ -453,14 +453,15 @@ member_to_origin ()
453 *tmit = (struct TransmitClosure) {}; 453 *tmit = (struct TransmitClosure) {};
454 tmit->data[0] = "abc def"; 454 tmit->data[0] = "abc def";
455 tmit->data[1] = "ghi jkl mno"; 455 tmit->data[1] = "ghi jkl mno";
456 tmit->data_delay[1] = 1; 456 tmit->data_delay[1] = 2;
457 tmit->data[2] = "pqr stuw xyz"; 457 tmit->data[2] = "pqr stuw xyz";
458 tmit->data_count = 3; 458 tmit->data_count = 3;
459 459
460 origin_cls.n = 0; 460 origin_cls.n = 0;
461 origin_cls.msgs_expected = 1; 461 origin_cls.msgs_expected = 1;
462 462
463 GNUNET_MULTICAST_member_to_origin (member, 1, tmit_notify, tmit); 463 tmit->mem_tmit = GNUNET_MULTICAST_member_to_origin (member, 1,
464 tmit_notify, tmit);
464} 465}
465 466
466 467
@@ -533,15 +534,19 @@ origin_to_all ()
533 struct TransmitClosure *tmit = &tmit_cls; 534 struct TransmitClosure *tmit = &tmit_cls;
534 *tmit = (struct TransmitClosure) {}; 535 *tmit = (struct TransmitClosure) {};
535 tmit->data[0] = "ABC DEF"; 536 tmit->data[0] = "ABC DEF";
536 tmit->data[1] = "GHI JKL MNO"; 537 tmit->data[1] = GNUNET_malloc (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD + 1);
537 tmit->data_delay[1] = 1; 538 for (uint16_t i = 0; i < GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD; i++)
538 tmit->data[2] = "PQR STUW XYZ"; 539 tmit->data[1][i] = (0 == i % 10000) ? '0' + i / 10000 : '_';
539 tmit->data_count = 3; 540 tmit->data[2] = "GHI JKL MNO";
541 tmit->data_delay[2] = 2;
542 tmit->data[3] = "PQR STUW XYZ";
543 tmit->data_count = 4;
540 544
541 origin_cls.n = member_cls.n = 0; 545 origin_cls.n = member_cls.n = 0;
542 origin_cls.msgs_expected = member_cls.msgs_expected = 1; 546 origin_cls.msgs_expected = member_cls.msgs_expected = tmit->data_count;
543 547
544 GNUNET_MULTICAST_origin_to_all (origin, 1, 1, tmit_notify, tmit); 548 tmit->orig_tmit = GNUNET_MULTICAST_origin_to_all (origin, 1, 1,
549 tmit_notify, tmit);
545} 550}
546 551
547 552
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index ceb0ca95b..e710b41a5 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -107,17 +107,6 @@ struct TransmitMessage
107 */ 107 */
108 uint16_t last_ptype; 108 uint16_t last_ptype;
109 109
110 /**
111 * @see enum MessageState
112 */
113 uint8_t state;
114
115 /**
116 * Whether a message ACK has already been sent to the client.
117 * #GNUNET_YES or #GNUNET_NO
118 */
119 uint8_t ack_sent;
120
121 /* Followed by message */ 110 /* Followed by message */
122}; 111};
123 112
@@ -281,11 +270,6 @@ struct Channel
281 uint32_t tmit_mod_value_size; 270 uint32_t tmit_mod_value_size;
282 271
283 /** 272 /**
284 * @see enum MessageState
285 */
286 uint8_t tmit_state;
287
288 /**
289 * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)? 273 * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
290 */ 274 */
291 uint8_t is_master; 275 uint8_t is_master;
@@ -438,6 +422,15 @@ static uint64_t
438message_queue_drop (struct Channel *chn); 422message_queue_drop (struct Channel *chn);
439 423
440 424
425static void
426schedule_transmit_message (void *cls,
427 const struct GNUNET_SCHEDULER_TaskContext *tc)
428{
429 struct Channel *chn = cls;
430 transmit_message (chn);
431}
432
433
441/** 434/**
442 * Task run during shutdown. 435 * Task run during shutdown.
443 * 436 *
@@ -1145,8 +1138,8 @@ fragment_queue_insert (struct Channel *chn,
1145 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1138 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1146 "%p Header of message %" PRIu64 " is NOT complete yet: " 1139 "%p Header of message %" PRIu64 " is NOT complete yet: "
1147 "%" PRIu64 " != %" PRIu64 "\n", 1140 "%" PRIu64 " != %" PRIu64 "\n",
1148 chn, GNUNET_ntohll (mmsg->message_id), frag_offset, 1141 chn, GNUNET_ntohll (mmsg->message_id),
1149 fragq->header_size); 1142 frag_offset, fragq->header_size);
1150 } 1143 }
1151 } 1144 }
1152 1145
@@ -1159,8 +1152,8 @@ fragment_queue_insert (struct Channel *chn,
1159 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1152 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1160 "%p Message %" PRIu64 " is NOT complete yet: " 1153 "%p Message %" PRIu64 " is NOT complete yet: "
1161 "%" PRIu64 " != %" PRIu64 "\n", 1154 "%" PRIu64 " != %" PRIu64 "\n",
1162 chn, GNUNET_ntohll (mmsg->message_id), frag_offset, 1155 chn, GNUNET_ntohll (mmsg->message_id),
1163 fragq->size); 1156 frag_offset, fragq->size);
1164 break; 1157 break;
1165 1158
1166 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: 1159 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
@@ -1486,17 +1479,26 @@ mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg
1486 uint16_t size = ntohs (mmsg->header.size); 1479 uint16_t size = ntohs (mmsg->header.size);
1487 1480
1488 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1481 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1489 "%p Received multicast message of size %u.\n", 1482 "%p Received multicast message of size %u. "
1490 chn, size); 1483 "fragment_id=%" PRIu64 ", message_id=%" PRIu64
1484 ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
1485 chn, size,
1486 GNUNET_ntohll (mmsg->fragment_id),
1487 GNUNET_ntohll (mmsg->message_id),
1488 GNUNET_ntohll (mmsg->fragment_offset),
1489 GNUNET_ntohll (mmsg->flags));
1491 1490
1492 GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0, 1491 GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1493 &store_recv_fragment_store_result, chn); 1492 &store_recv_fragment_store_result, chn);
1494 1493
1495 uint16_t first_ptype = 0, last_ptype = 0; 1494 uint16_t first_ptype = 0, last_ptype = 0;
1496 if (GNUNET_SYSERR 1495 int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1497 == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg), 1496 (const char *) &mmsg[1],
1498 (const char *) &mmsg[1], 1497 &first_ptype, &last_ptype);
1499 &first_ptype, &last_ptype)) 1498 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1499 "%p Message check result %d, first part type %u, last part type %u\n",
1500 chn, check, first_ptype, last_ptype);
1501 if (GNUNET_SYSERR == check)
1500 { 1502 {
1501 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1503 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1502 "%p Dropping incoming multicast message with invalid parts.\n", 1504 "%p Dropping incoming multicast message with invalid parts.\n",
@@ -1505,10 +1507,6 @@ mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg
1505 return; 1507 return;
1506 } 1508 }
1507 1509
1508 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1509 "Message parts: first: type %u, last: type %u\n",
1510 first_ptype, last_ptype);
1511
1512 fragment_queue_insert (chn, mmsg, first_ptype, last_ptype); 1510 fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1513 message_queue_run (chn); 1511 message_queue_run (chn);
1514} 1512}
@@ -1965,6 +1963,8 @@ transmit_notify (void *cls, size_t *data_size, void *data)
1965 { 1963 {
1966 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1964 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1967 "%p transmit_notify: nothing to send.\n", chn); 1965 "%p transmit_notify: nothing to send.\n", chn);
1966 if (NULL != tmit_msg && *data_size < tmit_msg->size)
1967 GNUNET_break (0);
1968 *data_size = 0; 1968 *data_size = 0;
1969 return GNUNET_NO; 1969 return GNUNET_NO;
1970 } 1970 }
@@ -1975,9 +1975,13 @@ transmit_notify (void *cls, size_t *data_size, void *data)
1975 *data_size = tmit_msg->size; 1975 *data_size = tmit_msg->size;
1976 memcpy (data, &tmit_msg[1], *data_size); 1976 memcpy (data, &tmit_msg[1], *data_size);
1977 1977
1978 int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES; 1978 int ret
1979 = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
1980 ? GNUNET_NO
1981 : GNUNET_YES;
1979 1982
1980 if (NULL != tmit_msg->client && GNUNET_NO == tmit_msg->ack_sent) 1983 /* FIXME: handle disconnecting clients */
1984 if (NULL != tmit_msg->client)
1981 send_message_ack (chn, tmit_msg->client); 1985 send_message_ack (chn, tmit_msg->client);
1982 1986
1983 GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg); 1987 GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
@@ -1985,7 +1989,7 @@ transmit_notify (void *cls, size_t *data_size, void *data)
1985 1989
1986 if (NULL != chn->tmit_head) 1990 if (NULL != chn->tmit_head)
1987 { 1991 {
1988 transmit_message (chn); 1992 GNUNET_SCHEDULER_add_now (schedule_transmit_message, chn);
1989 } 1993 }
1990 else if (GNUNET_YES == chn->is_disconnected 1994 else if (GNUNET_YES == chn->is_disconnected
1991 && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END) 1995 && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
@@ -2037,10 +2041,12 @@ slave_transmit_notify (void *cls, size_t *data_size, void *data)
2037static void 2041static void
2038master_transmit_message (struct Master *mst) 2042master_transmit_message (struct Master *mst)
2039{ 2043{
2044 if (NULL == mst->chn.tmit_head)
2045 return;
2040 if (NULL == mst->tmit_handle) 2046 if (NULL == mst->tmit_handle)
2041 { 2047 {
2042 mst->tmit_handle 2048 mst->tmit_handle
2043 = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id, 2049 = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->chn.tmit_head->id,
2044 mst->max_group_generation, 2050 mst->max_group_generation,
2045 master_transmit_notify, mst); 2051 master_transmit_notify, mst);
2046 } 2052 }
@@ -2057,10 +2063,12 @@ master_transmit_message (struct Master *mst)
2057static void 2063static void
2058slave_transmit_message (struct Slave *slv) 2064slave_transmit_message (struct Slave *slv)
2059{ 2065{
2066 if (NULL == slv->chn.tmit_head)
2067 return;
2060 if (NULL == slv->tmit_handle) 2068 if (NULL == slv->tmit_handle)
2061 { 2069 {
2062 slv->tmit_handle 2070 slv->tmit_handle
2063 = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id, 2071 = GNUNET_MULTICAST_member_to_origin (slv->member, slv->chn.tmit_head->id,
2064 slave_transmit_notify, slv); 2072 slave_transmit_notify, slv);
2065 } 2073 }
2066 else 2074 else
@@ -2090,6 +2098,9 @@ master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
2090 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype) 2098 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2091 { 2099 {
2092 tmit_msg->id = ++mst->max_message_id; 2100 tmit_msg->id = ++mst->max_message_id;
2101 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2102 "%p master_queue_message: message_id=%" PRIu64 "\n",
2103 mst, tmit_msg->id);
2093 struct GNUNET_PSYC_MessageMethod *pmeth 2104 struct GNUNET_PSYC_MessageMethod *pmeth
2094 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1]; 2105 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2095 2106
@@ -2159,7 +2170,6 @@ queue_message (struct Channel *chn,
2159 memcpy (&tmit_msg[1], data, data_size); 2170 memcpy (&tmit_msg[1], data, data_size);
2160 tmit_msg->client = client; 2171 tmit_msg->client = client;
2161 tmit_msg->size = data_size; 2172 tmit_msg->size = data_size;
2162 tmit_msg->state = chn->tmit_state;
2163 tmit_msg->first_ptype = first_ptype; 2173 tmit_msg->first_ptype = first_ptype;
2164 tmit_msg->last_ptype = last_ptype; 2174 tmit_msg->last_ptype = last_ptype;
2165 2175
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index e2e6cfc87..1ce9074d5 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -225,7 +225,8 @@ master_message_part_cb (void *cls,
225 if (NULL == msg) 225 if (NULL == msg)
226 { 226 {
227 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 227 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
228 "Error while receiving message %" PRIu64 "\n", message_id); 228 "Test #%d: Error while master is receiving part of message #%" PRIu64 ".\n",
229 test, message_id);
229 return; 230 return;
230 } 231 }
231 232
@@ -243,7 +244,8 @@ master_message_part_cb (void *cls,
243 if (GNUNET_PSYC_MESSAGE_REQUEST != flags) 244 if (GNUNET_PSYC_MESSAGE_REQUEST != flags)
244 { 245 {
245 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 246 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
246 "Unexpected request flags: %x" PRIu32 "\n", flags); 247 "Test #%d: Unexpected request flags: %x" PRIu32 "\n",
248 test, flags);
247 GNUNET_assert (0); 249 GNUNET_assert (0);
248 return; 250 return;
249 } 251 }
@@ -297,7 +299,8 @@ slave_message_part_cb (void *cls,
297 if (NULL == msg) 299 if (NULL == msg)
298 { 300 {
299 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 301 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
300 "Error while receiving message " PRIu64 "\n", message_id); 302 "Test #%d: Error while slave is receiving part of message #%" PRIu64 ".\n",
303 test, message_id);
301 return; 304 return;
302 } 305 }
303 306
@@ -322,7 +325,7 @@ slave_message_part_cb (void *cls,
322 { 325 {
323 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 326 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
324 "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n", 327 "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n",
325 flags); 328 test, flags);
326 GNUNET_assert (0); 329 GNUNET_assert (0);
327 return; 330 return;
328 } 331 }
@@ -575,9 +578,9 @@ tmit_notify_data (void *cls, uint16_t *data_size, void *data)
575 578
576 uint16_t size = strlen (tmit->data[tmit->n]); 579 uint16_t size = strlen (tmit->data[tmit->n]);
577 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 580 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
578 "Transmit notify data: %u bytes available, " 581 "Test #%d: Transmit notify data: %u bytes available, "
579 "processing fragment %u/%u (size %u).\n", 582 "processing fragment %u/%u (size %u).\n",
580 *data_size, tmit->n + 1, tmit->data_count, size); 583 test, *data_size, tmit->n + 1, tmit->data_count, size);
581 if (*data_size < size) 584 if (*data_size < size)
582 { 585 {
583 *data_size = 0; 586 *data_size = 0;
@@ -587,7 +590,8 @@ tmit_notify_data (void *cls, uint16_t *data_size, void *data)
587 590
588 if (GNUNET_YES != tmit->paused && 0 < tmit->data_delay[tmit->n]) 591 if (GNUNET_YES != tmit->paused && 0 < tmit->data_delay[tmit->n])
589 { 592 {
590 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n"); 593 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
594 "Test #%d: Transmission paused.\n", test);
591 tmit->paused = GNUNET_YES; 595 tmit->paused = GNUNET_YES;
592 GNUNET_SCHEDULER_add_delayed ( 596 GNUNET_SCHEDULER_add_delayed (
593 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 597 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
@@ -611,9 +615,9 @@ tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
611{ 615{
612 struct TransmitClosure *tmit = cls; 616 struct TransmitClosure *tmit = cls;
613 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 617 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
614 "Transmit notify modifier: %lu bytes available, " 618 "Test #%d: Transmit notify modifier: %lu bytes available, "
615 "%u modifiers left to process.\n", 619 "%u modifiers left to process.\n",
616 *data_size, GNUNET_ENV_environment_get_count (tmit->env)); 620 test, *data_size, GNUNET_ENV_environment_get_count (tmit->env));
617 621
618 uint16_t name_size = 0; 622 uint16_t name_size = 0;
619 size_t value_size = 0; 623 size_t value_size = 0;
@@ -688,9 +692,9 @@ slave_join ();
688void 692void
689slave_transmit () 693slave_transmit ()
690{ 694{
691
692 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave sending request to master.\n");
693 test = TEST_SLAVE_TRANSMIT; 695 test = TEST_SLAVE_TRANSMIT;
696 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
697 "Test #%d: Slave sending request to master.\n", test);
694 698
695 tmit = GNUNET_new (struct TransmitClosure); 699 tmit = GNUNET_new (struct TransmitClosure);
696 tmit->env = GNUNET_ENV_environment_create (); 700 tmit->env = GNUNET_ENV_environment_create ();
@@ -772,7 +776,7 @@ join_decision_cb (void *cls,
772 const struct GNUNET_PSYC_Message *join_msg) 776 const struct GNUNET_PSYC_Message *join_msg)
773{ 777{
774 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 778 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
775 "Slave got join decision: %d\n", is_admitted); 779 "Test #%d: Slave got join decision: %d\n", test, is_admitted);
776 780
777 switch (test) 781 switch (test)
778 { 782 {
@@ -804,8 +808,8 @@ join_request_cb (void *cls,
804 struct GNUNET_HashCode slave_key_hash; 808 struct GNUNET_HashCode slave_key_hash;
805 GNUNET_CRYPTO_hash (slave_key, sizeof (*slave_key), &slave_key_hash); 809 GNUNET_CRYPTO_hash (slave_key, sizeof (*slave_key), &slave_key_hash);
806 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 810 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
807 "Got join request #%u from %s.\n", 811 "Test #%d: Got join request #%u from %s.\n",
808 join_req_count, GNUNET_h2s (&slave_key_hash)); 812 test, join_req_count, GNUNET_h2s (&slave_key_hash));
809 813
810 /* Reject first request */ 814 /* Reject first request */
811 int is_admitted = (0 < join_req_count++) ? GNUNET_YES : GNUNET_NO; 815 int is_admitted = (0 < join_req_count++) ? GNUNET_YES : GNUNET_NO;
@@ -817,8 +821,8 @@ static void
817slave_connect_cb (void *cls, int result, uint64_t max_message_id) 821slave_connect_cb (void *cls, int result, uint64_t max_message_id)
818{ 822{
819 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 823 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
820 "Slave connected: %d, max_message_id: %" PRIu64 "\n", 824 "Test #%d: Slave connected: %d, max_message_id: %" PRIu64 "\n",
821 result, max_message_id); 825 test, result, max_message_id);
822 GNUNET_assert (TEST_SLAVE_JOIN_REJECT == test || TEST_SLAVE_JOIN_ACCEPT == test); 826 GNUNET_assert (TEST_SLAVE_JOIN_REJECT == test || TEST_SLAVE_JOIN_ACCEPT == test);
823 GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result); 827 GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result);
824} 828}
@@ -827,8 +831,8 @@ slave_connect_cb (void *cls, int result, uint64_t max_message_id)
827static void 831static void
828slave_join (int t) 832slave_join (int t)
829{ 833{
830 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Joining slave.\n");
831 test = t; 834 test = t;
835 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Test #%d: Joining slave.\n");
832 836
833 struct GNUNET_PeerIdentity origin = this_peer; 837 struct GNUNET_PeerIdentity origin = this_peer;
834 struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); 838 struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
@@ -852,8 +856,9 @@ slave_join (int t)
852void 856void
853master_transmit () 857master_transmit ()
854{ 858{
855 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master sending message to all.\n");
856 test = TEST_MASTER_TRANSMIT; 859 test = TEST_MASTER_TRANSMIT;
860 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
861 "Test #%d: Master sending message to all.\n", test);
857 end_count = 0; 862 end_count = 0;
858 863
859 uint32_t i, j; 864 uint32_t i, j;
@@ -907,8 +912,8 @@ void
907master_start_cb (void *cls, int result, uint64_t max_message_id) 912master_start_cb (void *cls, int result, uint64_t max_message_id)
908{ 913{
909 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 914 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
910 "Master started: %d, max_message_id: %" PRIu64 "\n", 915 "Test #%d: Master started: %d, max_message_id: %" PRIu64 "\n",
911 result, max_message_id); 916 test, result, max_message_id);
912 GNUNET_assert (TEST_MASTER_START == test); 917 GNUNET_assert (TEST_MASTER_START == test);
913 GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result); 918 GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result);
914 slave_join (TEST_SLAVE_JOIN_REJECT); 919 slave_join (TEST_SLAVE_JOIN_REJECT);
@@ -918,8 +923,8 @@ master_start_cb (void *cls, int result, uint64_t max_message_id)
918void 923void
919master_start () 924master_start ()
920{ 925{
921 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n");
922 test = TEST_MASTER_START; 926 test = TEST_MASTER_START;
927 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Test #%d: Starting master.\n", test);
923 mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE, 928 mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE,
924 &master_start_cb, &join_request_cb, 929 &master_start_cb, &join_request_cb,
925 &master_message_cb, &master_message_part_cb, 930 &master_message_cb, &master_message_part_cb,
diff --git a/src/psycstore/psyc_util_lib.c b/src/psycstore/psyc_util_lib.c
index 80e84f29c..9e6b1b770 100644
--- a/src/psycstore/psyc_util_lib.c
+++ b/src/psycstore/psyc_util_lib.c
@@ -101,6 +101,12 @@ struct GNUNET_PSYC_TransmitHandle
101 * Are we currently transmitting a message? 101 * Are we currently transmitting a message?
102 */ 102 */
103 uint8_t in_transmit; 103 uint8_t in_transmit;
104
105 /**
106 * Notify callback is currently being called.
107 */
108 uint8_t in_notify;
109
104}; 110};
105 111
106 112
@@ -334,20 +340,20 @@ GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit)
334 * Transmission handle. 340 * Transmission handle.
335 * @param msg 341 * @param msg
336 * Message part, or NULL. 342 * Message part, or NULL.
337 * @param end 343 * @param tmit_now
338 * End of message? 344 * Transmit message now, or wait for buffer to fill up?
339 * #GNUNET_YES or #GNUNET_NO. 345 * #GNUNET_YES or #GNUNET_NO.
340 */ 346 */
341static void 347static void
342transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, 348transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
343 const struct GNUNET_MessageHeader *msg, 349 const struct GNUNET_MessageHeader *msg,
344 uint8_t end) 350 uint8_t tmit_now)
345{ 351{
346 uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0; 352 uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
347 353
348 LOG (GNUNET_ERROR_TYPE_DEBUG, 354 LOG (GNUNET_ERROR_TYPE_DEBUG,
349 "Queueing message part of type %u and size %u (end: %u)).\n", 355 "Queueing message part of type %u and size %u (tmit_now: %u)).\n",
350 NULL != msg ? ntohs (msg->type) : 0, size, end); 356 NULL != msg ? ntohs (msg->type) : 0, size, tmit_now);
351 357
352 if (NULL != tmit->msg) 358 if (NULL != tmit->msg)
353 { 359 {
@@ -380,7 +386,7 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
380 } 386 }
381 387
382 if (NULL != tmit->msg 388 if (NULL != tmit->msg
383 && (GNUNET_YES == end 389 && (GNUNET_YES == tmit_now
384 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD 390 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
385 < tmit->msg->size + sizeof (struct GNUNET_MessageHeader)))) 391 < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
386 { 392 {
@@ -391,9 +397,6 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
391 tmit->msg = NULL; 397 tmit->msg = NULL;
392 tmit->acks_pending++; 398 tmit->acks_pending++;
393 } 399 }
394
395 if (GNUNET_YES == end)
396 tmit->in_transmit = GNUNET_NO;
397} 400}
398 401
399 402
@@ -414,7 +417,9 @@ transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
414 if (NULL != tmit->notify_data) 417 if (NULL != tmit->notify_data)
415 { 418 {
416 data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; 419 data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
420 tmit->in_notify = GNUNET_YES;
417 notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]); 421 notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]);
422 tmit->in_notify = GNUNET_NO;
418 } 423 }
419 LOG (GNUNET_ERROR_TYPE_DEBUG, 424 LOG (GNUNET_ERROR_TYPE_DEBUG,
420 "transmit_data (ret: %d, size: %u): %.*s\n", 425 "transmit_data (ret: %d, size: %u): %.*s\n",
@@ -442,6 +447,7 @@ transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
442 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); 447 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
443 msg->size = htons (sizeof (*msg)); 448 msg->size = htons (sizeof (*msg));
444 transmit_queue_insert (tmit, msg, GNUNET_YES); 449 transmit_queue_insert (tmit, msg, GNUNET_YES);
450 tmit->in_transmit = GNUNET_NO;
445 return; 451 return;
446 } 452 }
447 453
@@ -458,6 +464,8 @@ transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
458 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); 464 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
459 msg->size = htons (sizeof (*msg)); 465 msg->size = htons (sizeof (*msg));
460 transmit_queue_insert (tmit, msg, GNUNET_YES); 466 transmit_queue_insert (tmit, msg, GNUNET_YES);
467 /* FIXME: wait for ACK before setting in_transmit to no */
468 tmit->in_transmit = GNUNET_NO;
461 } 469 }
462} 470}
463 471
@@ -489,8 +497,10 @@ transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
489 { 497 {
490 max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; 498 max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
491 data_size = max_data_size; 499 data_size = max_data_size;
500 tmit->in_notify = GNUNET_YES;
492 notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1], 501 notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
493 &mod->oper, &mod->value_size); 502 &mod->oper, &mod->value_size);
503 tmit->in_notify = GNUNET_NO;
494 } 504 }
495 505
496 mod->name_size = strnlen ((char *) &mod[1], data_size) + 1; 506 mod->name_size = strnlen ((char *) &mod[1], data_size) + 1;
@@ -520,8 +530,10 @@ transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
520 { 530 {
521 max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; 531 max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
522 data_size = max_data_size; 532 data_size = max_data_size;
533 tmit->in_notify = GNUNET_YES;
523 notify_ret = tmit->notify_mod (tmit->notify_mod_cls, 534 notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
524 &data_size, &msg[1], NULL, NULL); 535 &data_size, &msg[1], NULL, NULL);
536 tmit->in_notify = GNUNET_NO;
525 } 537 }
526 tmit->mod_value_remaining -= data_size; 538 tmit->mod_value_remaining -= data_size;
527 LOG (GNUNET_ERROR_TYPE_DEBUG, 539 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -558,8 +570,8 @@ transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
558 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL; 570 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
559 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); 571 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
560 msg->size = htons (sizeof (*msg)); 572 msg->size = htons (sizeof (*msg));
561
562 transmit_queue_insert (tmit, msg, GNUNET_YES); 573 transmit_queue_insert (tmit, msg, GNUNET_YES);
574 tmit->in_transmit = GNUNET_NO;
563 return; 575 return;
564 } 576 }
565 577
@@ -748,6 +760,9 @@ GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
748void 760void
749GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit) 761GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit)
750{ 762{
763 if (GNUNET_YES != tmit->in_transmit || GNUNET_NO != tmit->in_notify)
764 return;
765
751 if (0 == tmit->acks_pending) 766 if (0 == tmit->acks_pending)
752 { 767 {
753 tmit->paused = GNUNET_NO; 768 tmit->paused = GNUNET_NO;
@@ -800,13 +815,11 @@ GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
800 { 815 {
801 case GNUNET_PSYC_MESSAGE_STATE_MODIFIER: 816 case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
802 case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT: 817 case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
803 if (GNUNET_NO == tmit->paused) 818 transmit_mod (tmit);
804 transmit_mod (tmit);
805 break; 819 break;
806 820
807 case GNUNET_PSYC_MESSAGE_STATE_DATA: 821 case GNUNET_PSYC_MESSAGE_STATE_DATA:
808 if (GNUNET_NO == tmit->paused) 822 transmit_data (tmit);
809 transmit_data (tmit);
810 break; 823 break;
811 824
812 case GNUNET_PSYC_MESSAGE_STATE_END: 825 case GNUNET_PSYC_MESSAGE_STATE_END:
diff --git a/src/social/social_api.c b/src/social/social_api.c
index 17e5a3bfc..4aed3755e 100644
--- a/src/social/social_api.c
+++ b/src/social/social_api.c
@@ -1837,8 +1837,10 @@ GNUNET_SOCIAL_host_announce (struct GNUNET_SOCIAL_Host *hst,
1837{ 1837{
1838 if (GNUNET_OK == 1838 if (GNUNET_OK ==
1839 GNUNET_PSYC_transmit_message (hst->plc.tmit, method_name, env, 1839 GNUNET_PSYC_transmit_message (hst->plc.tmit, method_name, env,
1840 NULL, notify_data, notify_data_cls, flags)); 1840 NULL, notify_data, notify_data_cls, flags))
1841 return (struct GNUNET_SOCIAL_Announcement *) hst->plc.tmit; 1841 return (struct GNUNET_SOCIAL_Announcement *) hst->plc.tmit;
1842 else
1843 return NULL;
1842} 1844}
1843 1845
1844 1846
@@ -2168,8 +2170,10 @@ GNUNET_SOCIAL_guest_talk (struct GNUNET_SOCIAL_Guest *gst,
2168 2170
2169 if (GNUNET_OK == 2171 if (GNUNET_OK ==
2170 GNUNET_PSYC_transmit_message (plc->tmit, method_name, env, 2172 GNUNET_PSYC_transmit_message (plc->tmit, method_name, env,
2171 NULL, notify_data, notify_data_cls, flags)); 2173 NULL, notify_data, notify_data_cls, flags))
2172 return (struct GNUNET_SOCIAL_TalkRequest *) plc->tmit; 2174 return (struct GNUNET_SOCIAL_TalkRequest *) plc->tmit;
2175 else
2176 return NULL;
2173} 2177}
2174 2178
2175 2179
diff --git a/src/util/client_manager.c b/src/util/client_manager.c
index 2fd52705e..80ba43ff1 100644
--- a/src/util/client_manager.c
+++ b/src/util/client_manager.c
@@ -328,7 +328,7 @@ transmit_next (struct GNUNET_CLIENT_MANAGER_Connection *mgr)
328 328
329 mgr->client_tmit 329 mgr->client_tmit
330 = GNUNET_CLIENT_notify_transmit_ready (mgr->client, 330 = GNUNET_CLIENT_notify_transmit_ready (mgr->client,
331 ntohs (mgr->tmit_head->msg->size), 331 GNUNET_SERVER_MAX_MESSAGE_SIZE - 1,
332 GNUNET_TIME_UNIT_FOREVER_REL, 332 GNUNET_TIME_UNIT_FOREVER_REL,
333 GNUNET_NO, 333 GNUNET_NO,
334 &send_next_message, 334 &send_next_message,