diff options
author | Gabor X Toth <*@tg-x.net> | 2015-09-26 17:10:10 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2015-09-26 17:10:10 +0000 |
commit | f5a9bac85b73508da6794e1171b857e2582c8216 (patch) | |
tree | d754f5e560f85e80b2dca3ea73ff76ff2a1b8de4 /src/psyc | |
parent | e990039b4ddf89d37a8771dc5aca791e52df45ed (diff) | |
download | gnunet-f5a9bac85b73508da6794e1171b857e2582c8216.tar.gz gnunet-f5a9bac85b73508da6794e1171b857e2582c8216.zip |
psyc fixes
Diffstat (limited to 'src/psyc')
-rw-r--r-- | src/psyc/gnunet-service-psyc.c | 54 | ||||
-rw-r--r-- | src/psyc/psyc_api.c | 2 | ||||
-rw-r--r-- | src/psyc/test_psyc.c | 95 |
3 files changed, 103 insertions, 48 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index 3671a7d6f..36e3a7764 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c | |||
@@ -98,6 +98,16 @@ struct TransmitMessage | |||
98 | uint16_t size; | 98 | uint16_t size; |
99 | 99 | ||
100 | /** | 100 | /** |
101 | * Type of first message part. | ||
102 | */ | ||
103 | uint16_t first_ptype; | ||
104 | |||
105 | /** | ||
106 | * Type of last message part. | ||
107 | */ | ||
108 | uint16_t last_ptype; | ||
109 | |||
110 | /** | ||
101 | * @see enum MessageState | 111 | * @see enum MessageState |
102 | */ | 112 | */ |
103 | uint8_t state; | 113 | uint8_t state; |
@@ -483,7 +493,7 @@ cleanup_master (struct Master *mst) | |||
483 | if (NULL != mst->origin) | 493 | if (NULL != mst->origin) |
484 | GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME | 494 | GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME |
485 | GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs); | 495 | GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs); |
486 | GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn); | 496 | GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst); |
487 | } | 497 | } |
488 | 498 | ||
489 | 499 | ||
@@ -523,7 +533,7 @@ cleanup_slave (struct Slave *slv) | |||
523 | GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME | 533 | GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME |
524 | slv->member = NULL; | 534 | slv->member = NULL; |
525 | } | 535 | } |
526 | GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn); | 536 | GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv); |
527 | } | 537 | } |
528 | 538 | ||
529 | 539 | ||
@@ -582,8 +592,6 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
582 | chn, (GNUNET_YES == chn->is_master) ? "master" : "slave", | 592 | chn, (GNUNET_YES == chn->is_master) ? "master" : "slave", |
583 | GNUNET_h2s (&chn->pub_key_hash)); | 593 | GNUNET_h2s (&chn->pub_key_hash)); |
584 | 594 | ||
585 | chn->is_disconnected = GNUNET_YES; | ||
586 | |||
587 | struct Client *cli = chn->clients_head; | 595 | struct Client *cli = chn->clients_head; |
588 | while (NULL != cli) | 596 | while (NULL != cli) |
589 | { | 597 | { |
@@ -609,6 +617,11 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
609 | 617 | ||
610 | if (NULL == chn->clients_head) | 618 | if (NULL == chn->clients_head) |
611 | { /* Last client disconnected. */ | 619 | { /* Last client disconnected. */ |
620 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
621 | "%p Last client (%s) disconnected from channel %s\n", | ||
622 | chn, (GNUNET_YES == chn->is_master) ? "master" : "slave", | ||
623 | GNUNET_h2s (&chn->pub_key_hash)); | ||
624 | chn->is_disconnected = GNUNET_YES; | ||
612 | if (NULL != chn->tmit_head) | 625 | if (NULL != chn->tmit_head) |
613 | { /* Send pending messages to multicast before cleanup. */ | 626 | { /* Send pending messages to multicast before cleanup. */ |
614 | transmit_message (chn); | 627 | transmit_message (chn); |
@@ -789,6 +802,11 @@ mcast_recv_join_decision (void *cls, int is_admitted, | |||
789 | struct Channel *chn = &slv->chn; | 802 | struct Channel *chn = &slv->chn; |
790 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 803 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
791 | "%p Got join decision: %d\n", slv, is_admitted); | 804 | "%p Got join decision: %d\n", slv, is_admitted); |
805 | if (GNUNET_YES == chn->is_ready) | ||
806 | { | ||
807 | /* Already admitted */ | ||
808 | return; | ||
809 | } | ||
792 | 810 | ||
793 | uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0; | 811 | uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0; |
794 | struct GNUNET_PSYC_JoinDecisionMessage * | 812 | struct GNUNET_PSYC_JoinDecisionMessage * |
@@ -805,10 +823,6 @@ mcast_recv_join_decision (void *cls, int is_admitted, | |||
805 | { | 823 | { |
806 | chn->is_ready = GNUNET_YES; | 824 | chn->is_ready = GNUNET_YES; |
807 | } | 825 | } |
808 | else | ||
809 | { | ||
810 | slv->member = NULL; | ||
811 | } | ||
812 | } | 826 | } |
813 | 827 | ||
814 | 828 | ||
@@ -2011,7 +2025,8 @@ transmit_notify (void *cls, size_t *data_size, void *data) | |||
2011 | { | 2025 | { |
2012 | transmit_message (chn); | 2026 | transmit_message (chn); |
2013 | } | 2027 | } |
2014 | else if (GNUNET_YES == chn->is_disconnected) | 2028 | else if (GNUNET_YES == chn->is_disconnected |
2029 | && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END) | ||
2015 | { | 2030 | { |
2016 | /* FIXME: handle partial message (when still in_transmit) */ | 2031 | /* FIXME: handle partial message (when still in_transmit) */ |
2017 | return GNUNET_SYSERR; | 2032 | return GNUNET_SYSERR; |
@@ -2106,12 +2121,11 @@ transmit_message (struct Channel *chn) | |||
2106 | * Queue a message from a channel master for sending to the multicast group. | 2121 | * Queue a message from a channel master for sending to the multicast group. |
2107 | */ | 2122 | */ |
2108 | static void | 2123 | static void |
2109 | master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg, | 2124 | master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg) |
2110 | uint16_t first_ptype, uint16_t last_ptype) | ||
2111 | { | 2125 | { |
2112 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst); | 2126 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst); |
2113 | 2127 | ||
2114 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype) | 2128 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype) |
2115 | { | 2129 | { |
2116 | tmit_msg->id = ++mst->max_message_id; | 2130 | tmit_msg->id = ++mst->max_message_id; |
2117 | struct GNUNET_PSYC_MessageMethod *pmeth | 2131 | struct GNUNET_PSYC_MessageMethod *pmeth |
@@ -2149,10 +2163,9 @@ master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg, | |||
2149 | * Queue a message from a channel slave for sending to the multicast group. | 2163 | * Queue a message from a channel slave for sending to the multicast group. |
2150 | */ | 2164 | */ |
2151 | static void | 2165 | static void |
2152 | slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg, | 2166 | slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg) |
2153 | uint16_t first_ptype, uint16_t last_ptype) | ||
2154 | { | 2167 | { |
2155 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype) | 2168 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype) |
2156 | { | 2169 | { |
2157 | struct GNUNET_PSYC_MessageMethod *pmeth | 2170 | struct GNUNET_PSYC_MessageMethod *pmeth |
2158 | = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1]; | 2171 | = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1]; |
@@ -2185,16 +2198,16 @@ queue_message (struct Channel *chn, | |||
2185 | tmit_msg->client = client; | 2198 | tmit_msg->client = client; |
2186 | tmit_msg->size = data_size; | 2199 | tmit_msg->size = data_size; |
2187 | tmit_msg->state = chn->tmit_state; | 2200 | tmit_msg->state = chn->tmit_state; |
2201 | tmit_msg->first_ptype = first_ptype; | ||
2202 | tmit_msg->last_ptype = last_ptype; | ||
2188 | 2203 | ||
2189 | /* FIXME: separate queue per message ID */ | 2204 | /* FIXME: separate queue per message ID */ |
2190 | 2205 | ||
2191 | GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg); | 2206 | GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg); |
2192 | 2207 | ||
2193 | chn->is_master | 2208 | chn->is_master |
2194 | ? master_queue_message ((struct Master *) chn, tmit_msg, | 2209 | ? master_queue_message ((struct Master *) chn, tmit_msg) |
2195 | first_ptype, last_ptype) | 2210 | : slave_queue_message ((struct Slave *) chn, tmit_msg); |
2196 | : slave_queue_message ((struct Slave *) chn, tmit_msg, | ||
2197 | first_ptype, last_ptype); | ||
2198 | return tmit_msg; | 2211 | return tmit_msg; |
2199 | } | 2212 | } |
2200 | 2213 | ||
@@ -2295,7 +2308,8 @@ store_recv_membership_store_result (void *cls, int64_t result, | |||
2295 | "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n", | 2308 | "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n", |
2296 | op->chn, result, err_msg_size, err_msg); | 2309 | op->chn, result, err_msg_size, err_msg); |
2297 | 2310 | ||
2298 | client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); | 2311 | if (NULL != op->client) |
2312 | client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); | ||
2299 | op_remove (op); | 2313 | op_remove (op); |
2300 | } | 2314 | } |
2301 | 2315 | ||
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index 6128e4d82..9392deeef 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c | |||
@@ -464,7 +464,7 @@ master_recv_join_request (void *cls, | |||
464 | if (sizeof (*req) + sizeof (*join_msg) <= ntohs (req->header.size)) | 464 | if (sizeof (*req) + sizeof (*join_msg) <= ntohs (req->header.size)) |
465 | { | 465 | { |
466 | join_msg = (struct GNUNET_PSYC_Message *) &req[1]; | 466 | join_msg = (struct GNUNET_PSYC_Message *) &req[1]; |
467 | LOG (GNUNET_ERROR_TYPE_ERROR, | 467 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
468 | "Received join_msg of type %u and size %u.\n", | 468 | "Received join_msg of type %u and size %u.\n", |
469 | ntohs (join_msg->header.type), ntohs (join_msg->header.size)); | 469 | ntohs (join_msg->header.type), ntohs (join_msg->header.size)); |
470 | } | 470 | } |
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c index 863eaab88..e2e6cfc87 100644 --- a/src/psyc/test_psyc.c +++ b/src/psyc/test_psyc.c | |||
@@ -86,19 +86,22 @@ uint8_t join_req_count, end_count; | |||
86 | 86 | ||
87 | enum | 87 | enum |
88 | { | 88 | { |
89 | TEST_NONE = 0, | 89 | TEST_NONE = 0, |
90 | TEST_MASTER_START = 1, | 90 | TEST_MASTER_START = 1, |
91 | TEST_SLAVE_JOIN = 2, | 91 | TEST_SLAVE_JOIN_REJECT = 2, |
92 | TEST_SLAVE_TRANSMIT = 3, | 92 | TEST_SLAVE_JOIN_ACCEPT = 3, |
93 | TEST_MASTER_TRANSMIT = 4, | 93 | TEST_SLAVE_ADD = 4, |
94 | TEST_MASTER_HISTORY_REPLAY_LATEST = 5, | 94 | TEST_SLAVE_REMOVE = 5, |
95 | TEST_SLAVE_HISTORY_REPLAY_LATEST = 6, | 95 | TEST_SLAVE_TRANSMIT = 6, |
96 | TEST_MASTER_HISTORY_REPLAY = 7, | 96 | TEST_MASTER_TRANSMIT = 7, |
97 | TEST_SLAVE_HISTORY_REPLAY = 8, | 97 | TEST_MASTER_HISTORY_REPLAY_LATEST = 8, |
98 | TEST_MASTER_STATE_GET = 9, | 98 | TEST_SLAVE_HISTORY_REPLAY_LATEST = 9, |
99 | TEST_SLAVE_STATE_GET = 10, | 99 | TEST_MASTER_HISTORY_REPLAY = 10, |
100 | TEST_MASTER_STATE_GET_PREFIX = 11, | 100 | TEST_SLAVE_HISTORY_REPLAY = 11, |
101 | TEST_SLAVE_STATE_GET_PREFIX = 12, | 101 | TEST_MASTER_STATE_GET = 12, |
102 | TEST_SLAVE_STATE_GET = 13, | ||
103 | TEST_MASTER_STATE_GET_PREFIX = 14, | ||
104 | TEST_SLAVE_STATE_GET_PREFIX = 15, | ||
102 | } test; | 105 | } test; |
103 | 106 | ||
104 | 107 | ||
@@ -204,6 +207,7 @@ void | |||
204 | master_message_cb (void *cls, uint64_t message_id, uint32_t flags, | 207 | master_message_cb (void *cls, uint64_t message_id, uint32_t flags, |
205 | const struct GNUNET_PSYC_MessageHeader *msg) | 208 | const struct GNUNET_PSYC_MessageHeader *msg) |
206 | { | 209 | { |
210 | GNUNET_assert (NULL != msg); | ||
207 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 211 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
208 | "Test #%d: Master got PSYC message fragment of size %u " | 212 | "Test #%d: Master got PSYC message fragment of size %u " |
209 | "belonging to message ID %" PRIu64 " with flags %x\n", | 213 | "belonging to message ID %" PRIu64 " with flags %x\n", |
@@ -718,17 +722,46 @@ slave_remove_cb (void *cls, int64_t result, | |||
718 | 722 | ||
719 | 723 | ||
720 | void | 724 | void |
725 | slave_remove () | ||
726 | { | ||
727 | test = TEST_SLAVE_REMOVE; | ||
728 | struct GNUNET_PSYC_Channel *chn = GNUNET_PSYC_master_get_channel (mst); | ||
729 | GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2, | ||
730 | &slave_remove_cb, chn); | ||
731 | } | ||
732 | |||
733 | |||
734 | void | ||
721 | slave_add_cb (void *cls, int64_t result, | 735 | slave_add_cb (void *cls, int64_t result, |
722 | const void *err_msg, uint16_t err_msg_size) | 736 | const void *err_msg, uint16_t err_msg_size) |
723 | { | 737 | { |
724 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 738 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
725 | "slave_add:\t%" PRId64 " (%.*s)\n", | 739 | "slave_add:\t%" PRId64 " (%.*s)\n", |
726 | result, err_msg_size, err_msg); | 740 | result, err_msg_size, err_msg); |
741 | slave_remove (); | ||
742 | } | ||
727 | 743 | ||
728 | struct GNUNET_PSYC_Channel *chn = cls; | ||
729 | GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2, | ||
730 | &slave_remove_cb, chn); | ||
731 | 744 | ||
745 | void | ||
746 | slave_add () | ||
747 | { | ||
748 | test = TEST_SLAVE_ADD; | ||
749 | struct GNUNET_PSYC_Channel *chn = GNUNET_PSYC_master_get_channel (mst); | ||
750 | GNUNET_PSYC_channel_slave_add (chn, &slave_pub_key, 2, 2, &slave_add_cb, chn); | ||
751 | } | ||
752 | |||
753 | |||
754 | void first_slave_parted (void *cls) | ||
755 | { | ||
756 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "First slave parted.\n"); | ||
757 | slave_join (TEST_SLAVE_JOIN_ACCEPT); | ||
758 | } | ||
759 | |||
760 | |||
761 | void | ||
762 | schedule_slave_part (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
763 | { | ||
764 | GNUNET_PSYC_slave_part (slv, GNUNET_NO, first_slave_parted, NULL); | ||
732 | } | 765 | } |
733 | 766 | ||
734 | 767 | ||
@@ -741,15 +774,23 @@ join_decision_cb (void *cls, | |||
741 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 774 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
742 | "Slave got join decision: %d\n", is_admitted); | 775 | "Slave got join decision: %d\n", is_admitted); |
743 | 776 | ||
744 | if (GNUNET_YES != is_admitted) | 777 | switch (test) |
745 | { /* First join request is refused, retry. */ | 778 | { |
779 | case TEST_SLAVE_JOIN_REJECT: | ||
780 | GNUNET_assert (0 == is_admitted); | ||
746 | GNUNET_assert (1 == join_req_count); | 781 | GNUNET_assert (1 == join_req_count); |
747 | slave_join (); | 782 | GNUNET_SCHEDULER_add_now (schedule_slave_part, NULL); |
748 | return; | 783 | break; |
749 | } | ||
750 | 784 | ||
751 | struct GNUNET_PSYC_Channel *chn = GNUNET_PSYC_master_get_channel (mst); | 785 | case TEST_SLAVE_JOIN_ACCEPT: |
752 | GNUNET_PSYC_channel_slave_add (chn, &slave_pub_key, 2, 2, &slave_add_cb, chn); | 786 | GNUNET_assert (1 == is_admitted); |
787 | GNUNET_assert (2 == join_req_count); | ||
788 | slave_add (); | ||
789 | break; | ||
790 | |||
791 | default: | ||
792 | GNUNET_break (0); | ||
793 | } | ||
753 | } | 794 | } |
754 | 795 | ||
755 | 796 | ||
@@ -778,16 +819,16 @@ slave_connect_cb (void *cls, int result, uint64_t max_message_id) | |||
778 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 819 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
779 | "Slave connected: %d, max_message_id: %" PRIu64 "\n", | 820 | "Slave connected: %d, max_message_id: %" PRIu64 "\n", |
780 | result, max_message_id); | 821 | result, max_message_id); |
781 | GNUNET_assert (TEST_SLAVE_JOIN == test); | 822 | GNUNET_assert (TEST_SLAVE_JOIN_REJECT == test || TEST_SLAVE_JOIN_ACCEPT == test); |
782 | GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result); | 823 | GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result); |
783 | } | 824 | } |
784 | 825 | ||
785 | 826 | ||
786 | static void | 827 | static void |
787 | slave_join () | 828 | slave_join (int t) |
788 | { | 829 | { |
789 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Joining slave.\n"); | 830 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Joining slave.\n"); |
790 | test = TEST_SLAVE_JOIN; | 831 | test = t; |
791 | 832 | ||
792 | struct GNUNET_PeerIdentity origin = this_peer; | 833 | struct GNUNET_PeerIdentity origin = this_peer; |
793 | struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); | 834 | struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); |
@@ -870,7 +911,7 @@ master_start_cb (void *cls, int result, uint64_t max_message_id) | |||
870 | result, max_message_id); | 911 | result, max_message_id); |
871 | GNUNET_assert (TEST_MASTER_START == test); | 912 | GNUNET_assert (TEST_MASTER_START == test); |
872 | GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result); | 913 | GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result); |
873 | slave_join (); | 914 | slave_join (TEST_SLAVE_JOIN_REJECT); |
874 | } | 915 | } |
875 | 916 | ||
876 | 917 | ||