aboutsummaryrefslogtreecommitdiff
path: root/src/consensus
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-03-19 16:34:45 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-03-19 16:34:45 +0000
commite2cd40becaf2dda615712d4a0577d6750ad3c8e7 (patch)
tree993909e7d9efb7ecb7c1c20fea4f3e20b9c7f797 /src/consensus
parent4d0c05606b571f31b737847d5821218f1f48e78d (diff)
downloadgnunet-e2cd40becaf2dda615712d4a0577d6750ad3c8e7.tar.gz
gnunet-e2cd40becaf2dda615712d4a0577d6750ad3c8e7.zip
fix for for multi-peer consensus, non-power-of-two consensus now works
Diffstat (limited to 'src/consensus')
-rw-r--r--src/consensus/gnunet-service-consensus.c106
-rw-r--r--src/consensus/test_consensus.conf2
2 files changed, 86 insertions, 22 deletions
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
index b1323f902..e300d9169 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -241,6 +241,11 @@ struct ConsensusPeerInformation
241 */ 241 */
242 struct StrataMessage *premature_strata_message; 242 struct StrataMessage *premature_strata_message;
243 243
244 /**
245 * We have finishes the exp-subround with the peer.
246 */
247 int exp_subround_finished;
248
244}; 249};
245 250
246typedef void (*QueuedMessageCallback) (void *msg); 251typedef void (*QueuedMessageCallback) (void *msg);
@@ -787,11 +792,21 @@ handle_p2p_element_report (struct ConsensusPeerInformation *cpi, const struct GN
787} 792}
788 793
789static void 794static void
790queue_cont_subround_over (void *cls) 795fin_sent_cb (void *cls)
791{ 796{
792 struct ConsensusSession *session; 797 struct ConsensusPeerInformation *cpi;
793 session = cls; 798 int not_finished;
794 subround_over (session, NULL); 799 cpi = cls;
800 cpi->exp_subround_finished = GNUNET_YES;
801 /* the subround is only really over if *both* partners are done */
802 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n", cpi->session->local_peer_idx);
803 not_finished = 0;
804 if ((cpi->session->partner_outgoing != NULL) && (cpi->session->partner_outgoing->exp_subround_finished == GNUNET_NO))
805 not_finished++;
806 if ((cpi->session->partner_incoming != NULL) && (cpi->session->partner_incoming->exp_subround_finished == GNUNET_NO))
807 not_finished++;
808 if (0 == not_finished)
809 subround_over (cpi->session, NULL);
795} 810}
796 811
797 812
@@ -806,12 +821,13 @@ handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_Mes
806 switch (cpi->session->current_round) 821 switch (cpi->session->current_round)
807 { 822 {
808 case CONSENSUS_ROUND_EXCHANGE: 823 case CONSENSUS_ROUND_EXCHANGE:
824 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SYNC from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
809 fin_msg = GNUNET_malloc (sizeof *fin_msg); 825 fin_msg = GNUNET_malloc (sizeof *fin_msg);
810 fin_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN); 826 fin_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN);
811 fin_msg->size = htons (sizeof *fin_msg); 827 fin_msg->size = htons (sizeof *fin_msg);
812 /* the subround os over once we kicked off sending the fin msg */ 828 /* the subround os over once we kicked off sending the fin msg */
813 /* FIXME: assert we are talking to the right peer! */ 829 /* FIXME: assert we are talking to the right peer! */
814 queue_peer_message_with_cls (cpi, fin_msg, queue_cont_subround_over, cpi->session); 830 queue_peer_message_with_cls (cpi, fin_msg, fin_sent_cb, cpi);
815 break; 831 break;
816 default: 832 default:
817 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n"); 833 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n");
@@ -830,8 +846,20 @@ handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct GNUNET_Messag
830 switch (cpi->session->current_round) 846 switch (cpi->session->current_round)
831 { 847 {
832 case CONSENSUS_ROUND_EXCHANGE: 848 case CONSENSUS_ROUND_EXCHANGE:
833 subround_over (cpi->session, NULL); 849 {
834 break; 850 int not_finished;
851 cpi->exp_subround_finished = GNUNET_YES;
852 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
853 /* the subround is only really over if *both* partners are done */
854 not_finished = 0;
855 if ((cpi->session->partner_outgoing != NULL) && (cpi->session->partner_outgoing->exp_subround_finished == GNUNET_NO))
856 not_finished++;
857 if ((cpi->session->partner_incoming != NULL) && (cpi->session->partner_incoming->exp_subround_finished == GNUNET_NO))
858 not_finished++;
859 if (0 == not_finished)
860 subround_over (cpi->session, NULL);
861 }
862 break;
835 default: 863 default:
836 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n"); 864 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n");
837 break; 865 break;
@@ -872,6 +900,8 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess
872 void *buf; 900 void *buf;
873 size_t size; 901 size_t size;
874 902
903
904
875 switch (cpi->session->current_round) 905 switch (cpi->session->current_round)
876 { 906 {
877 case CONSENSUS_ROUND_EXCHANGE: 907 case CONSENSUS_ROUND_EXCHANGE:
@@ -881,7 +911,8 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess
881 { 911 {
882 if (GNUNET_NO == cpi->replaying_strata_message) 912 if (GNUNET_NO == cpi->replaying_strata_message)
883 { 913 {
884 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got probably premature message\n"); 914 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got probably premature SE from P%d, (%d,%d)\n",
915 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), strata_msg->exp_round, strata_msg->exp_subround);
885 cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message ((struct GNUNET_MessageHeader *) strata_msg); 916 cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message ((struct GNUNET_MessageHeader *) strata_msg);
886 } 917 }
887 return GNUNET_YES; 918 return GNUNET_YES;
@@ -905,7 +936,9 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess
905 } 936 }
906 937
907 diff = estimate_difference (cpi->session->se, cpi->se); 938 diff = estimate_difference (cpi->session->se, cpi->se);
908 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", diff); 939
940 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SE from P%d\n",
941 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
909 942
910 if ( (CONSENSUS_ROUND_EXCHANGE == cpi->session->current_round) || 943 if ( (CONSENSUS_ROUND_EXCHANGE == cpi->session->current_round) ||
911 (CONSENSUS_ROUND_INVENTORY == cpi->session->current_round)) 944 (CONSENSUS_ROUND_INVENTORY == cpi->session->current_round))
@@ -988,9 +1021,9 @@ handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDig
988 if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) 1021 if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order))
989 { 1022 {
990 cpi->ibf_state = IBF_STATE_DECODING; 1023 cpi->ibf_state = IBF_STATE_DECODING;
1024 cpi->ibf_bucket_counter = 0;
991 prepare_ibf (cpi); 1025 prepare_ibf (cpi);
992 ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]); 1026 ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]);
993 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "about to decode\n");
994 decode (cpi); 1027 decode (cpi);
995 } 1028 }
996 return GNUNET_YES; 1029 return GNUNET_YES;
@@ -1075,7 +1108,8 @@ embrace_peer (struct ConsensusPeerInformation *cpi)
1075 case CONSENSUS_ROUND_INVENTORY: 1108 case CONSENSUS_ROUND_INVENTORY:
1076 /* fallthrough */ 1109 /* fallthrough */
1077 case CONSENSUS_ROUND_STOCK: 1110 case CONSENSUS_ROUND_STOCK:
1078 send_strata_estimator (cpi); 1111 if (cpi == cpi->session->partner_outgoing)
1112 send_strata_estimator (cpi);
1079 default: 1113 default:
1080 break; 1114 break;
1081 } 1115 }
@@ -1128,6 +1162,11 @@ send_strata_estimator (struct ConsensusPeerInformation *cpi)
1128 size_t msize; 1162 size_t msize;
1129 int i; 1163 int i;
1130 1164
1165
1166 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending SE to P%d\n",
1167 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1168
1169
1131 msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS); 1170 msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS);
1132 1171
1133 strata_msg = GNUNET_malloc (msize); 1172 strata_msg = GNUNET_malloc (msize);
@@ -1155,6 +1194,10 @@ static void
1155send_ibf (struct ConsensusPeerInformation *cpi) 1194send_ibf (struct ConsensusPeerInformation *cpi)
1156{ 1195{
1157 int sent_buckets; 1196 int sent_buckets;
1197
1198 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending IBF to P%d\n",
1199 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1200
1158 sent_buckets = 0; 1201 sent_buckets = 0;
1159 while (sent_buckets < (1 << cpi->ibf_order)) 1202 while (sent_buckets < (1 << cpi->ibf_order))
1160 { 1203 {
@@ -1214,7 +1257,7 @@ decode (struct ConsensusPeerInformation *cpi)
1214 if (GNUNET_NO == res) 1257 if (GNUNET_NO == res)
1215 { 1258 {
1216 struct GNUNET_MessageHeader *msg; 1259 struct GNUNET_MessageHeader *msg;
1217 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values\n"); 1260 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: transmitted all values, sending SYNC\n", cpi->session->local_peer_idx);
1218 msg = GNUNET_malloc (sizeof *msg); 1261 msg = GNUNET_malloc (sizeof *msg);
1219 msg->size = htons (sizeof *msg); 1262 msg->size = htons (sizeof *msg);
1220 msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED); 1263 msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED);
@@ -1903,7 +1946,6 @@ find_partners (struct ConsensusSession *session)
1903 continue; 1946 continue;
1904 arc = (i + (1 << session->exp_subround)) % session->num_peers; 1947 arc = (i + (1 << session->exp_subround)) % session->num_peers;
1905 mark[i] = mark[arc] = 1; 1948 mark[i] = mark[arc] = 1;
1906 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d talks to %d\n", i, arc);
1907 GNUNET_assert (i != arc); 1949 GNUNET_assert (i != arc);
1908 if (i == session->local_peer_idx) 1950 if (i == session->local_peer_idx)
1909 { 1951 {
@@ -1915,10 +1957,6 @@ find_partners (struct ConsensusSession *session)
1915 GNUNET_assert (NULL == session->partner_incoming); 1957 GNUNET_assert (NULL == session->partner_incoming);
1916 session->partner_incoming = &session->info[session->shuffle[i]]; 1958 session->partner_incoming = &session->info[session->shuffle[i]];
1917 } 1959 }
1918 if (0 != mark[session->local_peer_idx])
1919 {
1920 return;
1921 }
1922 } 1960 }
1923} 1961}
1924 1962
@@ -1942,7 +1980,6 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1942 1980
1943 session = cls; 1981 session = cls;
1944 1982
1945 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "subround over, exp_round=%d, exp_subround=%d\n", session->exp_round, session->exp_subround);
1946 1983
1947 for (i = 0; i < session->num_peers; i++) 1984 for (i = 0; i < session->num_peers; i++)
1948 clear_peer_messages (&session->info[i]); 1985 clear_peer_messages (&session->info[i]);
@@ -1955,12 +1992,14 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1955 1992
1956 if ((session->num_peers == 2) && (session->exp_round == 1)) 1993 if ((session->num_peers == 2) && (session->exp_round == 1))
1957 { 1994 {
1995 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exp-round over (2-peer)\n");
1958 round_over (session, NULL); 1996 round_over (session, NULL);
1959 return; 1997 return;
1960 } 1998 }
1961 1999
1962 if (session->exp_round == NUM_EXP_ROUNDS) 2000 if (session->exp_round == NUM_EXP_ROUNDS)
1963 { 2001 {
2002 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exp-round over (2-peer)\n");
1964 round_over (session, NULL); 2003 round_over (session, NULL);
1965 return; 2004 return;
1966 } 2005 }
@@ -1987,24 +2026,49 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1987 2026
1988 find_partners (session); 2027 find_partners (session);
1989 2028
2029 {
2030 int in;
2031 int out;
2032 if (session->partner_outgoing == NULL)
2033 out = -1;
2034 else
2035 out = (int) (session->partner_outgoing - session->info);
2036 if (session->partner_incoming == NULL)
2037 in = -1;
2038 else
2039 in = (int) (session->partner_incoming - session->info);
2040 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx,
2041 session->exp_round, session->exp_subround, in, out);
2042 }
2043
2044
1990 if (NULL != session->partner_outgoing) 2045 if (NULL != session->partner_outgoing)
1991 { 2046 {
1992 session->partner_outgoing->ibf_state = IBF_STATE_NONE; 2047 session->partner_outgoing->ibf_state = IBF_STATE_NONE;
1993 session->partner_outgoing->ibf_bucket_counter = 0; 2048 session->partner_outgoing->ibf_bucket_counter = 0;
2049 session->partner_outgoing->exp_subround_finished = GNUNET_NO;
1994 } 2050 }
1995 2051
1996 if (NULL != session->partner_incoming) 2052 if (NULL != session->partner_incoming)
1997 { 2053 {
1998 session->partner_incoming->ibf_state = IBF_STATE_NONE; 2054 session->partner_incoming->ibf_state = IBF_STATE_NONE;
2055 session->partner_incoming->exp_subround_finished = GNUNET_NO;
1999 session->partner_incoming->ibf_bucket_counter = 0; 2056 session->partner_incoming->ibf_bucket_counter = 0;
2000 2057
2001 /* maybe there's an early strata estimator? */ 2058 /* maybe there's an early strata estimator? */
2002 if (NULL != session->partner_incoming->premature_strata_message) 2059 if (NULL != session->partner_incoming->premature_strata_message)
2003 { 2060 {
2061 struct StrataMessage *sm;
2062
2063 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n");
2064 sm = session->partner_incoming->premature_strata_message;
2065 session->partner_incoming->premature_strata_message = NULL;
2066
2004 session->partner_incoming->replaying_strata_message = GNUNET_YES; 2067 session->partner_incoming->replaying_strata_message = GNUNET_YES;
2005 handle_p2p_strata (session->partner_incoming, session->partner_incoming->premature_strata_message); 2068 handle_p2p_strata (session->partner_incoming, sm);
2006 GNUNET_free (session->partner_incoming->premature_strata_message);
2007 session->partner_incoming->replaying_strata_message = GNUNET_NO; 2069 session->partner_incoming->replaying_strata_message = GNUNET_NO;
2070
2071 GNUNET_free (sm);
2008 } 2072 }
2009 } 2073 }
2010 2074
@@ -2108,7 +2172,7 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2108 subround_over (session, NULL); 2172 subround_over (session, NULL);
2109 break; 2173 break;
2110 case CONSENSUS_ROUND_EXCHANGE: 2174 case CONSENSUS_ROUND_EXCHANGE:
2111 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "done for now\n"); 2175 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: done\n", session->local_peer_idx);
2112 2176
2113 if (0) 2177 if (0)
2114 { 2178 {
diff --git a/src/consensus/test_consensus.conf b/src/consensus/test_consensus.conf
index 61c382f4c..8a8d5b89f 100644
--- a/src/consensus/test_consensus.conf
+++ b/src/consensus/test_consensus.conf
@@ -5,7 +5,7 @@ HOSTNAME = localhost
5HOME = $SERVICEHOME 5HOME = $SERVICEHOME
6BINARY = gnunet-service-consensus 6BINARY = gnunet-service-consensus
7#PREFIX = gdbserver :12345 7#PREFIX = gdbserver :12345
8PREFIX = valgrind 8#PREFIX = valgrind
9ACCEPT_FROM = 127.0.0.1; 9ACCEPT_FROM = 127.0.0.1;
10ACCEPT_FROM6 = ::1; 10ACCEPT_FROM6 = ::1;
11UNIXPATH = /tmp/gnunet-service-consensus.sock 11UNIXPATH = /tmp/gnunet-service-consensus.sock