diff options
author | Gabor X Toth <*@tg-x.net> | 2015-09-27 21:04:34 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2015-09-27 21:04:34 +0000 |
commit | cab1b047ddcac497e14515fc11f097c4aac8ee27 (patch) | |
tree | 7f4e14a8c77d76bef07cb4bbf6b94adcce44d53c | |
parent | 51f530b98232f7a9947f29008d161ed0d8e23af4 (diff) | |
download | gnunet-cab1b047ddcac497e14515fc11f097c4aac8ee27.tar.gz gnunet-cab1b047ddcac497e14515fc11f097c4aac8ee27.zip |
multicast/psyc/social: message acks & scheduling
-rw-r--r-- | src/include/gnunet_protocols.h | 11 | ||||
-rw-r--r-- | src/multicast/gnunet-service-multicast.c | 77 | ||||
-rw-r--r-- | src/multicast/multicast_api.c | 88 | ||||
-rw-r--r-- | src/multicast/test_multicast.c | 23 | ||||
-rw-r--r-- | src/psyc/gnunet-service-psyc.c | 82 | ||||
-rw-r--r-- | src/psyc/test_psyc.c | 47 | ||||
-rw-r--r-- | src/psycstore/psyc_util_lib.c | 41 | ||||
-rw-r--r-- | src/social/social_api.c | 12 | ||||
-rw-r--r-- | src/util/client_manager.c | 2 |
9 files changed, 279 insertions, 104 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 052bd5d03..a06c8ad67 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h | |||
@@ -2439,19 +2439,24 @@ extern "C" | |||
2439 | #define GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST 758 | 2439 | #define GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST 758 |
2440 | 2440 | ||
2441 | /** | 2441 | /** |
2442 | * C->S: Acknowledgement of a message or request fragment for the client. | ||
2443 | */ | ||
2444 | #define GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK 759 | ||
2445 | |||
2446 | /** | ||
2442 | * C<->S<->T: Replay request from a group member to another member. | 2447 | * C<->S<->T: Replay request from a group member to another member. |
2443 | */ | 2448 | */ |
2444 | #define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST 759 | 2449 | #define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST 760 |
2445 | 2450 | ||
2446 | /** | 2451 | /** |
2447 | * C<->S<->T: Replay response from a group member to another member. | 2452 | * C<->S<->T: Replay response from a group member to another member. |
2448 | */ | 2453 | */ |
2449 | #define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE 763 | 2454 | #define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE 761 |
2450 | 2455 | ||
2451 | /** | 2456 | /** |
2452 | * C<->S: End of replay response. | 2457 | * C<->S: End of replay response. |
2453 | */ | 2458 | */ |
2454 | #define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END 764 | 2459 | #define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END 762 |
2455 | 2460 | ||
2456 | 2461 | ||
2457 | 2462 | ||
diff --git a/src/multicast/gnunet-service-multicast.c b/src/multicast/gnunet-service-multicast.c index 9ebfa66da..4d2868669 100644 --- a/src/multicast/gnunet-service-multicast.c +++ b/src/multicast/gnunet-service-multicast.c | |||
@@ -181,6 +181,11 @@ struct Channel | |||
181 | int8_t join_status; | 181 | int8_t join_status; |
182 | 182 | ||
183 | /** | 183 | /** |
184 | * Number of messages waiting to be sent to CADET. | ||
185 | */ | ||
186 | uint8_t msgs_pending; | ||
187 | |||
188 | /** | ||
184 | * Channel direction. | 189 | * Channel direction. |
185 | * @see enum ChannelDirection | 190 | * @see enum ChannelDirection |
186 | */ | 191 | */ |
@@ -619,8 +624,10 @@ client_send_member_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, | |||
619 | /** | 624 | /** |
620 | * Send message to all origin and member clients connected to the group. | 625 | * Send message to all origin and member clients connected to the group. |
621 | * | 626 | * |
622 | * @param grp The group to send @a msg to. | 627 | * @param pub_key_hash |
623 | * @param msg Message to send. | 628 | * H(key_pub) of the group. |
629 | * @param msg | ||
630 | * Message to send. | ||
624 | */ | 631 | */ |
625 | static int | 632 | static int |
626 | client_send_all (struct GNUNET_HashCode *pub_key_hash, | 633 | client_send_all (struct GNUNET_HashCode *pub_key_hash, |
@@ -660,8 +667,10 @@ client_send_random (struct GNUNET_HashCode *pub_key_hash, | |||
660 | /** | 667 | /** |
661 | * Send message to all origin clients connected to the group. | 668 | * Send message to all origin clients connected to the group. |
662 | * | 669 | * |
663 | * @param grp The group to send @a msg to. | 670 | * @param pub_key_hash |
664 | * @param msg Message to send. | 671 | * H(key_pub) of the group. |
672 | * @param msg | ||
673 | * Message to send. | ||
665 | */ | 674 | */ |
666 | static int | 675 | static int |
667 | client_send_origin (struct GNUNET_HashCode *pub_key_hash, | 676 | client_send_origin (struct GNUNET_HashCode *pub_key_hash, |
@@ -676,6 +685,33 @@ client_send_origin (struct GNUNET_HashCode *pub_key_hash, | |||
676 | 685 | ||
677 | 686 | ||
678 | /** | 687 | /** |
688 | * Send fragment acknowledgement to all clients of the channel. | ||
689 | * | ||
690 | * @param pub_key_hash | ||
691 | * H(key_pub) of the group. | ||
692 | */ | ||
693 | static void | ||
694 | client_send_ack (struct GNUNET_HashCode *pub_key_hash) | ||
695 | { | ||
696 | static struct GNUNET_MessageHeader *msg = NULL; | ||
697 | if (NULL == msg) | ||
698 | { | ||
699 | msg = GNUNET_malloc (sizeof (*msg)); | ||
700 | msg->type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK); | ||
701 | msg->size = htons (sizeof (*msg)); | ||
702 | } | ||
703 | client_send_all (pub_key_hash, msg); | ||
704 | } | ||
705 | |||
706 | |||
707 | struct CadetTransmitClosure | ||
708 | { | ||
709 | struct Channel *chn; | ||
710 | const struct GNUNET_MessageHeader *msg; | ||
711 | }; | ||
712 | |||
713 | |||
714 | /** | ||
679 | * CADET is ready to transmit a message. | 715 | * CADET is ready to transmit a message. |
680 | */ | 716 | */ |
681 | size_t | 717 | size_t |
@@ -686,10 +722,21 @@ cadet_notify_transmit_ready (void *cls, size_t buf_size, void *buf) | |||
686 | /* FIXME: connection closed */ | 722 | /* FIXME: connection closed */ |
687 | return 0; | 723 | return 0; |
688 | } | 724 | } |
689 | const struct GNUNET_MessageHeader *msg = cls; | 725 | struct CadetTransmitClosure *tcls = cls; |
690 | uint16_t msg_size = ntohs (msg->size); | 726 | struct Channel *chn = tcls->chn; |
727 | uint16_t msg_size = ntohs (tcls->msg->size); | ||
691 | GNUNET_assert (msg_size <= buf_size); | 728 | GNUNET_assert (msg_size <= buf_size); |
692 | memcpy (buf, msg, msg_size); | 729 | memcpy (buf, tcls->msg, msg_size); |
730 | GNUNET_free (tcls); | ||
731 | |||
732 | if (0 == chn->msgs_pending) | ||
733 | { | ||
734 | GNUNET_break (0); | ||
735 | } | ||
736 | else if (0 == --chn->msgs_pending) | ||
737 | { | ||
738 | client_send_ack (&chn->group_key_hash); | ||
739 | } | ||
693 | return msg_size; | 740 | return msg_size; |
694 | } | 741 | } |
695 | 742 | ||
@@ -703,6 +750,11 @@ cadet_notify_transmit_ready (void *cls, size_t buf_size, void *buf) | |||
703 | static void | 750 | static void |
704 | cadet_send_channel (struct Channel *chn, const struct GNUNET_MessageHeader *msg) | 751 | cadet_send_channel (struct Channel *chn, const struct GNUNET_MessageHeader *msg) |
705 | { | 752 | { |
753 | struct CadetTransmitClosure *tcls = GNUNET_malloc (sizeof (*tcls)); | ||
754 | tcls->chn = chn; | ||
755 | tcls->msg = msg; | ||
756 | |||
757 | chn->msgs_pending++; | ||
706 | chn->tmit_handle | 758 | chn->tmit_handle |
707 | = GNUNET_CADET_notify_transmit_ready (chn->channel, GNUNET_NO, | 759 | = GNUNET_CADET_notify_transmit_ready (chn->channel, GNUNET_NO, |
708 | GNUNET_TIME_UNIT_FOREVER_REL, | 760 | GNUNET_TIME_UNIT_FOREVER_REL, |
@@ -1132,7 +1184,10 @@ client_recv_multicast_message (void *cls, struct GNUNET_SERVER_Client *client, | |||
1132 | } | 1184 | } |
1133 | 1185 | ||
1134 | client_send_all (&grp->pub_key_hash, &out->header); | 1186 | client_send_all (&grp->pub_key_hash, &out->header); |
1135 | cadet_send_children (&grp->pub_key_hash, &out->header); | 1187 | if (0 == cadet_send_children (&grp->pub_key_hash, &out->header)) |
1188 | { | ||
1189 | client_send_ack (&grp->pub_key_hash); | ||
1190 | } | ||
1136 | GNUNET_free (out); | 1191 | GNUNET_free (out); |
1137 | 1192 | ||
1138 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 1193 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
@@ -1174,11 +1229,13 @@ client_recv_multicast_request (void *cls, struct GNUNET_SERVER_Client *client, | |||
1174 | GNUNET_assert (0); | 1229 | GNUNET_assert (0); |
1175 | } | 1230 | } |
1176 | 1231 | ||
1232 | uint8_t send_ack = GNUNET_YES; | ||
1177 | if (0 == client_send_origin (&grp->pub_key_hash, &out->header)) | 1233 | if (0 == client_send_origin (&grp->pub_key_hash, &out->header)) |
1178 | { /* No local origins, send to remote origin */ | 1234 | { /* No local origins, send to remote origin */ |
1179 | if (NULL != mem->origin_channel) | 1235 | if (NULL != mem->origin_channel) |
1180 | { | 1236 | { |
1181 | cadet_send_channel (mem->origin_channel, &out->header); | 1237 | cadet_send_channel (mem->origin_channel, &out->header); |
1238 | send_ack = GNUNET_NO; | ||
1182 | } | 1239 | } |
1183 | else | 1240 | else |
1184 | { | 1241 | { |
@@ -1188,6 +1245,10 @@ client_recv_multicast_request (void *cls, struct GNUNET_SERVER_Client *client, | |||
1188 | return; | 1245 | return; |
1189 | } | 1246 | } |
1190 | } | 1247 | } |
1248 | if (GNUNET_YES == send_ack) | ||
1249 | { | ||
1250 | client_send_ack (&grp->pub_key_hash); | ||
1251 | } | ||
1191 | GNUNET_free (out); | 1252 | GNUNET_free (out); |
1192 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 1253 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
1193 | } | 1254 | } |
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c index 117a0efe2..774a8bf70 100644 --- a/src/multicast/multicast_api.c +++ b/src/multicast/multicast_api.c | |||
@@ -102,6 +102,11 @@ struct GNUNET_MULTICAST_Group | |||
102 | uint8_t in_transmit; | 102 | uint8_t in_transmit; |
103 | 103 | ||
104 | /** | 104 | /** |
105 | * Number of MULTICAST_FRAGMENT_ACK messages we are still waiting for. | ||
106 | */ | ||
107 | uint8_t acks_pending; | ||
108 | |||
109 | /** | ||
105 | * Is this the origin or a member? | 110 | * Is this the origin or a member? |
106 | */ | 111 | */ |
107 | uint8_t is_origin; | 112 | uint8_t is_origin; |
@@ -185,6 +190,13 @@ struct GNUNET_MULTICAST_MemberReplayHandle | |||
185 | }; | 190 | }; |
186 | 191 | ||
187 | 192 | ||
193 | static void | ||
194 | origin_to_all (struct GNUNET_MULTICAST_Origin *orig); | ||
195 | |||
196 | static void | ||
197 | member_to_origin (struct GNUNET_MULTICAST_Member *mem); | ||
198 | |||
199 | |||
188 | /** | 200 | /** |
189 | * Send first message to the service after connecting. | 201 | * Send first message to the service after connecting. |
190 | */ | 202 | */ |
@@ -274,6 +286,38 @@ group_recv_message (void *cls, | |||
274 | 286 | ||
275 | 287 | ||
276 | /** | 288 | /** |
289 | * Receive message/request fragment acknowledgement from service. | ||
290 | */ | ||
291 | static void | ||
292 | group_recv_fragment_ack (void *cls, | ||
293 | struct GNUNET_CLIENT_MANAGER_Connection *client, | ||
294 | const struct GNUNET_MessageHeader *msg) | ||
295 | { | ||
296 | struct GNUNET_MULTICAST_Group * | ||
297 | grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); | ||
298 | |||
299 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
300 | "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n", | ||
301 | grp, grp->in_transmit, grp->acks_pending); | ||
302 | |||
303 | if (0 == grp->acks_pending) | ||
304 | { | ||
305 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
306 | "%p Ignoring extraneous fragment ACK.\n", grp); | ||
307 | return; | ||
308 | } | ||
309 | grp->acks_pending--; | ||
310 | |||
311 | if (GNUNET_YES != grp->in_transmit) | ||
312 | return; | ||
313 | |||
314 | if (GNUNET_YES == grp->is_origin) | ||
315 | origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp); | ||
316 | else | ||
317 | member_to_origin ((struct GNUNET_MULTICAST_Member *) grp); | ||
318 | } | ||
319 | |||
320 | /** | ||
277 | * Origin receives uniquest request from a member. | 321 | * Origin receives uniquest request from a member. |
278 | */ | 322 | */ |
279 | static void | 323 | static void |
@@ -447,6 +491,10 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] = | |||
447 | GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, | 491 | GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, |
448 | sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES }, | 492 | sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES }, |
449 | 493 | ||
494 | { group_recv_fragment_ack, NULL, | ||
495 | GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK, | ||
496 | sizeof (struct GNUNET_MessageHeader), GNUNET_YES }, | ||
497 | |||
450 | { group_recv_join_request, NULL, | 498 | { group_recv_join_request, NULL, |
451 | GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, | 499 | GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, |
452 | sizeof (struct MulticastJoinRequestMessage), GNUNET_YES }, | 500 | sizeof (struct MulticastJoinRequestMessage), GNUNET_YES }, |
@@ -470,6 +518,10 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler member_handlers[] = | |||
470 | GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, | 518 | GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, |
471 | sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES }, | 519 | sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES }, |
472 | 520 | ||
521 | { group_recv_fragment_ack, NULL, | ||
522 | GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK, | ||
523 | sizeof (struct GNUNET_MessageHeader), GNUNET_YES }, | ||
524 | |||
473 | { group_recv_join_request, NULL, | 525 | { group_recv_join_request, NULL, |
474 | GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, | 526 | GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, |
475 | sizeof (struct MulticastJoinRequestMessage), GNUNET_YES }, | 527 | sizeof (struct MulticastJoinRequestMessage), GNUNET_YES }, |
@@ -577,6 +629,7 @@ GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join, | |||
577 | memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size); | 629 | memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size); |
578 | 630 | ||
579 | GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header); | 631 | GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header); |
632 | GNUNET_free (hdcsn); | ||
580 | GNUNET_free (join); | 633 | GNUNET_free (join); |
581 | return NULL; | 634 | return NULL; |
582 | } | 635 | } |
@@ -774,9 +827,10 @@ GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig, | |||
774 | static void | 827 | static void |
775 | origin_to_all (struct GNUNET_MULTICAST_Origin *orig) | 828 | origin_to_all (struct GNUNET_MULTICAST_Origin *orig) |
776 | { | 829 | { |
777 | LOG (GNUNET_ERROR_TYPE_DEBUG, "origin_to_all()\n"); | 830 | LOG (GNUNET_ERROR_TYPE_DEBUG, "%p origin_to_all()\n", orig); |
778 | struct GNUNET_MULTICAST_Group *grp = &orig->grp; | 831 | struct GNUNET_MULTICAST_Group *grp = &orig->grp; |
779 | struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; | 832 | struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; |
833 | GNUNET_assert (GNUNET_YES == grp->in_transmit); | ||
780 | 834 | ||
781 | size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; | 835 | size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; |
782 | struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size); | 836 | struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size); |
@@ -786,7 +840,8 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig) | |||
786 | || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size) | 840 | || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size) |
787 | { | 841 | { |
788 | LOG (GNUNET_ERROR_TYPE_ERROR, | 842 | LOG (GNUNET_ERROR_TYPE_ERROR, |
789 | "OriginTransmitNotify() returned error or invalid message size.\n"); | 843 | "%p OriginTransmitNotify() returned error or invalid message size.\n", |
844 | orig); | ||
790 | /* FIXME: handle error */ | 845 | /* FIXME: handle error */ |
791 | GNUNET_free (msg); | 846 | GNUNET_free (msg); |
792 | return; | 847 | return; |
@@ -794,6 +849,8 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig) | |||
794 | 849 | ||
795 | if (GNUNET_NO == ret && 0 == buf_size) | 850 | if (GNUNET_NO == ret && 0 == buf_size) |
796 | { | 851 | { |
852 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
853 | "%p OriginTransmitNotify() - transmission paused.\n", orig); | ||
797 | GNUNET_free (msg); | 854 | GNUNET_free (msg); |
798 | return; /* Transmission paused. */ | 855 | return; /* Transmission paused. */ |
799 | } | 856 | } |
@@ -805,7 +862,12 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig) | |||
805 | msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset); | 862 | msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset); |
806 | tmit->fragment_offset += sizeof (*msg) + buf_size; | 863 | tmit->fragment_offset += sizeof (*msg) + buf_size; |
807 | 864 | ||
865 | grp->acks_pending++; | ||
808 | GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header); | 866 | GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header); |
867 | GNUNET_free (msg); | ||
868 | |||
869 | if (GNUNET_YES == ret) | ||
870 | grp->in_transmit = GNUNET_NO; | ||
809 | } | 871 | } |
810 | 872 | ||
811 | 873 | ||
@@ -834,11 +896,10 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig, | |||
834 | GNUNET_MULTICAST_OriginTransmitNotify notify, | 896 | GNUNET_MULTICAST_OriginTransmitNotify notify, |
835 | void *notify_cls) | 897 | void *notify_cls) |
836 | { | 898 | { |
837 | /* FIXME | 899 | struct GNUNET_MULTICAST_Group *grp = &orig->grp; |
838 | if (GNUNET_YES == orig->grp.in_transmit) | 900 | if (GNUNET_YES == grp->in_transmit) |
839 | return NULL; | 901 | return NULL; |
840 | orig->grp.in_transmit = GNUNET_YES; | 902 | grp->in_transmit = GNUNET_YES; |
841 | */ | ||
842 | 903 | ||
843 | struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; | 904 | struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; |
844 | tmit->origin = orig; | 905 | tmit->origin = orig; |
@@ -861,6 +922,9 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig, | |||
861 | void | 922 | void |
862 | GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th) | 923 | GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th) |
863 | { | 924 | { |
925 | struct GNUNET_MULTICAST_Group *grp = &th->origin->grp; | ||
926 | if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit) | ||
927 | return; | ||
864 | origin_to_all (th->origin); | 928 | origin_to_all (th->origin); |
865 | } | 929 | } |
866 | 930 | ||
@@ -874,6 +938,7 @@ GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHan | |||
874 | void | 938 | void |
875 | GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th) | 939 | GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th) |
876 | { | 940 | { |
941 | th->origin->grp.in_transmit = GNUNET_NO; | ||
877 | } | 942 | } |
878 | 943 | ||
879 | 944 | ||
@@ -1094,6 +1159,7 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem) | |||
1094 | LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n"); | 1159 | LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n"); |
1095 | struct GNUNET_MULTICAST_Group *grp = &mem->grp; | 1160 | struct GNUNET_MULTICAST_Group *grp = &mem->grp; |
1096 | struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; | 1161 | struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; |
1162 | GNUNET_assert (GNUNET_YES == grp->in_transmit); | ||
1097 | 1163 | ||
1098 | size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; | 1164 | size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; |
1099 | struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size); | 1165 | struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size); |
@@ -1124,6 +1190,10 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem) | |||
1124 | tmit->fragment_offset += sizeof (*req) + buf_size; | 1190 | tmit->fragment_offset += sizeof (*req) + buf_size; |
1125 | 1191 | ||
1126 | GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header); | 1192 | GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header); |
1193 | GNUNET_free (req); | ||
1194 | |||
1195 | if (GNUNET_YES == ret) | ||
1196 | grp->in_transmit = GNUNET_NO; | ||
1127 | } | 1197 | } |
1128 | 1198 | ||
1129 | 1199 | ||
@@ -1147,11 +1217,9 @@ GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem, | |||
1147 | GNUNET_MULTICAST_MemberTransmitNotify notify, | 1217 | GNUNET_MULTICAST_MemberTransmitNotify notify, |
1148 | void *notify_cls) | 1218 | void *notify_cls) |
1149 | { | 1219 | { |
1150 | /* FIXME | ||
1151 | if (GNUNET_YES == mem->grp.in_transmit) | 1220 | if (GNUNET_YES == mem->grp.in_transmit) |
1152 | return NULL; | 1221 | return NULL; |
1153 | mem->grp.in_transmit = GNUNET_YES; | 1222 | mem->grp.in_transmit = GNUNET_YES; |
1154 | */ | ||
1155 | 1223 | ||
1156 | struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; | 1224 | struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; |
1157 | tmit->member = mem; | 1225 | tmit->member = mem; |
@@ -1173,6 +1241,9 @@ GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem, | |||
1173 | void | 1241 | void |
1174 | GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th) | 1242 | GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th) |
1175 | { | 1243 | { |
1244 | struct GNUNET_MULTICAST_Group *grp = &th->member->grp; | ||
1245 | if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit) | ||
1246 | return; | ||
1176 | member_to_origin (th->member); | 1247 | member_to_origin (th->member); |
1177 | } | 1248 | } |
1178 | 1249 | ||
@@ -1186,6 +1257,7 @@ GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmit | |||
1186 | void | 1257 | void |
1187 | GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th) | 1258 | GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th) |
1188 | { | 1259 | { |
1260 | th->member->grp.in_transmit = GNUNET_NO; | ||
1189 | } | 1261 | } |
1190 | 1262 | ||
1191 | 1263 | ||
diff --git a/src/multicast/test_multicast.c b/src/multicast/test_multicast.c index a5946f3fd..f35c2885a 100644 --- a/src/multicast/test_multicast.c +++ b/src/multicast/test_multicast.c | |||
@@ -183,7 +183,7 @@ tmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
183 | struct TransmitClosure *tmit = cls; | 183 | struct TransmitClosure *tmit = cls; |
184 | if (NULL != tmit->orig_tmit) | 184 | if (NULL != tmit->orig_tmit) |
185 | GNUNET_MULTICAST_origin_to_all_resume (tmit->orig_tmit); | 185 | GNUNET_MULTICAST_origin_to_all_resume (tmit->orig_tmit); |
186 | else | 186 | else if (NULL != tmit->mem_tmit) |
187 | GNUNET_MULTICAST_member_to_origin_resume (tmit->mem_tmit); | 187 | GNUNET_MULTICAST_member_to_origin_resume (tmit->mem_tmit); |
188 | } | 188 | } |
189 | 189 | ||
@@ -453,14 +453,15 @@ member_to_origin () | |||
453 | *tmit = (struct TransmitClosure) {}; | 453 | *tmit = (struct TransmitClosure) {}; |
454 | tmit->data[0] = "abc def"; | 454 | tmit->data[0] = "abc def"; |
455 | tmit->data[1] = "ghi jkl mno"; | 455 | tmit->data[1] = "ghi jkl mno"; |
456 | tmit->data_delay[1] = 1; | 456 | tmit->data_delay[1] = 2; |
457 | tmit->data[2] = "pqr stuw xyz"; | 457 | tmit->data[2] = "pqr stuw xyz"; |
458 | tmit->data_count = 3; | 458 | tmit->data_count = 3; |
459 | 459 | ||
460 | origin_cls.n = 0; | 460 | origin_cls.n = 0; |
461 | origin_cls.msgs_expected = 1; | 461 | origin_cls.msgs_expected = 1; |
462 | 462 | ||
463 | GNUNET_MULTICAST_member_to_origin (member, 1, tmit_notify, tmit); | 463 | tmit->mem_tmit = GNUNET_MULTICAST_member_to_origin (member, 1, |
464 | tmit_notify, tmit); | ||
464 | } | 465 | } |
465 | 466 | ||
466 | 467 | ||
@@ -533,15 +534,19 @@ origin_to_all () | |||
533 | struct TransmitClosure *tmit = &tmit_cls; | 534 | struct TransmitClosure *tmit = &tmit_cls; |
534 | *tmit = (struct TransmitClosure) {}; | 535 | *tmit = (struct TransmitClosure) {}; |
535 | tmit->data[0] = "ABC DEF"; | 536 | tmit->data[0] = "ABC DEF"; |
536 | tmit->data[1] = "GHI JKL MNO"; | 537 | tmit->data[1] = GNUNET_malloc (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD + 1); |
537 | tmit->data_delay[1] = 1; | 538 | for (uint16_t i = 0; i < GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD; i++) |
538 | tmit->data[2] = "PQR STUW XYZ"; | 539 | tmit->data[1][i] = (0 == i % 10000) ? '0' + i / 10000 : '_'; |
539 | tmit->data_count = 3; | 540 | tmit->data[2] = "GHI JKL MNO"; |
541 | tmit->data_delay[2] = 2; | ||
542 | tmit->data[3] = "PQR STUW XYZ"; | ||
543 | tmit->data_count = 4; | ||
540 | 544 | ||
541 | origin_cls.n = member_cls.n = 0; | 545 | origin_cls.n = member_cls.n = 0; |
542 | origin_cls.msgs_expected = member_cls.msgs_expected = 1; | 546 | origin_cls.msgs_expected = member_cls.msgs_expected = tmit->data_count; |
543 | 547 | ||
544 | GNUNET_MULTICAST_origin_to_all (origin, 1, 1, tmit_notify, tmit); | 548 | tmit->orig_tmit = GNUNET_MULTICAST_origin_to_all (origin, 1, 1, |
549 | tmit_notify, tmit); | ||
545 | } | 550 | } |
546 | 551 | ||
547 | 552 | ||
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index ceb0ca95b..e710b41a5 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c | |||
@@ -107,17 +107,6 @@ struct TransmitMessage | |||
107 | */ | 107 | */ |
108 | uint16_t last_ptype; | 108 | uint16_t last_ptype; |
109 | 109 | ||
110 | /** | ||
111 | * @see enum MessageState | ||
112 | */ | ||
113 | uint8_t state; | ||
114 | |||
115 | /** | ||
116 | * Whether a message ACK has already been sent to the client. | ||
117 | * #GNUNET_YES or #GNUNET_NO | ||
118 | */ | ||
119 | uint8_t ack_sent; | ||
120 | |||
121 | /* Followed by message */ | 110 | /* Followed by message */ |
122 | }; | 111 | }; |
123 | 112 | ||
@@ -281,11 +270,6 @@ struct Channel | |||
281 | uint32_t tmit_mod_value_size; | 270 | uint32_t tmit_mod_value_size; |
282 | 271 | ||
283 | /** | 272 | /** |
284 | * @see enum MessageState | ||
285 | */ | ||
286 | uint8_t tmit_state; | ||
287 | |||
288 | /** | ||
289 | * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)? | 273 | * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)? |
290 | */ | 274 | */ |
291 | uint8_t is_master; | 275 | uint8_t is_master; |
@@ -438,6 +422,15 @@ static uint64_t | |||
438 | message_queue_drop (struct Channel *chn); | 422 | message_queue_drop (struct Channel *chn); |
439 | 423 | ||
440 | 424 | ||
425 | static void | ||
426 | schedule_transmit_message (void *cls, | ||
427 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
428 | { | ||
429 | struct Channel *chn = cls; | ||
430 | transmit_message (chn); | ||
431 | } | ||
432 | |||
433 | |||
441 | /** | 434 | /** |
442 | * Task run during shutdown. | 435 | * Task run during shutdown. |
443 | * | 436 | * |
@@ -1145,8 +1138,8 @@ fragment_queue_insert (struct Channel *chn, | |||
1145 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 1138 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
1146 | "%p Header of message %" PRIu64 " is NOT complete yet: " | 1139 | "%p Header of message %" PRIu64 " is NOT complete yet: " |
1147 | "%" PRIu64 " != %" PRIu64 "\n", | 1140 | "%" PRIu64 " != %" PRIu64 "\n", |
1148 | chn, GNUNET_ntohll (mmsg->message_id), frag_offset, | 1141 | chn, GNUNET_ntohll (mmsg->message_id), |
1149 | fragq->header_size); | 1142 | frag_offset, fragq->header_size); |
1150 | } | 1143 | } |
1151 | } | 1144 | } |
1152 | 1145 | ||
@@ -1159,8 +1152,8 @@ fragment_queue_insert (struct Channel *chn, | |||
1159 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 1152 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
1160 | "%p Message %" PRIu64 " is NOT complete yet: " | 1153 | "%p Message %" PRIu64 " is NOT complete yet: " |
1161 | "%" PRIu64 " != %" PRIu64 "\n", | 1154 | "%" PRIu64 " != %" PRIu64 "\n", |
1162 | chn, GNUNET_ntohll (mmsg->message_id), frag_offset, | 1155 | chn, GNUNET_ntohll (mmsg->message_id), |
1163 | fragq->size); | 1156 | frag_offset, fragq->size); |
1164 | break; | 1157 | break; |
1165 | 1158 | ||
1166 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: | 1159 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: |
@@ -1486,17 +1479,26 @@ mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg | |||
1486 | uint16_t size = ntohs (mmsg->header.size); | 1479 | uint16_t size = ntohs (mmsg->header.size); |
1487 | 1480 | ||
1488 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1481 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1489 | "%p Received multicast message of size %u.\n", | 1482 | "%p Received multicast message of size %u. " |
1490 | chn, size); | 1483 | "fragment_id=%" PRIu64 ", message_id=%" PRIu64 |
1484 | ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n", | ||
1485 | chn, size, | ||
1486 | GNUNET_ntohll (mmsg->fragment_id), | ||
1487 | GNUNET_ntohll (mmsg->message_id), | ||
1488 | GNUNET_ntohll (mmsg->fragment_offset), | ||
1489 | GNUNET_ntohll (mmsg->flags)); | ||
1491 | 1490 | ||
1492 | GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0, | 1491 | GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0, |
1493 | &store_recv_fragment_store_result, chn); | 1492 | &store_recv_fragment_store_result, chn); |
1494 | 1493 | ||
1495 | uint16_t first_ptype = 0, last_ptype = 0; | 1494 | uint16_t first_ptype = 0, last_ptype = 0; |
1496 | if (GNUNET_SYSERR | 1495 | int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg), |
1497 | == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg), | 1496 | (const char *) &mmsg[1], |
1498 | (const char *) &mmsg[1], | 1497 | &first_ptype, &last_ptype); |
1499 | &first_ptype, &last_ptype)) | 1498 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1499 | "%p Message check result %d, first part type %u, last part type %u\n", | ||
1500 | chn, check, first_ptype, last_ptype); | ||
1501 | if (GNUNET_SYSERR == check) | ||
1500 | { | 1502 | { |
1501 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 1503 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
1502 | "%p Dropping incoming multicast message with invalid parts.\n", | 1504 | "%p Dropping incoming multicast message with invalid parts.\n", |
@@ -1505,10 +1507,6 @@ mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg | |||
1505 | return; | 1507 | return; |
1506 | } | 1508 | } |
1507 | 1509 | ||
1508 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1509 | "Message parts: first: type %u, last: type %u\n", | ||
1510 | first_ptype, last_ptype); | ||
1511 | |||
1512 | fragment_queue_insert (chn, mmsg, first_ptype, last_ptype); | 1510 | fragment_queue_insert (chn, mmsg, first_ptype, last_ptype); |
1513 | message_queue_run (chn); | 1511 | message_queue_run (chn); |
1514 | } | 1512 | } |
@@ -1965,6 +1963,8 @@ transmit_notify (void *cls, size_t *data_size, void *data) | |||
1965 | { | 1963 | { |
1966 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1964 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1967 | "%p transmit_notify: nothing to send.\n", chn); | 1965 | "%p transmit_notify: nothing to send.\n", chn); |
1966 | if (NULL != tmit_msg && *data_size < tmit_msg->size) | ||
1967 | GNUNET_break (0); | ||
1968 | *data_size = 0; | 1968 | *data_size = 0; |
1969 | return GNUNET_NO; | 1969 | return GNUNET_NO; |
1970 | } | 1970 | } |
@@ -1975,9 +1975,13 @@ transmit_notify (void *cls, size_t *data_size, void *data) | |||
1975 | *data_size = tmit_msg->size; | 1975 | *data_size = tmit_msg->size; |
1976 | memcpy (data, &tmit_msg[1], *data_size); | 1976 | memcpy (data, &tmit_msg[1], *data_size); |
1977 | 1977 | ||
1978 | int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES; | 1978 | int ret |
1979 | = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END) | ||
1980 | ? GNUNET_NO | ||
1981 | : GNUNET_YES; | ||
1979 | 1982 | ||
1980 | if (NULL != tmit_msg->client && GNUNET_NO == tmit_msg->ack_sent) | 1983 | /* FIXME: handle disconnecting clients */ |
1984 | if (NULL != tmit_msg->client) | ||
1981 | send_message_ack (chn, tmit_msg->client); | 1985 | send_message_ack (chn, tmit_msg->client); |
1982 | 1986 | ||
1983 | GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg); | 1987 | GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg); |
@@ -1985,7 +1989,7 @@ transmit_notify (void *cls, size_t *data_size, void *data) | |||
1985 | 1989 | ||
1986 | if (NULL != chn->tmit_head) | 1990 | if (NULL != chn->tmit_head) |
1987 | { | 1991 | { |
1988 | transmit_message (chn); | 1992 | GNUNET_SCHEDULER_add_now (schedule_transmit_message, chn); |
1989 | } | 1993 | } |
1990 | else if (GNUNET_YES == chn->is_disconnected | 1994 | else if (GNUNET_YES == chn->is_disconnected |
1991 | && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END) | 1995 | && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END) |
@@ -2037,10 +2041,12 @@ slave_transmit_notify (void *cls, size_t *data_size, void *data) | |||
2037 | static void | 2041 | static void |
2038 | master_transmit_message (struct Master *mst) | 2042 | master_transmit_message (struct Master *mst) |
2039 | { | 2043 | { |
2044 | if (NULL == mst->chn.tmit_head) | ||
2045 | return; | ||
2040 | if (NULL == mst->tmit_handle) | 2046 | if (NULL == mst->tmit_handle) |
2041 | { | 2047 | { |
2042 | mst->tmit_handle | 2048 | mst->tmit_handle |
2043 | = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id, | 2049 | = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->chn.tmit_head->id, |
2044 | mst->max_group_generation, | 2050 | mst->max_group_generation, |
2045 | master_transmit_notify, mst); | 2051 | master_transmit_notify, mst); |
2046 | } | 2052 | } |
@@ -2057,10 +2063,12 @@ master_transmit_message (struct Master *mst) | |||
2057 | static void | 2063 | static void |
2058 | slave_transmit_message (struct Slave *slv) | 2064 | slave_transmit_message (struct Slave *slv) |
2059 | { | 2065 | { |
2066 | if (NULL == slv->chn.tmit_head) | ||
2067 | return; | ||
2060 | if (NULL == slv->tmit_handle) | 2068 | if (NULL == slv->tmit_handle) |
2061 | { | 2069 | { |
2062 | slv->tmit_handle | 2070 | slv->tmit_handle |
2063 | = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id, | 2071 | = GNUNET_MULTICAST_member_to_origin (slv->member, slv->chn.tmit_head->id, |
2064 | slave_transmit_notify, slv); | 2072 | slave_transmit_notify, slv); |
2065 | } | 2073 | } |
2066 | else | 2074 | else |
@@ -2090,6 +2098,9 @@ master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg) | |||
2090 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype) | 2098 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype) |
2091 | { | 2099 | { |
2092 | tmit_msg->id = ++mst->max_message_id; | 2100 | tmit_msg->id = ++mst->max_message_id; |
2101 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
2102 | "%p master_queue_message: message_id=%" PRIu64 "\n", | ||
2103 | mst, tmit_msg->id); | ||
2093 | struct GNUNET_PSYC_MessageMethod *pmeth | 2104 | struct GNUNET_PSYC_MessageMethod *pmeth |
2094 | = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1]; | 2105 | = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1]; |
2095 | 2106 | ||
@@ -2159,7 +2170,6 @@ queue_message (struct Channel *chn, | |||
2159 | memcpy (&tmit_msg[1], data, data_size); | 2170 | memcpy (&tmit_msg[1], data, data_size); |
2160 | tmit_msg->client = client; | 2171 | tmit_msg->client = client; |
2161 | tmit_msg->size = data_size; | 2172 | tmit_msg->size = data_size; |
2162 | tmit_msg->state = chn->tmit_state; | ||
2163 | tmit_msg->first_ptype = first_ptype; | 2173 | tmit_msg->first_ptype = first_ptype; |
2164 | tmit_msg->last_ptype = last_ptype; | 2174 | tmit_msg->last_ptype = last_ptype; |
2165 | 2175 | ||
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c index e2e6cfc87..1ce9074d5 100644 --- a/src/psyc/test_psyc.c +++ b/src/psyc/test_psyc.c | |||
@@ -225,7 +225,8 @@ master_message_part_cb (void *cls, | |||
225 | if (NULL == msg) | 225 | if (NULL == msg) |
226 | { | 226 | { |
227 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 227 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
228 | "Error while receiving message %" PRIu64 "\n", message_id); | 228 | "Test #%d: Error while master is receiving part of message #%" PRIu64 ".\n", |
229 | test, message_id); | ||
229 | return; | 230 | return; |
230 | } | 231 | } |
231 | 232 | ||
@@ -243,7 +244,8 @@ master_message_part_cb (void *cls, | |||
243 | if (GNUNET_PSYC_MESSAGE_REQUEST != flags) | 244 | if (GNUNET_PSYC_MESSAGE_REQUEST != flags) |
244 | { | 245 | { |
245 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 246 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
246 | "Unexpected request flags: %x" PRIu32 "\n", flags); | 247 | "Test #%d: Unexpected request flags: %x" PRIu32 "\n", |
248 | test, flags); | ||
247 | GNUNET_assert (0); | 249 | GNUNET_assert (0); |
248 | return; | 250 | return; |
249 | } | 251 | } |
@@ -297,7 +299,8 @@ slave_message_part_cb (void *cls, | |||
297 | if (NULL == msg) | 299 | if (NULL == msg) |
298 | { | 300 | { |
299 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 301 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
300 | "Error while receiving message " PRIu64 "\n", message_id); | 302 | "Test #%d: Error while slave is receiving part of message #%" PRIu64 ".\n", |
303 | test, message_id); | ||
301 | return; | 304 | return; |
302 | } | 305 | } |
303 | 306 | ||
@@ -322,7 +325,7 @@ slave_message_part_cb (void *cls, | |||
322 | { | 325 | { |
323 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 326 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
324 | "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n", | 327 | "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n", |
325 | flags); | 328 | test, flags); |
326 | GNUNET_assert (0); | 329 | GNUNET_assert (0); |
327 | return; | 330 | return; |
328 | } | 331 | } |
@@ -575,9 +578,9 @@ tmit_notify_data (void *cls, uint16_t *data_size, void *data) | |||
575 | 578 | ||
576 | uint16_t size = strlen (tmit->data[tmit->n]); | 579 | uint16_t size = strlen (tmit->data[tmit->n]); |
577 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 580 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
578 | "Transmit notify data: %u bytes available, " | 581 | "Test #%d: Transmit notify data: %u bytes available, " |
579 | "processing fragment %u/%u (size %u).\n", | 582 | "processing fragment %u/%u (size %u).\n", |
580 | *data_size, tmit->n + 1, tmit->data_count, size); | 583 | test, *data_size, tmit->n + 1, tmit->data_count, size); |
581 | if (*data_size < size) | 584 | if (*data_size < size) |
582 | { | 585 | { |
583 | *data_size = 0; | 586 | *data_size = 0; |
@@ -587,7 +590,8 @@ tmit_notify_data (void *cls, uint16_t *data_size, void *data) | |||
587 | 590 | ||
588 | if (GNUNET_YES != tmit->paused && 0 < tmit->data_delay[tmit->n]) | 591 | if (GNUNET_YES != tmit->paused && 0 < tmit->data_delay[tmit->n]) |
589 | { | 592 | { |
590 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n"); | 593 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
594 | "Test #%d: Transmission paused.\n", test); | ||
591 | tmit->paused = GNUNET_YES; | 595 | tmit->paused = GNUNET_YES; |
592 | GNUNET_SCHEDULER_add_delayed ( | 596 | GNUNET_SCHEDULER_add_delayed ( |
593 | GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, | 597 | GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, |
@@ -611,9 +615,9 @@ tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper, | |||
611 | { | 615 | { |
612 | struct TransmitClosure *tmit = cls; | 616 | struct TransmitClosure *tmit = cls; |
613 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 617 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
614 | "Transmit notify modifier: %lu bytes available, " | 618 | "Test #%d: Transmit notify modifier: %lu bytes available, " |
615 | "%u modifiers left to process.\n", | 619 | "%u modifiers left to process.\n", |
616 | *data_size, GNUNET_ENV_environment_get_count (tmit->env)); | 620 | test, *data_size, GNUNET_ENV_environment_get_count (tmit->env)); |
617 | 621 | ||
618 | uint16_t name_size = 0; | 622 | uint16_t name_size = 0; |
619 | size_t value_size = 0; | 623 | size_t value_size = 0; |
@@ -688,9 +692,9 @@ slave_join (); | |||
688 | void | 692 | void |
689 | slave_transmit () | 693 | slave_transmit () |
690 | { | 694 | { |
691 | |||
692 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave sending request to master.\n"); | ||
693 | test = TEST_SLAVE_TRANSMIT; | 695 | test = TEST_SLAVE_TRANSMIT; |
696 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
697 | "Test #%d: Slave sending request to master.\n", test); | ||
694 | 698 | ||
695 | tmit = GNUNET_new (struct TransmitClosure); | 699 | tmit = GNUNET_new (struct TransmitClosure); |
696 | tmit->env = GNUNET_ENV_environment_create (); | 700 | tmit->env = GNUNET_ENV_environment_create (); |
@@ -772,7 +776,7 @@ join_decision_cb (void *cls, | |||
772 | const struct GNUNET_PSYC_Message *join_msg) | 776 | const struct GNUNET_PSYC_Message *join_msg) |
773 | { | 777 | { |
774 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 778 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
775 | "Slave got join decision: %d\n", is_admitted); | 779 | "Test #%d: Slave got join decision: %d\n", test, is_admitted); |
776 | 780 | ||
777 | switch (test) | 781 | switch (test) |
778 | { | 782 | { |
@@ -804,8 +808,8 @@ join_request_cb (void *cls, | |||
804 | struct GNUNET_HashCode slave_key_hash; | 808 | struct GNUNET_HashCode slave_key_hash; |
805 | GNUNET_CRYPTO_hash (slave_key, sizeof (*slave_key), &slave_key_hash); | 809 | GNUNET_CRYPTO_hash (slave_key, sizeof (*slave_key), &slave_key_hash); |
806 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 810 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
807 | "Got join request #%u from %s.\n", | 811 | "Test #%d: Got join request #%u from %s.\n", |
808 | join_req_count, GNUNET_h2s (&slave_key_hash)); | 812 | test, join_req_count, GNUNET_h2s (&slave_key_hash)); |
809 | 813 | ||
810 | /* Reject first request */ | 814 | /* Reject first request */ |
811 | int is_admitted = (0 < join_req_count++) ? GNUNET_YES : GNUNET_NO; | 815 | int is_admitted = (0 < join_req_count++) ? GNUNET_YES : GNUNET_NO; |
@@ -817,8 +821,8 @@ static void | |||
817 | slave_connect_cb (void *cls, int result, uint64_t max_message_id) | 821 | slave_connect_cb (void *cls, int result, uint64_t max_message_id) |
818 | { | 822 | { |
819 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 823 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
820 | "Slave connected: %d, max_message_id: %" PRIu64 "\n", | 824 | "Test #%d: Slave connected: %d, max_message_id: %" PRIu64 "\n", |
821 | result, max_message_id); | 825 | test, result, max_message_id); |
822 | GNUNET_assert (TEST_SLAVE_JOIN_REJECT == test || TEST_SLAVE_JOIN_ACCEPT == test); | 826 | GNUNET_assert (TEST_SLAVE_JOIN_REJECT == test || TEST_SLAVE_JOIN_ACCEPT == test); |
823 | GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result); | 827 | GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result); |
824 | } | 828 | } |
@@ -827,8 +831,8 @@ slave_connect_cb (void *cls, int result, uint64_t max_message_id) | |||
827 | static void | 831 | static void |
828 | slave_join (int t) | 832 | slave_join (int t) |
829 | { | 833 | { |
830 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Joining slave.\n"); | ||
831 | test = t; | 834 | test = t; |
835 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Test #%d: Joining slave.\n"); | ||
832 | 836 | ||
833 | struct GNUNET_PeerIdentity origin = this_peer; | 837 | struct GNUNET_PeerIdentity origin = this_peer; |
834 | struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); | 838 | struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); |
@@ -852,8 +856,9 @@ slave_join (int t) | |||
852 | void | 856 | void |
853 | master_transmit () | 857 | master_transmit () |
854 | { | 858 | { |
855 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master sending message to all.\n"); | ||
856 | test = TEST_MASTER_TRANSMIT; | 859 | test = TEST_MASTER_TRANSMIT; |
860 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
861 | "Test #%d: Master sending message to all.\n", test); | ||
857 | end_count = 0; | 862 | end_count = 0; |
858 | 863 | ||
859 | uint32_t i, j; | 864 | uint32_t i, j; |
@@ -907,8 +912,8 @@ void | |||
907 | master_start_cb (void *cls, int result, uint64_t max_message_id) | 912 | master_start_cb (void *cls, int result, uint64_t max_message_id) |
908 | { | 913 | { |
909 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 914 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
910 | "Master started: %d, max_message_id: %" PRIu64 "\n", | 915 | "Test #%d: Master started: %d, max_message_id: %" PRIu64 "\n", |
911 | result, max_message_id); | 916 | test, result, max_message_id); |
912 | GNUNET_assert (TEST_MASTER_START == test); | 917 | GNUNET_assert (TEST_MASTER_START == test); |
913 | GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result); | 918 | GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result); |
914 | slave_join (TEST_SLAVE_JOIN_REJECT); | 919 | slave_join (TEST_SLAVE_JOIN_REJECT); |
@@ -918,8 +923,8 @@ master_start_cb (void *cls, int result, uint64_t max_message_id) | |||
918 | void | 923 | void |
919 | master_start () | 924 | master_start () |
920 | { | 925 | { |
921 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n"); | ||
922 | test = TEST_MASTER_START; | 926 | test = TEST_MASTER_START; |
927 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Test #%d: Starting master.\n", test); | ||
923 | mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE, | 928 | mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE, |
924 | &master_start_cb, &join_request_cb, | 929 | &master_start_cb, &join_request_cb, |
925 | &master_message_cb, &master_message_part_cb, | 930 | &master_message_cb, &master_message_part_cb, |
diff --git a/src/psycstore/psyc_util_lib.c b/src/psycstore/psyc_util_lib.c index 80e84f29c..9e6b1b770 100644 --- a/src/psycstore/psyc_util_lib.c +++ b/src/psycstore/psyc_util_lib.c | |||
@@ -101,6 +101,12 @@ struct GNUNET_PSYC_TransmitHandle | |||
101 | * Are we currently transmitting a message? | 101 | * Are we currently transmitting a message? |
102 | */ | 102 | */ |
103 | uint8_t in_transmit; | 103 | uint8_t in_transmit; |
104 | |||
105 | /** | ||
106 | * Notify callback is currently being called. | ||
107 | */ | ||
108 | uint8_t in_notify; | ||
109 | |||
104 | }; | 110 | }; |
105 | 111 | ||
106 | 112 | ||
@@ -334,20 +340,20 @@ GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit) | |||
334 | * Transmission handle. | 340 | * Transmission handle. |
335 | * @param msg | 341 | * @param msg |
336 | * Message part, or NULL. | 342 | * Message part, or NULL. |
337 | * @param end | 343 | * @param tmit_now |
338 | * End of message? | 344 | * Transmit message now, or wait for buffer to fill up? |
339 | * #GNUNET_YES or #GNUNET_NO. | 345 | * #GNUNET_YES or #GNUNET_NO. |
340 | */ | 346 | */ |
341 | static void | 347 | static void |
342 | transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, | 348 | transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, |
343 | const struct GNUNET_MessageHeader *msg, | 349 | const struct GNUNET_MessageHeader *msg, |
344 | uint8_t end) | 350 | uint8_t tmit_now) |
345 | { | 351 | { |
346 | uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0; | 352 | uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0; |
347 | 353 | ||
348 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 354 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
349 | "Queueing message part of type %u and size %u (end: %u)).\n", | 355 | "Queueing message part of type %u and size %u (tmit_now: %u)).\n", |
350 | NULL != msg ? ntohs (msg->type) : 0, size, end); | 356 | NULL != msg ? ntohs (msg->type) : 0, size, tmit_now); |
351 | 357 | ||
352 | if (NULL != tmit->msg) | 358 | if (NULL != tmit->msg) |
353 | { | 359 | { |
@@ -380,7 +386,7 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, | |||
380 | } | 386 | } |
381 | 387 | ||
382 | if (NULL != tmit->msg | 388 | if (NULL != tmit->msg |
383 | && (GNUNET_YES == end | 389 | && (GNUNET_YES == tmit_now |
384 | || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD | 390 | || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD |
385 | < tmit->msg->size + sizeof (struct GNUNET_MessageHeader)))) | 391 | < tmit->msg->size + sizeof (struct GNUNET_MessageHeader)))) |
386 | { | 392 | { |
@@ -391,9 +397,6 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, | |||
391 | tmit->msg = NULL; | 397 | tmit->msg = NULL; |
392 | tmit->acks_pending++; | 398 | tmit->acks_pending++; |
393 | } | 399 | } |
394 | |||
395 | if (GNUNET_YES == end) | ||
396 | tmit->in_transmit = GNUNET_NO; | ||
397 | } | 400 | } |
398 | 401 | ||
399 | 402 | ||
@@ -414,7 +417,9 @@ transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit) | |||
414 | if (NULL != tmit->notify_data) | 417 | if (NULL != tmit->notify_data) |
415 | { | 418 | { |
416 | data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; | 419 | data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; |
420 | tmit->in_notify = GNUNET_YES; | ||
417 | notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]); | 421 | notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]); |
422 | tmit->in_notify = GNUNET_NO; | ||
418 | } | 423 | } |
419 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 424 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
420 | "transmit_data (ret: %d, size: %u): %.*s\n", | 425 | "transmit_data (ret: %d, size: %u): %.*s\n", |
@@ -442,6 +447,7 @@ transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit) | |||
442 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | 447 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); |
443 | msg->size = htons (sizeof (*msg)); | 448 | msg->size = htons (sizeof (*msg)); |
444 | transmit_queue_insert (tmit, msg, GNUNET_YES); | 449 | transmit_queue_insert (tmit, msg, GNUNET_YES); |
450 | tmit->in_transmit = GNUNET_NO; | ||
445 | return; | 451 | return; |
446 | } | 452 | } |
447 | 453 | ||
@@ -458,6 +464,8 @@ transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit) | |||
458 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); | 464 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); |
459 | msg->size = htons (sizeof (*msg)); | 465 | msg->size = htons (sizeof (*msg)); |
460 | transmit_queue_insert (tmit, msg, GNUNET_YES); | 466 | transmit_queue_insert (tmit, msg, GNUNET_YES); |
467 | /* FIXME: wait for ACK before setting in_transmit to no */ | ||
468 | tmit->in_transmit = GNUNET_NO; | ||
461 | } | 469 | } |
462 | } | 470 | } |
463 | 471 | ||
@@ -489,8 +497,10 @@ transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit) | |||
489 | { | 497 | { |
490 | max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; | 498 | max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; |
491 | data_size = max_data_size; | 499 | data_size = max_data_size; |
500 | tmit->in_notify = GNUNET_YES; | ||
492 | notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1], | 501 | notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1], |
493 | &mod->oper, &mod->value_size); | 502 | &mod->oper, &mod->value_size); |
503 | tmit->in_notify = GNUNET_NO; | ||
494 | } | 504 | } |
495 | 505 | ||
496 | mod->name_size = strnlen ((char *) &mod[1], data_size) + 1; | 506 | mod->name_size = strnlen ((char *) &mod[1], data_size) + 1; |
@@ -520,8 +530,10 @@ transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit) | |||
520 | { | 530 | { |
521 | max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; | 531 | max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; |
522 | data_size = max_data_size; | 532 | data_size = max_data_size; |
533 | tmit->in_notify = GNUNET_YES; | ||
523 | notify_ret = tmit->notify_mod (tmit->notify_mod_cls, | 534 | notify_ret = tmit->notify_mod (tmit->notify_mod_cls, |
524 | &data_size, &msg[1], NULL, NULL); | 535 | &data_size, &msg[1], NULL, NULL); |
536 | tmit->in_notify = GNUNET_NO; | ||
525 | } | 537 | } |
526 | tmit->mod_value_remaining -= data_size; | 538 | tmit->mod_value_remaining -= data_size; |
527 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 539 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
@@ -558,8 +570,8 @@ transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit) | |||
558 | tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL; | 570 | tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL; |
559 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | 571 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); |
560 | msg->size = htons (sizeof (*msg)); | 572 | msg->size = htons (sizeof (*msg)); |
561 | |||
562 | transmit_queue_insert (tmit, msg, GNUNET_YES); | 573 | transmit_queue_insert (tmit, msg, GNUNET_YES); |
574 | tmit->in_transmit = GNUNET_NO; | ||
563 | return; | 575 | return; |
564 | } | 576 | } |
565 | 577 | ||
@@ -748,6 +760,9 @@ GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit, | |||
748 | void | 760 | void |
749 | GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit) | 761 | GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit) |
750 | { | 762 | { |
763 | if (GNUNET_YES != tmit->in_transmit || GNUNET_NO != tmit->in_notify) | ||
764 | return; | ||
765 | |||
751 | if (0 == tmit->acks_pending) | 766 | if (0 == tmit->acks_pending) |
752 | { | 767 | { |
753 | tmit->paused = GNUNET_NO; | 768 | tmit->paused = GNUNET_NO; |
@@ -800,13 +815,11 @@ GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit) | |||
800 | { | 815 | { |
801 | case GNUNET_PSYC_MESSAGE_STATE_MODIFIER: | 816 | case GNUNET_PSYC_MESSAGE_STATE_MODIFIER: |
802 | case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT: | 817 | case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT: |
803 | if (GNUNET_NO == tmit->paused) | 818 | transmit_mod (tmit); |
804 | transmit_mod (tmit); | ||
805 | break; | 819 | break; |
806 | 820 | ||
807 | case GNUNET_PSYC_MESSAGE_STATE_DATA: | 821 | case GNUNET_PSYC_MESSAGE_STATE_DATA: |
808 | if (GNUNET_NO == tmit->paused) | 822 | transmit_data (tmit); |
809 | transmit_data (tmit); | ||
810 | break; | 823 | break; |
811 | 824 | ||
812 | case GNUNET_PSYC_MESSAGE_STATE_END: | 825 | case GNUNET_PSYC_MESSAGE_STATE_END: |
diff --git a/src/social/social_api.c b/src/social/social_api.c index 17e5a3bfc..4aed3755e 100644 --- a/src/social/social_api.c +++ b/src/social/social_api.c | |||
@@ -1837,8 +1837,10 @@ GNUNET_SOCIAL_host_announce (struct GNUNET_SOCIAL_Host *hst, | |||
1837 | { | 1837 | { |
1838 | if (GNUNET_OK == | 1838 | if (GNUNET_OK == |
1839 | GNUNET_PSYC_transmit_message (hst->plc.tmit, method_name, env, | 1839 | GNUNET_PSYC_transmit_message (hst->plc.tmit, method_name, env, |
1840 | NULL, notify_data, notify_data_cls, flags)); | 1840 | NULL, notify_data, notify_data_cls, flags)) |
1841 | return (struct GNUNET_SOCIAL_Announcement *) hst->plc.tmit; | 1841 | return (struct GNUNET_SOCIAL_Announcement *) hst->plc.tmit; |
1842 | else | ||
1843 | return NULL; | ||
1842 | } | 1844 | } |
1843 | 1845 | ||
1844 | 1846 | ||
@@ -2168,8 +2170,10 @@ GNUNET_SOCIAL_guest_talk (struct GNUNET_SOCIAL_Guest *gst, | |||
2168 | 2170 | ||
2169 | if (GNUNET_OK == | 2171 | if (GNUNET_OK == |
2170 | GNUNET_PSYC_transmit_message (plc->tmit, method_name, env, | 2172 | GNUNET_PSYC_transmit_message (plc->tmit, method_name, env, |
2171 | NULL, notify_data, notify_data_cls, flags)); | 2173 | NULL, notify_data, notify_data_cls, flags)) |
2172 | return (struct GNUNET_SOCIAL_TalkRequest *) plc->tmit; | 2174 | return (struct GNUNET_SOCIAL_TalkRequest *) plc->tmit; |
2175 | else | ||
2176 | return NULL; | ||
2173 | } | 2177 | } |
2174 | 2178 | ||
2175 | 2179 | ||
diff --git a/src/util/client_manager.c b/src/util/client_manager.c index 2fd52705e..80ba43ff1 100644 --- a/src/util/client_manager.c +++ b/src/util/client_manager.c | |||
@@ -328,7 +328,7 @@ transmit_next (struct GNUNET_CLIENT_MANAGER_Connection *mgr) | |||
328 | 328 | ||
329 | mgr->client_tmit | 329 | mgr->client_tmit |
330 | = GNUNET_CLIENT_notify_transmit_ready (mgr->client, | 330 | = GNUNET_CLIENT_notify_transmit_ready (mgr->client, |
331 | ntohs (mgr->tmit_head->msg->size), | 331 | GNUNET_SERVER_MAX_MESSAGE_SIZE - 1, |
332 | GNUNET_TIME_UNIT_FOREVER_REL, | 332 | GNUNET_TIME_UNIT_FOREVER_REL, |
333 | GNUNET_NO, | 333 | GNUNET_NO, |
334 | &send_next_message, | 334 | &send_next_message, |