aboutsummaryrefslogtreecommitdiff
path: root/src/psyc
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2015-09-26 17:10:10 +0000
committerGabor X Toth <*@tg-x.net>2015-09-26 17:10:10 +0000
commitf5a9bac85b73508da6794e1171b857e2582c8216 (patch)
treed754f5e560f85e80b2dca3ea73ff76ff2a1b8de4 /src/psyc
parente990039b4ddf89d37a8771dc5aca791e52df45ed (diff)
downloadgnunet-f5a9bac85b73508da6794e1171b857e2582c8216.tar.gz
gnunet-f5a9bac85b73508da6794e1171b857e2582c8216.zip
psyc fixes
Diffstat (limited to 'src/psyc')
-rw-r--r--src/psyc/gnunet-service-psyc.c54
-rw-r--r--src/psyc/psyc_api.c2
-rw-r--r--src/psyc/test_psyc.c95
3 files changed, 103 insertions, 48 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index 3671a7d6f..36e3a7764 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -98,6 +98,16 @@ struct TransmitMessage
98 uint16_t size; 98 uint16_t size;
99 99
100 /** 100 /**
101 * Type of first message part.
102 */
103 uint16_t first_ptype;
104
105 /**
106 * Type of last message part.
107 */
108 uint16_t last_ptype;
109
110 /**
101 * @see enum MessageState 111 * @see enum MessageState
102 */ 112 */
103 uint8_t state; 113 uint8_t state;
@@ -483,7 +493,7 @@ cleanup_master (struct Master *mst)
483 if (NULL != mst->origin) 493 if (NULL != mst->origin)
484 GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME 494 GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
485 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs); 495 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
486 GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn); 496 GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
487} 497}
488 498
489 499
@@ -523,7 +533,7 @@ cleanup_slave (struct Slave *slv)
523 GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME 533 GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
524 slv->member = NULL; 534 slv->member = NULL;
525 } 535 }
526 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn); 536 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
527} 537}
528 538
529 539
@@ -582,8 +592,6 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
582 chn, (GNUNET_YES == chn->is_master) ? "master" : "slave", 592 chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
583 GNUNET_h2s (&chn->pub_key_hash)); 593 GNUNET_h2s (&chn->pub_key_hash));
584 594
585 chn->is_disconnected = GNUNET_YES;
586
587 struct Client *cli = chn->clients_head; 595 struct Client *cli = chn->clients_head;
588 while (NULL != cli) 596 while (NULL != cli)
589 { 597 {
@@ -609,6 +617,11 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
609 617
610 if (NULL == chn->clients_head) 618 if (NULL == chn->clients_head)
611 { /* Last client disconnected. */ 619 { /* Last client disconnected. */
620 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
621 "%p Last client (%s) disconnected from channel %s\n",
622 chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
623 GNUNET_h2s (&chn->pub_key_hash));
624 chn->is_disconnected = GNUNET_YES;
612 if (NULL != chn->tmit_head) 625 if (NULL != chn->tmit_head)
613 { /* Send pending messages to multicast before cleanup. */ 626 { /* Send pending messages to multicast before cleanup. */
614 transmit_message (chn); 627 transmit_message (chn);
@@ -789,6 +802,11 @@ mcast_recv_join_decision (void *cls, int is_admitted,
789 struct Channel *chn = &slv->chn; 802 struct Channel *chn = &slv->chn;
790 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 803 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
791 "%p Got join decision: %d\n", slv, is_admitted); 804 "%p Got join decision: %d\n", slv, is_admitted);
805 if (GNUNET_YES == chn->is_ready)
806 {
807 /* Already admitted */
808 return;
809 }
792 810
793 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0; 811 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
794 struct GNUNET_PSYC_JoinDecisionMessage * 812 struct GNUNET_PSYC_JoinDecisionMessage *
@@ -805,10 +823,6 @@ mcast_recv_join_decision (void *cls, int is_admitted,
805 { 823 {
806 chn->is_ready = GNUNET_YES; 824 chn->is_ready = GNUNET_YES;
807 } 825 }
808 else
809 {
810 slv->member = NULL;
811 }
812} 826}
813 827
814 828
@@ -2011,7 +2025,8 @@ transmit_notify (void *cls, size_t *data_size, void *data)
2011 { 2025 {
2012 transmit_message (chn); 2026 transmit_message (chn);
2013 } 2027 }
2014 else if (GNUNET_YES == chn->is_disconnected) 2028 else if (GNUNET_YES == chn->is_disconnected
2029 && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2015 { 2030 {
2016 /* FIXME: handle partial message (when still in_transmit) */ 2031 /* FIXME: handle partial message (when still in_transmit) */
2017 return GNUNET_SYSERR; 2032 return GNUNET_SYSERR;
@@ -2106,12 +2121,11 @@ transmit_message (struct Channel *chn)
2106 * Queue a message from a channel master for sending to the multicast group. 2121 * Queue a message from a channel master for sending to the multicast group.
2107 */ 2122 */
2108static void 2123static void
2109master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg, 2124master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
2110 uint16_t first_ptype, uint16_t last_ptype)
2111{ 2125{
2112 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst); 2126 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst);
2113 2127
2114 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype) 2128 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2115 { 2129 {
2116 tmit_msg->id = ++mst->max_message_id; 2130 tmit_msg->id = ++mst->max_message_id;
2117 struct GNUNET_PSYC_MessageMethod *pmeth 2131 struct GNUNET_PSYC_MessageMethod *pmeth
@@ -2149,10 +2163,9 @@ master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
2149 * Queue a message from a channel slave for sending to the multicast group. 2163 * Queue a message from a channel slave for sending to the multicast group.
2150 */ 2164 */
2151static void 2165static void
2152slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg, 2166slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
2153 uint16_t first_ptype, uint16_t last_ptype)
2154{ 2167{
2155 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype) 2168 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2156 { 2169 {
2157 struct GNUNET_PSYC_MessageMethod *pmeth 2170 struct GNUNET_PSYC_MessageMethod *pmeth
2158 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1]; 2171 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
@@ -2185,16 +2198,16 @@ queue_message (struct Channel *chn,
2185 tmit_msg->client = client; 2198 tmit_msg->client = client;
2186 tmit_msg->size = data_size; 2199 tmit_msg->size = data_size;
2187 tmit_msg->state = chn->tmit_state; 2200 tmit_msg->state = chn->tmit_state;
2201 tmit_msg->first_ptype = first_ptype;
2202 tmit_msg->last_ptype = last_ptype;
2188 2203
2189 /* FIXME: separate queue per message ID */ 2204 /* FIXME: separate queue per message ID */
2190 2205
2191 GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg); 2206 GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2192 2207
2193 chn->is_master 2208 chn->is_master
2194 ? master_queue_message ((struct Master *) chn, tmit_msg, 2209 ? master_queue_message ((struct Master *) chn, tmit_msg)
2195 first_ptype, last_ptype) 2210 : slave_queue_message ((struct Slave *) chn, tmit_msg);
2196 : slave_queue_message ((struct Slave *) chn, tmit_msg,
2197 first_ptype, last_ptype);
2198 return tmit_msg; 2211 return tmit_msg;
2199} 2212}
2200 2213
@@ -2295,7 +2308,8 @@ store_recv_membership_store_result (void *cls, int64_t result,
2295 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n", 2308 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n",
2296 op->chn, result, err_msg_size, err_msg); 2309 op->chn, result, err_msg_size, err_msg);
2297 2310
2298 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); 2311 if (NULL != op->client)
2312 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2299 op_remove (op); 2313 op_remove (op);
2300} 2314}
2301 2315
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index 6128e4d82..9392deeef 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -464,7 +464,7 @@ master_recv_join_request (void *cls,
464 if (sizeof (*req) + sizeof (*join_msg) <= ntohs (req->header.size)) 464 if (sizeof (*req) + sizeof (*join_msg) <= ntohs (req->header.size))
465 { 465 {
466 join_msg = (struct GNUNET_PSYC_Message *) &req[1]; 466 join_msg = (struct GNUNET_PSYC_Message *) &req[1];
467 LOG (GNUNET_ERROR_TYPE_ERROR, 467 LOG (GNUNET_ERROR_TYPE_DEBUG,
468 "Received join_msg of type %u and size %u.\n", 468 "Received join_msg of type %u and size %u.\n",
469 ntohs (join_msg->header.type), ntohs (join_msg->header.size)); 469 ntohs (join_msg->header.type), ntohs (join_msg->header.size));
470 } 470 }
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index 863eaab88..e2e6cfc87 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -86,19 +86,22 @@ uint8_t join_req_count, end_count;
86 86
87enum 87enum
88{ 88{
89 TEST_NONE = 0, 89 TEST_NONE = 0,
90 TEST_MASTER_START = 1, 90 TEST_MASTER_START = 1,
91 TEST_SLAVE_JOIN = 2, 91 TEST_SLAVE_JOIN_REJECT = 2,
92 TEST_SLAVE_TRANSMIT = 3, 92 TEST_SLAVE_JOIN_ACCEPT = 3,
93 TEST_MASTER_TRANSMIT = 4, 93 TEST_SLAVE_ADD = 4,
94 TEST_MASTER_HISTORY_REPLAY_LATEST = 5, 94 TEST_SLAVE_REMOVE = 5,
95 TEST_SLAVE_HISTORY_REPLAY_LATEST = 6, 95 TEST_SLAVE_TRANSMIT = 6,
96 TEST_MASTER_HISTORY_REPLAY = 7, 96 TEST_MASTER_TRANSMIT = 7,
97 TEST_SLAVE_HISTORY_REPLAY = 8, 97 TEST_MASTER_HISTORY_REPLAY_LATEST = 8,
98 TEST_MASTER_STATE_GET = 9, 98 TEST_SLAVE_HISTORY_REPLAY_LATEST = 9,
99 TEST_SLAVE_STATE_GET = 10, 99 TEST_MASTER_HISTORY_REPLAY = 10,
100 TEST_MASTER_STATE_GET_PREFIX = 11, 100 TEST_SLAVE_HISTORY_REPLAY = 11,
101 TEST_SLAVE_STATE_GET_PREFIX = 12, 101 TEST_MASTER_STATE_GET = 12,
102 TEST_SLAVE_STATE_GET = 13,
103 TEST_MASTER_STATE_GET_PREFIX = 14,
104 TEST_SLAVE_STATE_GET_PREFIX = 15,
102} test; 105} test;
103 106
104 107
@@ -204,6 +207,7 @@ void
204master_message_cb (void *cls, uint64_t message_id, uint32_t flags, 207master_message_cb (void *cls, uint64_t message_id, uint32_t flags,
205 const struct GNUNET_PSYC_MessageHeader *msg) 208 const struct GNUNET_PSYC_MessageHeader *msg)
206{ 209{
210 GNUNET_assert (NULL != msg);
207 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 211 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
208 "Test #%d: Master got PSYC message fragment of size %u " 212 "Test #%d: Master got PSYC message fragment of size %u "
209 "belonging to message ID %" PRIu64 " with flags %x\n", 213 "belonging to message ID %" PRIu64 " with flags %x\n",
@@ -718,17 +722,46 @@ slave_remove_cb (void *cls, int64_t result,
718 722
719 723
720void 724void
725slave_remove ()
726{
727 test = TEST_SLAVE_REMOVE;
728 struct GNUNET_PSYC_Channel *chn = GNUNET_PSYC_master_get_channel (mst);
729 GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2,
730 &slave_remove_cb, chn);
731}
732
733
734void
721slave_add_cb (void *cls, int64_t result, 735slave_add_cb (void *cls, int64_t result,
722 const void *err_msg, uint16_t err_msg_size) 736 const void *err_msg, uint16_t err_msg_size)
723{ 737{
724 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 738 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
725 "slave_add:\t%" PRId64 " (%.*s)\n", 739 "slave_add:\t%" PRId64 " (%.*s)\n",
726 result, err_msg_size, err_msg); 740 result, err_msg_size, err_msg);
741 slave_remove ();
742}
727 743
728 struct GNUNET_PSYC_Channel *chn = cls;
729 GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2,
730 &slave_remove_cb, chn);
731 744
745void
746slave_add ()
747{
748 test = TEST_SLAVE_ADD;
749 struct GNUNET_PSYC_Channel *chn = GNUNET_PSYC_master_get_channel (mst);
750 GNUNET_PSYC_channel_slave_add (chn, &slave_pub_key, 2, 2, &slave_add_cb, chn);
751}
752
753
754void first_slave_parted (void *cls)
755{
756 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "First slave parted.\n");
757 slave_join (TEST_SLAVE_JOIN_ACCEPT);
758}
759
760
761void
762schedule_slave_part (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
763{
764 GNUNET_PSYC_slave_part (slv, GNUNET_NO, first_slave_parted, NULL);
732} 765}
733 766
734 767
@@ -741,15 +774,23 @@ join_decision_cb (void *cls,
741 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 774 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
742 "Slave got join decision: %d\n", is_admitted); 775 "Slave got join decision: %d\n", is_admitted);
743 776
744 if (GNUNET_YES != is_admitted) 777 switch (test)
745 { /* First join request is refused, retry. */ 778 {
779 case TEST_SLAVE_JOIN_REJECT:
780 GNUNET_assert (0 == is_admitted);
746 GNUNET_assert (1 == join_req_count); 781 GNUNET_assert (1 == join_req_count);
747 slave_join (); 782 GNUNET_SCHEDULER_add_now (schedule_slave_part, NULL);
748 return; 783 break;
749 }
750 784
751 struct GNUNET_PSYC_Channel *chn = GNUNET_PSYC_master_get_channel (mst); 785 case TEST_SLAVE_JOIN_ACCEPT:
752 GNUNET_PSYC_channel_slave_add (chn, &slave_pub_key, 2, 2, &slave_add_cb, chn); 786 GNUNET_assert (1 == is_admitted);
787 GNUNET_assert (2 == join_req_count);
788 slave_add ();
789 break;
790
791 default:
792 GNUNET_break (0);
793 }
753} 794}
754 795
755 796
@@ -778,16 +819,16 @@ slave_connect_cb (void *cls, int result, uint64_t max_message_id)
778 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 819 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
779 "Slave connected: %d, max_message_id: %" PRIu64 "\n", 820 "Slave connected: %d, max_message_id: %" PRIu64 "\n",
780 result, max_message_id); 821 result, max_message_id);
781 GNUNET_assert (TEST_SLAVE_JOIN == test); 822 GNUNET_assert (TEST_SLAVE_JOIN_REJECT == test || TEST_SLAVE_JOIN_ACCEPT == test);
782 GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result); 823 GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result);
783} 824}
784 825
785 826
786static void 827static void
787slave_join () 828slave_join (int t)
788{ 829{
789 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Joining slave.\n"); 830 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Joining slave.\n");
790 test = TEST_SLAVE_JOIN; 831 test = t;
791 832
792 struct GNUNET_PeerIdentity origin = this_peer; 833 struct GNUNET_PeerIdentity origin = this_peer;
793 struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); 834 struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
@@ -870,7 +911,7 @@ master_start_cb (void *cls, int result, uint64_t max_message_id)
870 result, max_message_id); 911 result, max_message_id);
871 GNUNET_assert (TEST_MASTER_START == test); 912 GNUNET_assert (TEST_MASTER_START == test);
872 GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result); 913 GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result);
873 slave_join (); 914 slave_join (TEST_SLAVE_JOIN_REJECT);
874} 915}
875 916
876 917