diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-03-19 16:34:45 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-03-19 16:34:45 +0000 |
commit | e2cd40becaf2dda615712d4a0577d6750ad3c8e7 (patch) | |
tree | 993909e7d9efb7ecb7c1c20fea4f3e20b9c7f797 /src/consensus | |
parent | 4d0c05606b571f31b737847d5821218f1f48e78d (diff) | |
download | gnunet-e2cd40becaf2dda615712d4a0577d6750ad3c8e7.tar.gz gnunet-e2cd40becaf2dda615712d4a0577d6750ad3c8e7.zip |
fix for for multi-peer consensus, non-power-of-two consensus now works
Diffstat (limited to 'src/consensus')
-rw-r--r-- | src/consensus/gnunet-service-consensus.c | 106 | ||||
-rw-r--r-- | src/consensus/test_consensus.conf | 2 |
2 files changed, 86 insertions, 22 deletions
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index b1323f902..e300d9169 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c | |||
@@ -241,6 +241,11 @@ struct ConsensusPeerInformation | |||
241 | */ | 241 | */ |
242 | struct StrataMessage *premature_strata_message; | 242 | struct StrataMessage *premature_strata_message; |
243 | 243 | ||
244 | /** | ||
245 | * We have finishes the exp-subround with the peer. | ||
246 | */ | ||
247 | int exp_subround_finished; | ||
248 | |||
244 | }; | 249 | }; |
245 | 250 | ||
246 | typedef void (*QueuedMessageCallback) (void *msg); | 251 | typedef void (*QueuedMessageCallback) (void *msg); |
@@ -787,11 +792,21 @@ handle_p2p_element_report (struct ConsensusPeerInformation *cpi, const struct GN | |||
787 | } | 792 | } |
788 | 793 | ||
789 | static void | 794 | static void |
790 | queue_cont_subround_over (void *cls) | 795 | fin_sent_cb (void *cls) |
791 | { | 796 | { |
792 | struct ConsensusSession *session; | 797 | struct ConsensusPeerInformation *cpi; |
793 | session = cls; | 798 | int not_finished; |
794 | subround_over (session, NULL); | 799 | cpi = cls; |
800 | cpi->exp_subround_finished = GNUNET_YES; | ||
801 | /* the subround is only really over if *both* partners are done */ | ||
802 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n", cpi->session->local_peer_idx); | ||
803 | not_finished = 0; | ||
804 | if ((cpi->session->partner_outgoing != NULL) && (cpi->session->partner_outgoing->exp_subround_finished == GNUNET_NO)) | ||
805 | not_finished++; | ||
806 | if ((cpi->session->partner_incoming != NULL) && (cpi->session->partner_incoming->exp_subround_finished == GNUNET_NO)) | ||
807 | not_finished++; | ||
808 | if (0 == not_finished) | ||
809 | subround_over (cpi->session, NULL); | ||
795 | } | 810 | } |
796 | 811 | ||
797 | 812 | ||
@@ -806,12 +821,13 @@ handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_Mes | |||
806 | switch (cpi->session->current_round) | 821 | switch (cpi->session->current_round) |
807 | { | 822 | { |
808 | case CONSENSUS_ROUND_EXCHANGE: | 823 | case CONSENSUS_ROUND_EXCHANGE: |
824 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SYNC from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
809 | fin_msg = GNUNET_malloc (sizeof *fin_msg); | 825 | fin_msg = GNUNET_malloc (sizeof *fin_msg); |
810 | fin_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN); | 826 | fin_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN); |
811 | fin_msg->size = htons (sizeof *fin_msg); | 827 | fin_msg->size = htons (sizeof *fin_msg); |
812 | /* the subround os over once we kicked off sending the fin msg */ | 828 | /* the subround os over once we kicked off sending the fin msg */ |
813 | /* FIXME: assert we are talking to the right peer! */ | 829 | /* FIXME: assert we are talking to the right peer! */ |
814 | queue_peer_message_with_cls (cpi, fin_msg, queue_cont_subround_over, cpi->session); | 830 | queue_peer_message_with_cls (cpi, fin_msg, fin_sent_cb, cpi); |
815 | break; | 831 | break; |
816 | default: | 832 | default: |
817 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n"); | 833 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n"); |
@@ -830,8 +846,20 @@ handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct GNUNET_Messag | |||
830 | switch (cpi->session->current_round) | 846 | switch (cpi->session->current_round) |
831 | { | 847 | { |
832 | case CONSENSUS_ROUND_EXCHANGE: | 848 | case CONSENSUS_ROUND_EXCHANGE: |
833 | subround_over (cpi->session, NULL); | 849 | { |
834 | break; | 850 | int not_finished; |
851 | cpi->exp_subround_finished = GNUNET_YES; | ||
852 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
853 | /* the subround is only really over if *both* partners are done */ | ||
854 | not_finished = 0; | ||
855 | if ((cpi->session->partner_outgoing != NULL) && (cpi->session->partner_outgoing->exp_subround_finished == GNUNET_NO)) | ||
856 | not_finished++; | ||
857 | if ((cpi->session->partner_incoming != NULL) && (cpi->session->partner_incoming->exp_subround_finished == GNUNET_NO)) | ||
858 | not_finished++; | ||
859 | if (0 == not_finished) | ||
860 | subround_over (cpi->session, NULL); | ||
861 | } | ||
862 | break; | ||
835 | default: | 863 | default: |
836 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n"); | 864 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n"); |
837 | break; | 865 | break; |
@@ -872,6 +900,8 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess | |||
872 | void *buf; | 900 | void *buf; |
873 | size_t size; | 901 | size_t size; |
874 | 902 | ||
903 | |||
904 | |||
875 | switch (cpi->session->current_round) | 905 | switch (cpi->session->current_round) |
876 | { | 906 | { |
877 | case CONSENSUS_ROUND_EXCHANGE: | 907 | case CONSENSUS_ROUND_EXCHANGE: |
@@ -881,7 +911,8 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess | |||
881 | { | 911 | { |
882 | if (GNUNET_NO == cpi->replaying_strata_message) | 912 | if (GNUNET_NO == cpi->replaying_strata_message) |
883 | { | 913 | { |
884 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got probably premature message\n"); | 914 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got probably premature SE from P%d, (%d,%d)\n", |
915 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), strata_msg->exp_round, strata_msg->exp_subround); | ||
885 | cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message ((struct GNUNET_MessageHeader *) strata_msg); | 916 | cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message ((struct GNUNET_MessageHeader *) strata_msg); |
886 | } | 917 | } |
887 | return GNUNET_YES; | 918 | return GNUNET_YES; |
@@ -905,7 +936,9 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess | |||
905 | } | 936 | } |
906 | 937 | ||
907 | diff = estimate_difference (cpi->session->se, cpi->se); | 938 | diff = estimate_difference (cpi->session->se, cpi->se); |
908 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", diff); | 939 | |
940 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SE from P%d\n", | ||
941 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
909 | 942 | ||
910 | if ( (CONSENSUS_ROUND_EXCHANGE == cpi->session->current_round) || | 943 | if ( (CONSENSUS_ROUND_EXCHANGE == cpi->session->current_round) || |
911 | (CONSENSUS_ROUND_INVENTORY == cpi->session->current_round)) | 944 | (CONSENSUS_ROUND_INVENTORY == cpi->session->current_round)) |
@@ -988,9 +1021,9 @@ handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDig | |||
988 | if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) | 1021 | if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) |
989 | { | 1022 | { |
990 | cpi->ibf_state = IBF_STATE_DECODING; | 1023 | cpi->ibf_state = IBF_STATE_DECODING; |
1024 | cpi->ibf_bucket_counter = 0; | ||
991 | prepare_ibf (cpi); | 1025 | prepare_ibf (cpi); |
992 | ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]); | 1026 | ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]); |
993 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "about to decode\n"); | ||
994 | decode (cpi); | 1027 | decode (cpi); |
995 | } | 1028 | } |
996 | return GNUNET_YES; | 1029 | return GNUNET_YES; |
@@ -1075,7 +1108,8 @@ embrace_peer (struct ConsensusPeerInformation *cpi) | |||
1075 | case CONSENSUS_ROUND_INVENTORY: | 1108 | case CONSENSUS_ROUND_INVENTORY: |
1076 | /* fallthrough */ | 1109 | /* fallthrough */ |
1077 | case CONSENSUS_ROUND_STOCK: | 1110 | case CONSENSUS_ROUND_STOCK: |
1078 | send_strata_estimator (cpi); | 1111 | if (cpi == cpi->session->partner_outgoing) |
1112 | send_strata_estimator (cpi); | ||
1079 | default: | 1113 | default: |
1080 | break; | 1114 | break; |
1081 | } | 1115 | } |
@@ -1128,6 +1162,11 @@ send_strata_estimator (struct ConsensusPeerInformation *cpi) | |||
1128 | size_t msize; | 1162 | size_t msize; |
1129 | int i; | 1163 | int i; |
1130 | 1164 | ||
1165 | |||
1166 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending SE to P%d\n", | ||
1167 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1168 | |||
1169 | |||
1131 | msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS); | 1170 | msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS); |
1132 | 1171 | ||
1133 | strata_msg = GNUNET_malloc (msize); | 1172 | strata_msg = GNUNET_malloc (msize); |
@@ -1155,6 +1194,10 @@ static void | |||
1155 | send_ibf (struct ConsensusPeerInformation *cpi) | 1194 | send_ibf (struct ConsensusPeerInformation *cpi) |
1156 | { | 1195 | { |
1157 | int sent_buckets; | 1196 | int sent_buckets; |
1197 | |||
1198 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending IBF to P%d\n", | ||
1199 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1200 | |||
1158 | sent_buckets = 0; | 1201 | sent_buckets = 0; |
1159 | while (sent_buckets < (1 << cpi->ibf_order)) | 1202 | while (sent_buckets < (1 << cpi->ibf_order)) |
1160 | { | 1203 | { |
@@ -1214,7 +1257,7 @@ decode (struct ConsensusPeerInformation *cpi) | |||
1214 | if (GNUNET_NO == res) | 1257 | if (GNUNET_NO == res) |
1215 | { | 1258 | { |
1216 | struct GNUNET_MessageHeader *msg; | 1259 | struct GNUNET_MessageHeader *msg; |
1217 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values\n"); | 1260 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: transmitted all values, sending SYNC\n", cpi->session->local_peer_idx); |
1218 | msg = GNUNET_malloc (sizeof *msg); | 1261 | msg = GNUNET_malloc (sizeof *msg); |
1219 | msg->size = htons (sizeof *msg); | 1262 | msg->size = htons (sizeof *msg); |
1220 | msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED); | 1263 | msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED); |
@@ -1903,7 +1946,6 @@ find_partners (struct ConsensusSession *session) | |||
1903 | continue; | 1946 | continue; |
1904 | arc = (i + (1 << session->exp_subround)) % session->num_peers; | 1947 | arc = (i + (1 << session->exp_subround)) % session->num_peers; |
1905 | mark[i] = mark[arc] = 1; | 1948 | mark[i] = mark[arc] = 1; |
1906 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d talks to %d\n", i, arc); | ||
1907 | GNUNET_assert (i != arc); | 1949 | GNUNET_assert (i != arc); |
1908 | if (i == session->local_peer_idx) | 1950 | if (i == session->local_peer_idx) |
1909 | { | 1951 | { |
@@ -1915,10 +1957,6 @@ find_partners (struct ConsensusSession *session) | |||
1915 | GNUNET_assert (NULL == session->partner_incoming); | 1957 | GNUNET_assert (NULL == session->partner_incoming); |
1916 | session->partner_incoming = &session->info[session->shuffle[i]]; | 1958 | session->partner_incoming = &session->info[session->shuffle[i]]; |
1917 | } | 1959 | } |
1918 | if (0 != mark[session->local_peer_idx]) | ||
1919 | { | ||
1920 | return; | ||
1921 | } | ||
1922 | } | 1960 | } |
1923 | } | 1961 | } |
1924 | 1962 | ||
@@ -1942,7 +1980,6 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
1942 | 1980 | ||
1943 | session = cls; | 1981 | session = cls; |
1944 | 1982 | ||
1945 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "subround over, exp_round=%d, exp_subround=%d\n", session->exp_round, session->exp_subround); | ||
1946 | 1983 | ||
1947 | for (i = 0; i < session->num_peers; i++) | 1984 | for (i = 0; i < session->num_peers; i++) |
1948 | clear_peer_messages (&session->info[i]); | 1985 | clear_peer_messages (&session->info[i]); |
@@ -1955,12 +1992,14 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
1955 | 1992 | ||
1956 | if ((session->num_peers == 2) && (session->exp_round == 1)) | 1993 | if ((session->num_peers == 2) && (session->exp_round == 1)) |
1957 | { | 1994 | { |
1995 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exp-round over (2-peer)\n"); | ||
1958 | round_over (session, NULL); | 1996 | round_over (session, NULL); |
1959 | return; | 1997 | return; |
1960 | } | 1998 | } |
1961 | 1999 | ||
1962 | if (session->exp_round == NUM_EXP_ROUNDS) | 2000 | if (session->exp_round == NUM_EXP_ROUNDS) |
1963 | { | 2001 | { |
2002 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exp-round over (2-peer)\n"); | ||
1964 | round_over (session, NULL); | 2003 | round_over (session, NULL); |
1965 | return; | 2004 | return; |
1966 | } | 2005 | } |
@@ -1987,24 +2026,49 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
1987 | 2026 | ||
1988 | find_partners (session); | 2027 | find_partners (session); |
1989 | 2028 | ||
2029 | { | ||
2030 | int in; | ||
2031 | int out; | ||
2032 | if (session->partner_outgoing == NULL) | ||
2033 | out = -1; | ||
2034 | else | ||
2035 | out = (int) (session->partner_outgoing - session->info); | ||
2036 | if (session->partner_incoming == NULL) | ||
2037 | in = -1; | ||
2038 | else | ||
2039 | in = (int) (session->partner_incoming - session->info); | ||
2040 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx, | ||
2041 | session->exp_round, session->exp_subround, in, out); | ||
2042 | } | ||
2043 | |||
2044 | |||
1990 | if (NULL != session->partner_outgoing) | 2045 | if (NULL != session->partner_outgoing) |
1991 | { | 2046 | { |
1992 | session->partner_outgoing->ibf_state = IBF_STATE_NONE; | 2047 | session->partner_outgoing->ibf_state = IBF_STATE_NONE; |
1993 | session->partner_outgoing->ibf_bucket_counter = 0; | 2048 | session->partner_outgoing->ibf_bucket_counter = 0; |
2049 | session->partner_outgoing->exp_subround_finished = GNUNET_NO; | ||
1994 | } | 2050 | } |
1995 | 2051 | ||
1996 | if (NULL != session->partner_incoming) | 2052 | if (NULL != session->partner_incoming) |
1997 | { | 2053 | { |
1998 | session->partner_incoming->ibf_state = IBF_STATE_NONE; | 2054 | session->partner_incoming->ibf_state = IBF_STATE_NONE; |
2055 | session->partner_incoming->exp_subround_finished = GNUNET_NO; | ||
1999 | session->partner_incoming->ibf_bucket_counter = 0; | 2056 | session->partner_incoming->ibf_bucket_counter = 0; |
2000 | 2057 | ||
2001 | /* maybe there's an early strata estimator? */ | 2058 | /* maybe there's an early strata estimator? */ |
2002 | if (NULL != session->partner_incoming->premature_strata_message) | 2059 | if (NULL != session->partner_incoming->premature_strata_message) |
2003 | { | 2060 | { |
2061 | struct StrataMessage *sm; | ||
2062 | |||
2063 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n"); | ||
2064 | sm = session->partner_incoming->premature_strata_message; | ||
2065 | session->partner_incoming->premature_strata_message = NULL; | ||
2066 | |||
2004 | session->partner_incoming->replaying_strata_message = GNUNET_YES; | 2067 | session->partner_incoming->replaying_strata_message = GNUNET_YES; |
2005 | handle_p2p_strata (session->partner_incoming, session->partner_incoming->premature_strata_message); | 2068 | handle_p2p_strata (session->partner_incoming, sm); |
2006 | GNUNET_free (session->partner_incoming->premature_strata_message); | ||
2007 | session->partner_incoming->replaying_strata_message = GNUNET_NO; | 2069 | session->partner_incoming->replaying_strata_message = GNUNET_NO; |
2070 | |||
2071 | GNUNET_free (sm); | ||
2008 | } | 2072 | } |
2009 | } | 2073 | } |
2010 | 2074 | ||
@@ -2108,7 +2172,7 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
2108 | subround_over (session, NULL); | 2172 | subround_over (session, NULL); |
2109 | break; | 2173 | break; |
2110 | case CONSENSUS_ROUND_EXCHANGE: | 2174 | case CONSENSUS_ROUND_EXCHANGE: |
2111 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "done for now\n"); | 2175 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: done\n", session->local_peer_idx); |
2112 | 2176 | ||
2113 | if (0) | 2177 | if (0) |
2114 | { | 2178 | { |
diff --git a/src/consensus/test_consensus.conf b/src/consensus/test_consensus.conf index 61c382f4c..8a8d5b89f 100644 --- a/src/consensus/test_consensus.conf +++ b/src/consensus/test_consensus.conf | |||
@@ -5,7 +5,7 @@ HOSTNAME = localhost | |||
5 | HOME = $SERVICEHOME | 5 | HOME = $SERVICEHOME |
6 | BINARY = gnunet-service-consensus | 6 | BINARY = gnunet-service-consensus |
7 | #PREFIX = gdbserver :12345 | 7 | #PREFIX = gdbserver :12345 |
8 | PREFIX = valgrind | 8 | #PREFIX = valgrind |
9 | ACCEPT_FROM = 127.0.0.1; | 9 | ACCEPT_FROM = 127.0.0.1; |
10 | ACCEPT_FROM6 = ::1; | 10 | ACCEPT_FROM6 = ::1; |
11 | UNIXPATH = /tmp/gnunet-service-consensus.sock | 11 | UNIXPATH = /tmp/gnunet-service-consensus.sock |