aboutsummaryrefslogtreecommitdiff
path: root/src/consensus
diff options
context:
space:
mode:
Diffstat (limited to 'src/consensus')
-rw-r--r--src/consensus/.gitignore6
-rw-r--r--src/consensus/Makefile.am120
-rw-r--r--src/consensus/consensus-simulation.py.in112
-rw-r--r--src/consensus/consensus.conf.in10
-rw-r--r--src/consensus/consensus.h92
-rw-r--r--src/consensus/consensus_api.c349
-rw-r--r--src/consensus/consensus_protocol.h137
-rw-r--r--src/consensus/gnunet-consensus-profiler.c586
-rw-r--r--src/consensus/gnunet-service-consensus.c3572
-rw-r--r--src/consensus/plugin_block_consensus.c137
-rw-r--r--src/consensus/test_consensus.conf87
-rw-r--r--src/consensus/test_consensus_api.c120
12 files changed, 0 insertions, 5328 deletions
diff --git a/src/consensus/.gitignore b/src/consensus/.gitignore
deleted file mode 100644
index 8050d760e..000000000
--- a/src/consensus/.gitignore
+++ /dev/null
@@ -1,6 +0,0 @@
1gnunet-service-evil-consensus
2gnunet-consensus-profiler
3gnunet-service-consensus
4test_consensus_api
5resource.log.master
6consensus-simulation.py
diff --git a/src/consensus/Makefile.am b/src/consensus/Makefile.am
deleted file mode 100644
index cf1d32e74..000000000
--- a/src/consensus/Makefile.am
+++ /dev/null
@@ -1,120 +0,0 @@
1# This Makefile.am is in the public domain
2AM_CPPFLAGS = -I$(top_srcdir)/src/include
3
4pkgcfgdir= $(pkgdatadir)/config.d/
5
6libexecdir= $(pkglibdir)/libexec/
7
8plugindir = $(libdir)/gnunet
9
10pkgcfg_DATA = \
11 consensus.conf
12
13if USE_COVERAGE
14 AM_CFLAGS = -fprofile-arcs -ftest-coverage
15endif
16
17
18libexec_PROGRAMS = \
19 gnunet-service-consensus
20
21if ENABLE_MALICIOUS
22libexec_PROGRAMS += \
23 gnunet-service-evil-consensus
24endif
25
26SUFFIXES = .py.in .py
27
28.py.in.py:
29 $(AWK) -v bdir="$(bindir)" -v py="$(PYTHON)" -v awkay="$(AWK_BINARY)" -v pfx="$(prefix)" -v prl="$(PERL)" -v sysconfdirectory="$(sysconfdir)" -v pkgdatadirectory="$(pkgdatadir)" -f $(top_srcdir)/bin/dosubst.awk < $< > $@
30 chmod +x $@
31
32check-python-style:
33 flake8 consensus-simulation.py.in
34
35lib_LTLIBRARIES = \
36 libgnunetconsensus.la
37
38gnunet_consensus_profiler_SOURCES = \
39 gnunet-consensus-profiler.c
40gnunet_consensus_profiler_LDADD = \
41 $(top_builddir)/src/util/libgnunetutil.la \
42 libgnunetconsensus.la \
43 $(top_builddir)/src/testing/libgnunettesting.la \
44 $(top_builddir)/src/testbed/libgnunettestbed.la \
45 $(GN_LIBINTL)
46
47gnunet_service_consensus_SOURCES = \
48 gnunet-service-consensus.c
49gnunet_service_consensus_LDADD = \
50 $(top_builddir)/src/util/libgnunetutil.la \
51 $(top_builddir)/src/core/libgnunetcore.la \
52 $(top_builddir)/src/set/libgnunetset.la \
53 $(top_builddir)/src/statistics/libgnunetstatistics.la \
54 $(GN_LIBINTL)
55
56gnunet_service_evil_consensus_SOURCES = \
57 gnunet-service-consensus.c \
58 consensus_protocol.h
59gnunet_service_evil_consensus_LDADD = \
60 $(top_builddir)/src/util/libgnunetutil.la \
61 $(top_builddir)/src/core/libgnunetcore.la \
62 $(top_builddir)/src/set/libgnunetset.la \
63 $(top_builddir)/src/statistics/libgnunetstatistics.la \
64 $(GN_LIBINTL)
65gnunet_service_evil_consensus_CFLAGS = -DEVIL
66
67libgnunetconsensus_la_SOURCES = \
68 consensus_api.c \
69 consensus.h
70libgnunetconsensus_la_LIBADD = \
71 $(top_builddir)/src/util/libgnunetutil.la \
72 $(LTLIBINTL)
73libgnunetconsensus_la_LDFLAGS = \
74 $(GN_LIB_LDFLAGS)
75
76
77plugin_LTLIBRARIES = \
78 libgnunet_plugin_block_consensus.la
79
80libgnunet_plugin_block_consensus_la_SOURCES = \
81 plugin_block_consensus.c
82libgnunet_plugin_block_consensus_la_LIBADD = \
83 $(top_builddir)/src/block/libgnunetblock.la \
84 $(top_builddir)/src/block/libgnunetblockgroup.la \
85 $(top_builddir)/src/util/libgnunetutil.la \
86 $(LTLIBINTL)
87libgnunet_plugin_block_consensus_la_LDFLAGS = \
88 $(GN_PLUGIN_LDFLAGS)
89
90
91
92if HAVE_TESTING
93bin_PROGRAMS = \
94 gnunet-consensus-profiler
95
96check_PROGRAMS = \
97 test_consensus_api
98
99if ENABLE_TEST_RUN
100AM_TESTS_ENVIRONMENT=export GNUNET_PREFIX=$${GNUNET_PREFIX:-@libdir@};export PATH=$${GNUNET_PREFIX:-@prefix@}/bin:$$PATH;unset XDG_DATA_HOME;unset XDG_CONFIG_HOME;
101TESTS = $(check_PROGRAMS)
102endif
103endif
104
105test_consensus_api_SOURCES = \
106 test_consensus_api.c
107test_consensus_api_LDADD = \
108 $(top_builddir)/src/util/libgnunetutil.la \
109 $(top_builddir)/src/testing/libgnunettesting.la \
110 libgnunetconsensus.la
111
112noinst_SCRIPTS = \
113 consensus-simulation.py
114
115CLEANFILES = \
116 $(noinst_SCRIPTS)
117
118EXTRA_DIST = \
119 test_consensus.conf \
120 consensus-simulation.py.in
diff --git a/src/consensus/consensus-simulation.py.in b/src/consensus/consensus-simulation.py.in
deleted file mode 100644
index 272a52da2..000000000
--- a/src/consensus/consensus-simulation.py.in
+++ /dev/null
@@ -1,112 +0,0 @@
1#!@PYTHONEXE@
2# This file is part of GNUnet
3# (C) 2013, 2018 Christian Grothoff (and other contributing authors)
4#
5# GNUnet is free software: you can redistribute it and/or modify it
6# under the terms of the GNU Affero General Public License as published
7# by the Free Software Foundation, either version 3 of the License,
8# or (at your option) any later version.
9#
10# GNUnet is distributed in the hope that it will be useful, but
11# WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13# Affero General Public License for more details.
14#
15# You should have received a copy of the GNU Affero General Public License
16# along with this program. If not, see <http://www.gnu.org/licenses/>.
17#
18# SPDX-License-Identifier: AGPL3.0-or-later
19
20import argparse
21import random
22from math import ceil, log, floor
23
24
25def bsc(n):
26 """ count the bits set in n"""
27 l = n.bit_length()
28 c = 0
29 x = 1
30 for _ in range(0, l):
31 if n & x:
32 c = c + 1
33 x = x << 1
34 return c
35
36
37def simulate(k, n, verbose):
38 assert k < n
39 largest_arc = int(2**ceil(log(n, 2))) // 2
40 num_ghosts = (2 * largest_arc) - n
41 if verbose:
42 print("we have", num_ghosts, "ghost peers")
43 # n.b. all peers with idx<k are evil
44 peers = list(range(n))
45 info = [1 << x for x in range(n)]
46
47 def done_p():
48 for x in range(k, n):
49 if bsc(info[x]) < n - k:
50 return False
51 return True
52
53 rounds = 0
54 while not done_p():
55 if verbose:
56 print("-- round --")
57 arc = 1
58 while arc <= largest_arc:
59 if verbose:
60 print("-- subround --")
61 new_info = [x for x in info]
62 for peer_physical in range(n):
63 peer_logical = peers[peer_physical]
64 peer_type = None
65 partner_logical = (peer_logical + arc) % n
66 partner_physical = peers.index(partner_logical)
67 if peer_physical < k or partner_physical < k:
68 if verbose:
69 print(
70 "bad peer in connection", peer_physical, "--",
71 partner_physical
72 )
73 continue
74 if peer_logical & arc == 0:
75 # we are outgoing
76 if verbose:
77 print(peer_physical, "connects to", partner_physical)
78 peer_type = "outgoing"
79 if peer_logical < num_ghosts:
80 # we have a ghost, check if the peer who connects
81 # to our ghost is actually outgoing
82 ghost_partner_logical = (peer_logical - arc) % n
83 if ghost_partner_logical & arc == 0:
84 peer_type = peer_type + ", ghost incoming"
85 new_info[peer_physical] = new_info[peer_physical] | info[
86 peer_physical] | info[partner_physical]
87 new_info[partner_physical
88 ] = new_info[partner_physical] | info[
89 peer_physical] | info[partner_physical]
90 else:
91 peer_type = "incoming"
92 if verbose > 1:
93 print("type of", str(peer_physical) + ":", peer_type)
94 info = new_info
95 arc = arc << 1
96 rounds = rounds + 1
97 random.shuffle(peers)
98 return rounds
99
100
101if __name__ == "__main__":
102 parser = argparse.ArgumentParser()
103 parser.add_argument("k", metavar="k", type=int, help="#(bad peers)")
104 parser.add_argument("n", metavar="n", type=int, help="#(all peers)")
105 parser.add_argument("r", metavar="r", type=int, help="#(rounds)")
106 parser.add_argument('--verbose', '-v', action='count')
107
108 args = parser.parse_args()
109 sum = 0.0
110 for n in range(0, args.r):
111 sum += simulate(args.k, args.n, args.verbose)
112 print(sum // args.r)
diff --git a/src/consensus/consensus.conf.in b/src/consensus/consensus.conf.in
deleted file mode 100644
index b0fbcaf5a..000000000
--- a/src/consensus/consensus.conf.in
+++ /dev/null
@@ -1,10 +0,0 @@
1[consensus]
2START_ON_DEMAND = @START_ON_DEMAND@
3@JAVAPORT@PORT = 2103
4HOSTNAME = localhost
5BINARY = gnunet-service-consensus
6ACCEPT_FROM = 127.0.0.1;
7ACCEPT_FROM6 = ::1;
8UNIXPATH = $GNUNET_RUNTIME_DIR/gnunet-service-consensus.sock
9UNIX_MATCH_UID = YES
10UNIX_MATCH_GID = YES
diff --git a/src/consensus/consensus.h b/src/consensus/consensus.h
deleted file mode 100644
index 888213d55..000000000
--- a/src/consensus/consensus.h
+++ /dev/null
@@ -1,92 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2012 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @author Florian Dold
23 * @file consensus/consensus.h
24 * @brief
25 */
26#ifndef CONSENSUS_H
27#define CONSENSUS_H
28
29#include "gnunet_common.h"
30
31GNUNET_NETWORK_STRUCT_BEGIN
32
33/**
34 * Sent by the client to the service,
35 * when the client wants the service to join a consensus session.
36 */
37struct GNUNET_CONSENSUS_JoinMessage
38{
39 /**
40 * Type: GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN
41 */
42 struct GNUNET_MessageHeader header;
43
44 /**
45 * Number of peers (at the end of this message) that want to
46 * participate in the consensus.
47 */
48 uint32_t num_peers GNUNET_PACKED;
49
50 /**
51 * Session id of the consensus.
52 */
53 struct GNUNET_HashCode session_id;
54
55 /**
56 * Start time for the consensus.
57 */
58 struct GNUNET_TIME_AbsoluteNBO start;
59
60 /**
61 * Deadline for conclude.
62 */
63 struct GNUNET_TIME_AbsoluteNBO deadline;
64
65 /* GNUNET_PeerIdentity[num_peers] */
66};
67
68
69/**
70 * Message with an element
71 */
72struct GNUNET_CONSENSUS_ElementMessage
73{
74 /**
75 * Type:
76 * Either GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT
77 * or GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT_ELEMENT
78 */
79 struct GNUNET_MessageHeader header;
80
81 /**
82 * Type: GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_NEW_ELEMENT
83 */
84 uint16_t element_type GNUNET_PACKED; /* FIXME: alignment? => uint32_t */
85
86 /* rest: element data */
87};
88
89
90GNUNET_NETWORK_STRUCT_END
91
92#endif
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c
deleted file mode 100644
index b4a9e5d39..000000000
--- a/src/consensus/consensus_api.c
+++ /dev/null
@@ -1,349 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2012, 2016 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file consensus/consensus_api.c
23 * @brief
24 * @author Florian Dold
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "gnunet_protocols.h"
29#include "gnunet_client_lib.h"
30#include "gnunet_consensus_service.h"
31#include "consensus.h"
32
33
34#define LOG(kind, ...) GNUNET_log_from (kind, "consensus-api", __VA_ARGS__)
35
36
37/**
38 * Handle for the service.
39 */
40struct GNUNET_CONSENSUS_Handle
41{
42 /**
43 * Configuration to use.
44 */
45 const struct GNUNET_CONFIGURATION_Handle *cfg;
46
47 /**
48 * Callback for new elements. Not called for elements added locally.
49 */
50 GNUNET_CONSENSUS_ElementCallback new_element_cb;
51
52 /**
53 * Closure for @e new_element_cb
54 */
55 void *new_element_cls;
56
57 /**
58 * The (local) session identifier for the consensus session.
59 */
60 struct GNUNET_HashCode session_id;
61
62 /**
63 * #GNUNET_YES iff the join message has been sent to the service.
64 */
65 int joined;
66
67 /**
68 * Called when the conclude operation finishes or fails.
69 */
70 GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
71
72 /**
73 * Closure for the @e conclude_cb callback.
74 */
75 void *conclude_cls;
76
77 /**
78 * Deadline for the conclude operation.
79 */
80 struct GNUNET_TIME_Absolute conclude_deadline;
81
82 /**
83 * Message queue for the client.
84 */
85 struct GNUNET_MQ_Handle *mq;
86};
87
88
89/**
90 * FIXME: this should not bee necessary when the API
91 * issue has been fixed
92 */
93struct InsertDoneInfo
94{
95 GNUNET_CONSENSUS_InsertDoneCallback idc;
96 void *cls;
97};
98
99
100/**
101 * Called when the server has sent is a new element
102 *
103 * @param cls consensus handle
104 * @param msg element message
105 */
106static int
107check_new_element (void *cls,
108 const struct GNUNET_CONSENSUS_ElementMessage *msg)
109{
110 /* any size is fine, elements are variable-size */
111 return GNUNET_OK;
112}
113
114
115/**
116 * Called when the server has sent is a new element
117 *
118 * @param cls consensus handle
119 * @param msg element message
120 */
121static void
122handle_new_element (void *cls,
123 const struct GNUNET_CONSENSUS_ElementMessage *msg)
124{
125 struct GNUNET_CONSENSUS_Handle *consensus = cls;
126 struct GNUNET_SET_Element element;
127
128 LOG (GNUNET_ERROR_TYPE_DEBUG,
129 "received new element\n");
130 element.element_type = msg->element_type;
131 element.size = ntohs (msg->header.size) - sizeof(struct
132 GNUNET_CONSENSUS_ElementMessage);
133 element.data = &msg[1];
134 consensus->new_element_cb (consensus->new_element_cls,
135 &element);
136}
137
138
139/**
140 * Called when the server has announced
141 * that the conclusion is over.
142 *
143 * @param cls consensus handle
144 * @param msg conclude done message
145 */
146static void
147handle_conclude_done (void *cls,
148 const struct GNUNET_MessageHeader *msg)
149{
150 struct GNUNET_CONSENSUS_Handle *consensus = cls;
151 GNUNET_CONSENSUS_ConcludeCallback cc;
152
153 GNUNET_MQ_destroy (consensus->mq);
154 consensus->mq = NULL;
155 GNUNET_assert (NULL != (cc = consensus->conclude_cb));
156 consensus->conclude_cb = NULL;
157 cc (consensus->conclude_cls);
158}
159
160
161/**
162 * Generic error handler, called with the appropriate
163 * error code and the same closure specified at the creation of
164 * the message queue.
165 * Not every message queue implementation supports an error handler.
166 *
167 * @param cls closure, same closure as for the message handlers
168 * @param error error code
169 */
170static void
171mq_error_handler (void *cls,
172 enum GNUNET_MQ_Error error)
173{
174 LOG (GNUNET_ERROR_TYPE_WARNING,
175 "consensus service disconnected us\n");
176}
177
178
179/**
180 * Create a consensus session.
181 *
182 * @param cfg configuration to use for connecting to the consensus service
183 * @param num_peers number of peers in the peers array
184 * @param peers array of peers participating in this consensus session
185 * Inclusion of the local peer is optional.
186 * @param session_id session identifier
187 * Allows a group of peers to have more than consensus session.
188 * @param start start time of the consensus, conclude should be called before
189 * the start time.
190 * @param deadline time when the consensus should have concluded
191 * @param new_element_cb callback, called when a new element is added to the set by
192 * another peer
193 * @param new_element_cls closure for new_element
194 * @return handle to use, NULL on error
195 */
196struct GNUNET_CONSENSUS_Handle *
197GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
198 unsigned int num_peers,
199 const struct GNUNET_PeerIdentity *peers,
200 const struct GNUNET_HashCode *session_id,
201 struct GNUNET_TIME_Absolute start,
202 struct GNUNET_TIME_Absolute deadline,
203 GNUNET_CONSENSUS_ElementCallback new_element_cb,
204 void *new_element_cls)
205{
206 struct GNUNET_CONSENSUS_Handle *consensus
207 = GNUNET_new (struct GNUNET_CONSENSUS_Handle);
208 struct GNUNET_MQ_MessageHandler mq_handlers[] = {
209 GNUNET_MQ_hd_var_size (new_element,
210 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT,
211 struct GNUNET_CONSENSUS_ElementMessage,
212 consensus),
213 GNUNET_MQ_hd_fixed_size (conclude_done,
214 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE,
215 struct GNUNET_MessageHeader,
216 consensus),
217 GNUNET_MQ_handler_end ()
218 };
219 struct GNUNET_CONSENSUS_JoinMessage *join_msg;
220 struct GNUNET_MQ_Envelope *ev;
221
222 consensus->cfg = cfg;
223 consensus->new_element_cb = new_element_cb;
224 consensus->new_element_cls = new_element_cls;
225 consensus->session_id = *session_id;
226 consensus->mq = GNUNET_CLIENT_connect (cfg,
227 "consensus",
228 mq_handlers,
229 &mq_error_handler,
230 consensus);
231 if (NULL == consensus->mq)
232 {
233 GNUNET_free (consensus);
234 return NULL;
235 }
236 ev = GNUNET_MQ_msg_extra (join_msg,
237 (num_peers * sizeof(struct GNUNET_PeerIdentity)),
238 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
239
240 join_msg->session_id = consensus->session_id;
241 join_msg->start = GNUNET_TIME_absolute_hton (start);
242 join_msg->deadline = GNUNET_TIME_absolute_hton (deadline);
243 join_msg->num_peers = htonl (num_peers);
244 GNUNET_memcpy (&join_msg[1],
245 peers,
246 num_peers * sizeof(struct GNUNET_PeerIdentity));
247
248 GNUNET_MQ_send (consensus->mq, ev);
249 return consensus;
250}
251
252
253static void
254idc_adapter (void *cls)
255{
256 struct InsertDoneInfo *i = cls;
257
258 i->idc (i->cls, GNUNET_OK);
259 GNUNET_free (i);
260}
261
262
263/**
264 * Insert an element in the set being reconsiled. Must not be called after
265 * "GNUNET_CONSENSUS_conclude".
266 *
267 * @param consensus handle for the consensus session
268 * @param element the element to be inserted
269 * @param idc function called when we are done with this element and it
270 * is thus allowed to call GNUNET_CONSENSUS_insert again
271 * @param idc_cls closure for 'idc'
272 */
273void
274GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
275 const struct GNUNET_SET_Element *element,
276 GNUNET_CONSENSUS_InsertDoneCallback idc,
277 void *idc_cls)
278{
279 struct GNUNET_CONSENSUS_ElementMessage *element_msg;
280 struct GNUNET_MQ_Envelope *ev;
281 struct InsertDoneInfo *i;
282
283 LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%u\n", element->size);
284
285 ev = GNUNET_MQ_msg_extra (element_msg, element->size,
286 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
287
288 GNUNET_memcpy (&element_msg[1], element->data, element->size);
289
290 if (NULL != idc)
291 {
292 i = GNUNET_new (struct InsertDoneInfo);
293 i->idc = idc;
294 i->cls = idc_cls;
295 GNUNET_MQ_notify_sent (ev, idc_adapter, i);
296 }
297 GNUNET_MQ_send (consensus->mq, ev);
298}
299
300
301/**
302 * We are done with inserting new elements into the consensus;
303 * try to conclude the consensus within a given time window.
304 * After conclude has been called, no further elements may be
305 * inserted by the client.
306 *
307 * @param consensus consensus session
308 * @param deadline deadline after which the conculde callback
309 * must be called
310 * @param conclude called when the conclusion was successful
311 * @param conclude_cls closure for the conclude callback
312 */
313void
314GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
315 GNUNET_CONSENSUS_ConcludeCallback conclude,
316 void *conclude_cls)
317{
318 struct GNUNET_MQ_Envelope *ev;
319
320 GNUNET_assert (NULL != conclude);
321 GNUNET_assert (NULL == consensus->conclude_cb);
322
323 consensus->conclude_cls = conclude_cls;
324 consensus->conclude_cb = conclude;
325
326 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
327 GNUNET_MQ_send (consensus->mq, ev);
328}
329
330
331/**
332 * Destroy a consensus handle (free all state associated with
333 * it, no longer call any of the callbacks).
334 *
335 * @param consensus handle to destroy
336 */
337void
338GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus)
339{
340 if (NULL != consensus->mq)
341 {
342 GNUNET_MQ_destroy (consensus->mq);
343 consensus->mq = NULL;
344 }
345 GNUNET_free (consensus);
346}
347
348
349/* end of consensus_api.c */
diff --git a/src/consensus/consensus_protocol.h b/src/consensus/consensus_protocol.h
deleted file mode 100644
index 0afd56b27..000000000
--- a/src/consensus/consensus_protocol.h
+++ /dev/null
@@ -1,137 +0,0 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2012 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21
22/**
23 * @file consensus/consensus_protocol.h
24 * @brief p2p message definitions for consensus
25 * @author Florian Dold
26 */
27
28#ifndef GNUNET_CONSENSUS_PROTOCOL_H
29#define GNUNET_CONSENSUS_PROTOCOL_H
30
31#include "platform.h"
32#include "gnunet_util_lib.h"
33#include "gnunet_common.h"
34#include "gnunet_protocols.h"
35
36
37GNUNET_NETWORK_STRUCT_BEGIN
38
39/**
40 * Sent as context message for set reconciliation.
41 *
42 * Essentially contains all the fields
43 * from 'struct TaskKey', but in NBO.
44 */
45struct GNUNET_CONSENSUS_RoundContextMessage
46{
47 /**
48 * Type: #GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT
49 */
50 struct GNUNET_MessageHeader header;
51
52 /**
53 * A value from 'enum PhaseKind'.
54 */
55 uint16_t kind GNUNET_PACKED;
56
57 /**
58 * Number of the first peer
59 * in canonical order.
60 */
61 int16_t peer1 GNUNET_PACKED;
62
63 /**
64 * Number of the second peer in canonical order.
65 */
66 int16_t peer2 GNUNET_PACKED;
67
68 /**
69 * Repetition of the gradecast phase.
70 */
71 int16_t repetition GNUNET_PACKED;
72
73 /**
74 * Leader in the gradecast phase.
75 *
76 * Can be different from both peer1 and peer2.
77 */
78 int16_t leader GNUNET_PACKED;
79
80 /**
81 * Non-zero if this set reconciliation
82 * had elements removed because they were contested.
83 *
84 * Will be considered when grading broadcasts.
85 *
86 * Ignored for set operations that are not within gradecasts.
87 */
88 uint16_t is_contested GNUNET_PACKED;
89};
90
91
92enum
93{
94 CONSENSUS_MARKER_CONTESTED = 1,
95 CONSENSUS_MARKER_SIZE = 2,
96};
97
98
99/**
100 * Consensus element, either marker or payload.
101 */
102struct ConsensusElement
103{
104 /**
105 * Payload element_type, only valid
106 * if this is not a marker element.
107 */
108 uint16_t payload_type GNUNET_PACKED;
109
110 /**
111 * Is this a marker element?
112 */
113 uint8_t marker;
114
115 /* rest: element data */
116};
117
118
119struct ConsensusSizeElement
120{
121 struct ConsensusElement ce;
122
123 uint64_t size GNUNET_PACKED;
124 uint8_t sender_index;
125};
126
127
128struct ConsensusStuffedElement
129{
130 struct ConsensusElement ce;
131 struct GNUNET_HashCode rand GNUNET_PACKED;
132};
133
134
135GNUNET_NETWORK_STRUCT_END
136
137#endif
diff --git a/src/consensus/gnunet-consensus-profiler.c b/src/consensus/gnunet-consensus-profiler.c
deleted file mode 100644
index 07a536a2d..000000000
--- a/src/consensus/gnunet-consensus-profiler.c
+++ /dev/null
@@ -1,586 +0,0 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2012 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file consensus/gnunet-consensus-profiler.c
23 * @brief profiling tool for gnunet-consensus
24 * @author Florian Dold
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "gnunet_time_lib.h"
29#include "gnunet_consensus_service.h"
30#include "gnunet_testbed_service.h"
31
32static unsigned int num_peers = 2;
33
34static unsigned int replication = 1;
35
36static unsigned int num_values = 5;
37
38static struct GNUNET_TIME_Relative conclude_timeout;
39
40static struct GNUNET_TIME_Relative consensus_delay;
41
42static struct GNUNET_CONSENSUS_Handle **consensus_handles;
43
44static struct GNUNET_TESTBED_Operation **testbed_operations;
45
46static unsigned int num_connected_handles;
47
48static struct GNUNET_TESTBED_Peer **peers;
49
50static struct GNUNET_PeerIdentity *peer_ids;
51
52static unsigned int num_retrieved_peer_ids;
53
54static struct GNUNET_HashCode session_id;
55
56static unsigned int peers_done = 0;
57
58static int dist_static;
59
60static unsigned *results_for_peer;
61
62/**
63 * The profiler will write statistics
64 * for all peers to the file with this name.
65 */
66static char *statistics_filename;
67
68/**
69 * The profiler will write statistics
70 * for all peers to this file.
71 */
72static FILE *statistics_file;
73
74static int verbose;
75
76/**
77 * Start time for all consensuses.
78 */
79static struct GNUNET_TIME_Absolute start;
80
81/**
82 * Deadline for all consensuses.
83 */
84static struct GNUNET_TIME_Absolute deadline;
85
86
87/**
88 * Signature of the event handler function called by the
89 * respective event controller.
90 *
91 * @param cls closure
92 * @param event information about the event
93 */
94static void
95controller_cb (void *cls,
96 const struct GNUNET_TESTBED_EventInformation *event)
97{
98 GNUNET_assert (0);
99}
100
101
102static void
103statistics_done_cb (void *cls,
104 struct
105 GNUNET_TESTBED_Operation
106 *op,
107 const char *emsg)
108{
109 GNUNET_assert (NULL == emsg);
110 GNUNET_TESTBED_operation_done (op);
111 if (NULL != statistics_file)
112 fclose (statistics_file);
113 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got statistics, shutting down\n");
114 GNUNET_SCHEDULER_shutdown ();
115}
116
117
118/**
119 * Callback function to process statistic values from all peers.
120 *
121 * @param cls closure
122 * @param peer the peer the statistic belong to
123 * @param subsystem name of subsystem that created the statistic
124 * @param name the name of the datum
125 * @param value the current value
126 * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
127 * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
128 */
129static int
130statistics_cb (void *cls,
131 const struct GNUNET_TESTBED_Peer *peer,
132 const char *subsystem,
133 const char *name,
134 uint64_t value,
135 int is_persistent)
136{
137 if (NULL != statistics_file)
138 {
139 fprintf (statistics_file, "P%u\t%s\t%s\t%lu\n", GNUNET_TESTBED_get_index (
140 peer), subsystem, name, (unsigned long) value);
141 }
142 return GNUNET_OK;
143}
144
145
146static void
147destroy (void *cls)
148{
149 struct GNUNET_CONSENSUS_Handle *consensus = cls;
150
151 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
152 "destroying consensus\n");
153 GNUNET_CONSENSUS_destroy (consensus);
154 peers_done++;
155 if (peers_done == num_peers)
156 {
157 unsigned int i;
158 for (i = 0; i < num_peers; i++)
159 GNUNET_TESTBED_operation_done (testbed_operations[i]);
160 for (i = 0; i < num_peers; i++)
161 printf ("P%u got %u of %u elements\n",
162 i,
163 results_for_peer[i],
164 num_values);
165 if (NULL != statistics_filename)
166 statistics_file = fopen (statistics_filename, "w");
167 GNUNET_TESTBED_get_statistics (num_peers, peers, NULL, NULL,
168 statistics_cb,
169 statistics_done_cb,
170 NULL);
171 }
172}
173
174
175/**
176 * Called when a conclusion was successful.
177 *
178 * @param cls closure, the consensus handle
179 * @return #GNUNET_YES if more consensus groups should be offered,
180 * #GNUNET_NO if not
181 */
182static void
183conclude_cb (void *cls)
184{
185 struct GNUNET_CONSENSUS_Handle **chp = cls;
186
187 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
188 "consensus %d done\n",
189 (int) (chp - consensus_handles));
190 GNUNET_SCHEDULER_add_now (destroy, *chp);
191}
192
193
194static void
195generate_indices (int *indices)
196{
197 int j;
198
199 j = 0;
200 while (j < replication)
201 {
202 int n;
203 int k;
204 int repeat;
205 n = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, num_peers);
206 repeat = GNUNET_NO;
207 for (k = 0; k < j; k++)
208 if (indices[k] == n)
209 {
210 repeat = GNUNET_YES;
211 break;
212 }
213 if (GNUNET_NO == repeat)
214 indices[j++] = n;
215 }
216}
217
218
219static void
220do_consensus ()
221{
222 int unique_indices[replication];
223 unsigned int i;
224 unsigned int j;
225 struct GNUNET_HashCode val;
226 struct GNUNET_SET_Element element;
227
228 if (dist_static)
229 {
230 for (i = 0; i < num_values; i++)
231 {
232 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &val);
233
234 element.data = &val;
235 element.size = sizeof(val);
236 for (j = 0; j < replication; j++)
237 {
238 GNUNET_CONSENSUS_insert (consensus_handles[j],
239 &element,
240 NULL, NULL);
241 }
242 }
243 }
244 else
245 {
246 for (i = 0; i < num_values; i++)
247 {
248 generate_indices (unique_indices);
249 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &val);
250
251 element.data = &val;
252 element.size = sizeof(val);
253 for (j = 0; j < replication; j++)
254 {
255 int cid;
256
257 cid = unique_indices[j];
258 GNUNET_CONSENSUS_insert (consensus_handles[cid],
259 &element,
260 NULL, NULL);
261 }
262 }
263 }
264
265 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
266 "all elements inserted, calling conclude\n");
267
268 for (i = 0; i < num_peers; i++)
269 GNUNET_CONSENSUS_conclude (consensus_handles[i],
270 conclude_cb, &consensus_handles[i]);
271}
272
273
274/**
275 * Callback to be called when a service connect operation is completed
276 *
277 * @param cls the callback closure from functions generating an operation
278 * @param op the operation that has been finished
279 * @param ca_result the service handle returned from GNUNET_TESTBED_ConnectAdapter()
280 * @param emsg error message in case the operation has failed; will be NULL if
281 * operation has executed successfully.
282 */
283static void
284connect_complete (void *cls,
285 struct GNUNET_TESTBED_Operation *op,
286 void *ca_result,
287 const char *emsg)
288{
289 if (NULL != emsg)
290 {
291 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
292 "testbed connect emsg: %s\n",
293 emsg);
294 GNUNET_assert (0);
295 }
296
297 num_connected_handles++;
298
299 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
300 "connect complete\n");
301
302 if (num_connected_handles == num_peers)
303 {
304 do_consensus ();
305 }
306}
307
308
309static void
310new_element_cb (void *cls,
311 const struct GNUNET_SET_Element *element)
312{
313 struct GNUNET_CONSENSUS_Handle **chp = cls;
314 int idx = chp - consensus_handles;
315
316 GNUNET_assert (NULL != cls);
317
318 results_for_peer[idx]++;
319
320 GNUNET_assert (sizeof(struct GNUNET_HashCode) == element->size);
321
322 if (GNUNET_YES == verbose)
323 {
324 printf ("P%d received %s\n",
325 idx,
326 GNUNET_h2s ((struct GNUNET_HashCode *) element->data));
327 }
328}
329
330
331/**
332 * Adapter function called to establish a connection to
333 * a service.
334 *
335 * @param cls closure
336 * @param cfg configuration of the peer to connect to; will be available until
337 * GNUNET_TESTBED_operation_done() is called on the operation returned
338 * from GNUNET_TESTBED_service_connect()
339 * @return service handle to return in 'op_result', NULL on error
340 */
341static void *
342connect_adapter (void *cls,
343 const struct GNUNET_CONFIGURATION_Handle *cfg)
344{
345 struct GNUNET_CONSENSUS_Handle **chp = cls;
346 struct GNUNET_CONSENSUS_Handle *consensus;
347
348 chp = (struct GNUNET_CONSENSUS_Handle **) cls;
349
350 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
351 "connect adapter, %d peers\n",
352 num_peers);
353 consensus = GNUNET_CONSENSUS_create (cfg,
354 num_peers, peer_ids,
355 &session_id,
356 start,
357 deadline,
358 &new_element_cb, chp);
359 *chp = (struct GNUNET_CONSENSUS_Handle *) consensus;
360 return consensus;
361}
362
363
364/**
365 * Adapter function called to destroy a connection to
366 * a service.
367 *
368 * @param cls closure
369 * @param op_result service handle returned from the connect adapter
370 */
371static void
372disconnect_adapter (void *cls, void *op_result)
373{
374 /* FIXME: what to do here? */
375 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
376 "disconnect adapter called\n");
377}
378
379
380/**
381 * Callback to be called when the requested peer information is available
382 *
383 * @param cb_cls the closure from GNUNET_TETSBED_peer_get_information()
384 * @param op the operation this callback corresponds to
385 * @param pinfo the result; will be NULL if the operation has failed
386 * @param emsg error message if the operation has failed; will be NULL if the
387 * operation is successful
388 */
389static void
390peer_info_cb (void *cb_cls,
391 struct GNUNET_TESTBED_Operation *op,
392 const struct GNUNET_TESTBED_PeerInformation *pinfo,
393 const char *emsg)
394{
395 struct GNUNET_PeerIdentity *p;
396 int i;
397
398 GNUNET_assert (NULL == emsg);
399
400 p = (struct GNUNET_PeerIdentity *) cb_cls;
401
402 if (pinfo->pit == GNUNET_TESTBED_PIT_IDENTITY)
403 {
404 *p = *pinfo->result.id;
405 num_retrieved_peer_ids++;
406 if (num_retrieved_peer_ids == num_peers)
407 for (i = 0; i < num_peers; i++)
408 testbed_operations[i] =
409 GNUNET_TESTBED_service_connect (NULL, peers[i], "consensus",
410 connect_complete, NULL,
411 connect_adapter, disconnect_adapter,
412 &consensus_handles[i]);
413 }
414 else
415 {
416 GNUNET_assert (0);
417 }
418
419 GNUNET_TESTBED_operation_done (op);
420}
421
422
423/**
424 * Signature of a main function for a testcase.
425 *
426 * @param cls closure
427 * @param h the run handle
428 * @param num_peers number of peers in 'peers'
429 * @param started_peers handle to peers run in the testbed. NULL upon timeout (see
430 * GNUNET_TESTBED_test_run()).
431 * @param links_succeeded the number of overlay link connection attempts that
432 * succeeded
433 * @param links_failed the number of overlay link connection attempts that
434 * failed
435 */
436static void
437test_master (void *cls,
438 struct GNUNET_TESTBED_RunHandle *h,
439 unsigned int num_peers,
440 struct GNUNET_TESTBED_Peer **started_peers,
441 unsigned int links_succeeded,
442 unsigned int links_failed)
443{
444 int i;
445
446 GNUNET_log_setup ("gnunet-consensus", "INFO", NULL);
447
448 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test master\n");
449
450 peers = started_peers;
451
452 peer_ids = GNUNET_malloc (num_peers * sizeof(struct GNUNET_PeerIdentity));
453
454 results_for_peer = GNUNET_malloc (num_peers * sizeof(unsigned int));
455 consensus_handles = GNUNET_malloc (num_peers * sizeof(struct
456 ConsensusHandle *));
457 testbed_operations = GNUNET_malloc (num_peers * sizeof(struct
458 ConsensusHandle *));
459
460 for (i = 0; i < num_peers; i++)
461 GNUNET_TESTBED_peer_get_information (peers[i],
462 GNUNET_TESTBED_PIT_IDENTITY,
463 peer_info_cb,
464 &peer_ids[i]);
465}
466
467
468static void
469run (void *cls, char *const *args, const char *cfgfile,
470 const struct GNUNET_CONFIGURATION_Handle *cfg)
471{
472 static char *session_str = "gnunet-consensus/test";
473 char *topology;
474 int topology_cmp_result;
475
476 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "testbed",
477 "OVERLAY_TOPOLOGY",
478 &topology))
479 {
480 fprintf (stderr,
481 "'OVERLAY_TOPOLOGY' not found in 'testbed' config section, "
482 "seems like you passed the wrong configuration file\n");
483 return;
484 }
485
486 topology_cmp_result = strcasecmp (topology, "NONE");
487 GNUNET_free (topology);
488
489 if (0 == topology_cmp_result)
490 {
491 fprintf (stderr,
492 "'OVERLAY_TOPOLOGY' set to 'NONE', "
493 "seems like you passed the wrong configuration file\n");
494 return;
495 }
496
497 if (num_peers < replication)
498 {
499 fprintf (stderr, "k must be <=n\n");
500 return;
501 }
502
503 start = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (),
504 consensus_delay);
505 deadline = GNUNET_TIME_absolute_add (start, conclude_timeout);
506
507 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
508 "running gnunet-consensus\n");
509
510 GNUNET_CRYPTO_hash (session_str, strlen (session_str), &session_id);
511
512 (void) GNUNET_TESTBED_test_run ("gnunet-consensus",
513 cfgfile,
514 num_peers,
515 0,
516 controller_cb,
517 NULL,
518 test_master,
519 NULL);
520}
521
522
523int
524main (int argc, char **argv)
525{
526 struct GNUNET_GETOPT_CommandLineOption options[] = {
527 GNUNET_GETOPT_option_uint ('n',
528 "num-peers",
529 NULL,
530 gettext_noop ("number of peers in consensus"),
531 &num_peers),
532
533 GNUNET_GETOPT_option_uint ('k',
534 "value-replication",
535 NULL,
536 gettext_noop (
537 "how many peers (random selection without replacement) receive one value?"),
538 &replication),
539
540 GNUNET_GETOPT_option_uint ('x',
541 "num-values",
542 NULL,
543 gettext_noop ("number of values"),
544 &num_values),
545
546 GNUNET_GETOPT_option_relative_time ('t',
547 "timeout",
548 NULL,
549 gettext_noop ("consensus timeout"),
550 &conclude_timeout),
551
552
553 GNUNET_GETOPT_option_relative_time ('d',
554 "delay",
555 NULL,
556 gettext_noop (
557 "delay until consensus starts"),
558 &consensus_delay),
559
560 GNUNET_GETOPT_option_filename ('s',
561 "statistics",
562 "FILENAME",
563 gettext_noop ("write statistics to file"),
564 &statistics_filename),
565
566 GNUNET_GETOPT_option_flag ('S',
567 "dist-static",
568 gettext_noop (
569 "distribute elements to a static subset of good peers"),
570 &dist_static),
571
572 GNUNET_GETOPT_option_flag ('V',
573 "verbose",
574 gettext_noop (
575 "be more verbose (print received values)"),
576 &verbose),
577
578 GNUNET_GETOPT_OPTION_END
579 };
580
581 conclude_timeout = GNUNET_TIME_UNIT_SECONDS;
582 GNUNET_PROGRAM_run2 (argc, argv, "gnunet-consensus-profiler",
583 "help",
584 options, &run, NULL, GNUNET_YES);
585 return 0;
586}
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
deleted file mode 100644
index 5b6b9bbd7..000000000
--- a/src/consensus/gnunet-service-consensus.c
+++ /dev/null
@@ -1,3572 +0,0 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2012, 2013, 2017, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file consensus/gnunet-service-consensus.c
22 * @brief multi-peer set reconciliation
23 * @author Florian Dold <flo@dold.me>
24 */
25#include "platform.h"
26#include "gnunet_util_lib.h"
27#include "gnunet_block_lib.h"
28#include "gnunet_protocols.h"
29#include "gnunet_applications.h"
30#include "gnunet_set_service.h"
31#include "gnunet_statistics_service.h"
32#include "gnunet_consensus_service.h"
33#include "consensus_protocol.h"
34#include "consensus.h"
35
36
37enum ReferendumVote
38{
39 /**
40 * Vote that nothing should change.
41 * This option is never voted explicitly.
42 */
43 VOTE_STAY = 0,
44 /**
45 * Vote that an element should be added.
46 */
47 VOTE_ADD = 1,
48 /**
49 * Vote that an element should be removed.
50 */
51 VOTE_REMOVE = 2,
52};
53
54
55enum EarlyStoppingPhase
56{
57 EARLY_STOPPING_NONE = 0,
58 EARLY_STOPPING_ONE_MORE = 1,
59 EARLY_STOPPING_DONE = 2,
60};
61
62
63enum PhaseKind
64{
65 PHASE_KIND_ALL_TO_ALL,
66 PHASE_KIND_ALL_TO_ALL_2,
67 PHASE_KIND_GRADECAST_LEADER,
68 PHASE_KIND_GRADECAST_ECHO,
69 PHASE_KIND_GRADECAST_ECHO_GRADE,
70 PHASE_KIND_GRADECAST_CONFIRM,
71 PHASE_KIND_GRADECAST_CONFIRM_GRADE,
72 /**
73 * Apply a repetition of the all-to-all
74 * gradecast to the current set.
75 */
76 PHASE_KIND_APPLY_REP,
77 PHASE_KIND_FINISH,
78};
79
80
81enum SetKind
82{
83 SET_KIND_NONE = 0,
84 SET_KIND_CURRENT,
85 /**
86 * Last result set from a gradecast
87 */
88 SET_KIND_LAST_GRADECAST,
89 SET_KIND_LEADER_PROPOSAL,
90 SET_KIND_ECHO_RESULT,
91};
92
93enum DiffKind
94{
95 DIFF_KIND_NONE = 0,
96 DIFF_KIND_LEADER_PROPOSAL,
97 DIFF_KIND_LEADER_CONSENSUS,
98 DIFF_KIND_GRADECAST_RESULT,
99};
100
101enum RfnKind
102{
103 RFN_KIND_NONE = 0,
104 RFN_KIND_ECHO,
105 RFN_KIND_CONFIRM,
106 RFN_KIND_GRADECAST_RESULT
107};
108
109
110GNUNET_NETWORK_STRUCT_BEGIN
111
112/**
113 * Tuple of integers that together identify a task uniquely.
114 */
115struct TaskKey
116{
117 /**
118 * A value from 'enum PhaseKind'.
119 */
120 uint16_t kind GNUNET_PACKED;
121
122 /**
123 * Number of the first peer
124 * in canonical order.
125 */
126 int16_t peer1 GNUNET_PACKED;
127
128 /**
129 * Number of the second peer in canonical order.
130 */
131 int16_t peer2 GNUNET_PACKED;
132
133 /**
134 * Repetition of the gradecast phase.
135 */
136 int16_t repetition GNUNET_PACKED;
137
138 /**
139 * Leader in the gradecast phase.
140 *
141 * Can be different from both peer1 and peer2.
142 */
143 int16_t leader GNUNET_PACKED;
144};
145
146
147struct SetKey
148{
149 enum SetKind set_kind GNUNET_PACKED;
150 /**
151 * Repetition counter.
152 */
153 int k1 GNUNET_PACKED;
154 /**
155 * Leader (or 0).
156 */
157 int k2 GNUNET_PACKED;
158};
159
160
161struct SetEntry
162{
163 struct SetKey key;
164 struct GNUNET_SET_Handle *h;
165
166 /**
167 * #GNUNET_YES if the set resulted from applying a referendum with contested
168 * elements.
169 */
170 int is_contested;
171};
172
173
174struct DiffKey
175{
176 enum DiffKind diff_kind GNUNET_PACKED;
177
178 int k1 GNUNET_PACKED;
179
180 int k2 GNUNET_PACKED;
181};
182
183struct RfnKey
184{
185 enum RfnKind rfn_kind GNUNET_PACKED;
186 int k1 GNUNET_PACKED;
187 int k2 GNUNET_PACKED;
188};
189
190
191GNUNET_NETWORK_STRUCT_END
192
193
194struct SetOpCls
195{
196 struct SetKey input_set;
197
198 struct SetKey output_set;
199 struct RfnKey output_rfn;
200 struct DiffKey output_diff;
201
202 int do_not_remove;
203
204 int transceive_contested;
205
206 struct GNUNET_SET_OperationHandle *op;
207};
208
209
210struct FinishCls
211{
212 struct SetKey input_set;
213};
214
215/**
216 * Closure for both @a start_task
217 * and @a cancel_task.
218 */
219union TaskFuncCls
220{
221 struct SetOpCls setop;
222 struct FinishCls finish;
223};
224
225
226struct TaskEntry;
227
228
229typedef void
230(*TaskFunc) (struct TaskEntry *task);
231
232
233/*
234 * Node in the consensus task graph.
235 */
236struct TaskEntry
237{
238 struct TaskKey key;
239
240 struct Step *step;
241
242 int is_started;
243
244 int is_finished;
245
246 TaskFunc start;
247 TaskFunc cancel;
248
249 union TaskFuncCls cls;
250};
251
252
253struct Step
254{
255 /**
256 * All steps of one session are in a
257 * linked list for easier deallocation.
258 */
259 struct Step *prev;
260
261 /**
262 * All steps of one session are in a
263 * linked list for easier deallocation.
264 */
265 struct Step *next;
266
267 struct ConsensusSession *session;
268
269 /**
270 * Tasks that this step is composed of.
271 */
272 struct TaskEntry **tasks;
273 unsigned int tasks_len;
274 unsigned int tasks_cap;
275
276 unsigned int finished_tasks;
277
278 /*
279 * Tasks that have this task as dependency.
280 *
281 * We store pointers to subordinates rather
282 * than to prerequisites since it makes
283 * tracking the readiness of a task easier.
284 */
285 struct Step **subordinates;
286 unsigned int subordinates_len;
287 unsigned int subordinates_cap;
288
289 /**
290 * Counter for the prerequisites of this step.
291 */
292 size_t pending_prereq;
293
294 /**
295 * Task that will run this step despite any pending prerequisites.
296 */
297 struct GNUNET_SCHEDULER_Task *timeout_task;
298
299 unsigned int is_running;
300
301 unsigned int is_finished;
302
303 /**
304 * Synchrony round of the task. Determines the deadline for the task.
305 */
306 unsigned int round;
307
308 /**
309 * Human-readable name for the task, used for debugging.
310 */
311 char *debug_name;
312
313 /**
314 * When we're doing an early finish, how should this step be treated? If
315 * #GNUNET_YES, the step will be marked as finished without actually running
316 * its tasks. Otherwise, the step will still be run even after an early
317 * finish.
318 *
319 * Note that a task may never be finished early if it is already running.
320 */
321 int early_finishable;
322};
323
324
325struct RfnElementInfo
326{
327 const struct GNUNET_SET_Element *element;
328
329 /**
330 * #GNUNET_YES if the peer votes for the proposal.
331 */
332 int *votes;
333
334 /**
335 * Proposal for this element, can only be #VOTE_ADD or #VOTE_REMOVE.
336 */
337 enum ReferendumVote proposal;
338};
339
340
341struct ReferendumEntry
342{
343 struct RfnKey key;
344
345 /*
346 * Elements where there is at least one proposed change.
347 *
348 * Maps the hash of the GNUNET_SET_Element
349 * to 'struct RfnElementInfo'.
350 */
351 struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
352
353 unsigned int num_peers;
354
355 /**
356 * Stores, for every peer in the session,
357 * whether the peer finished the whole referendum.
358 *
359 * Votes from peers are only counted if they're
360 * marked as committed (#GNUNET_YES) in the referendum.
361 *
362 * Otherwise (#GNUNET_NO), the requested changes are
363 * not counted for majority votes or thresholds.
364 */
365 int *peer_commited;
366
367
368 /**
369 * Contestation state of the peer. If a peer is contested, the values it
370 * contributed are still counted for applying changes, but the grading is
371 * affected.
372 */
373 int *peer_contested;
374};
375
376
377struct DiffElementInfo
378{
379 const struct GNUNET_SET_Element *element;
380
381 /**
382 * Positive weight for 'add', negative
383 * weights for 'remove'.
384 */
385 int weight;
386};
387
388
389/**
390 * Weighted diff.
391 */
392struct DiffEntry
393{
394 struct DiffKey key;
395 struct GNUNET_CONTAINER_MultiHashMap *changes;
396};
397
398struct SetHandle
399{
400 struct SetHandle *prev;
401 struct SetHandle *next;
402
403 struct GNUNET_SET_Handle *h;
404};
405
406
407/**
408 * A consensus session consists of one local client and the remote authorities.
409 */
410struct ConsensusSession
411{
412 /**
413 * Consensus sessions are kept in a DLL.
414 */
415 struct ConsensusSession *next;
416
417 /**
418 * Consensus sessions are kept in a DLL.
419 */
420 struct ConsensusSession *prev;
421
422 unsigned int num_client_insert_pending;
423
424 struct GNUNET_CONTAINER_MultiHashMap *setmap;
425 struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
426 struct GNUNET_CONTAINER_MultiHashMap *diffmap;
427
428 /**
429 * Array of peers with length 'num_peers'.
430 */
431 int *peers_blacklisted;
432
433 /*
434 * Mapping from (hashed) TaskKey to TaskEntry.
435 *
436 * We map the application_id for a round to the task that should be
437 * executed, so we don't have to go through all task whenever we get
438 * an incoming set op request.
439 */
440 struct GNUNET_CONTAINER_MultiHashMap *taskmap;
441
442 struct Step *steps_head;
443 struct Step *steps_tail;
444
445 int conclude_started;
446
447 int conclude_done;
448
449 /**
450 * Global consensus identification, computed
451 * from the session id and participating authorities.
452 */
453 struct GNUNET_HashCode global_id;
454
455 /**
456 * Client that inhabits the session
457 */
458 struct GNUNET_SERVICE_Client *client;
459
460 /**
461 * Queued messages to the client.
462 */
463 struct GNUNET_MQ_Handle *client_mq;
464
465 /**
466 * Time when the conclusion of the consensus should begin.
467 */
468 struct GNUNET_TIME_Absolute conclude_start;
469
470 /**
471 * Timeout for all rounds together, single rounds will schedule a timeout task
472 * with a fraction of the conclude timeout.
473 * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
474 */
475 struct GNUNET_TIME_Absolute conclude_deadline;
476
477 struct GNUNET_PeerIdentity *peers;
478
479 /**
480 * Number of other peers in the consensus.
481 */
482 unsigned int num_peers;
483
484 /**
485 * Index of the local peer in the peers array
486 */
487 unsigned int local_peer_idx;
488
489 /**
490 * Listener for requests from other peers.
491 * Uses the session's global id as app id.
492 */
493 struct GNUNET_SET_ListenHandle *set_listener;
494
495 /**
496 * State of our early stopping scheme.
497 */
498 int early_stopping;
499
500 /**
501 * Our set size from the first round.
502 */
503 uint64_t first_size;
504
505 uint64_t *first_sizes_received;
506
507 /**
508 * Bounded Eppstein lower bound.
509 */
510 uint64_t lower_bound;
511
512 struct SetHandle *set_handles_head;
513 struct SetHandle *set_handles_tail;
514};
515
516/**
517 * Linked list of sessions this peer participates in.
518 */
519static struct ConsensusSession *sessions_head;
520
521/**
522 * Linked list of sessions this peer participates in.
523 */
524static struct ConsensusSession *sessions_tail;
525
526/**
527 * Configuration of the consensus service.
528 */
529static const struct GNUNET_CONFIGURATION_Handle *cfg;
530
531/**
532 * Peer that runs this service.
533 */
534static struct GNUNET_PeerIdentity my_peer;
535
536/**
537 * Statistics handle.
538 */
539struct GNUNET_STATISTICS_Handle *statistics;
540
541
542static void
543finish_task (struct TaskEntry *task);
544
545
546static void
547run_ready_steps (struct ConsensusSession *session);
548
549
550static const char *
551phasename (uint16_t phase)
552{
553 switch (phase)
554 {
555 case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
556
557 case PHASE_KIND_ALL_TO_ALL_2: return "ALL_TO_ALL_2";
558
559 case PHASE_KIND_FINISH: return "FINISH";
560
561 case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
562
563 case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
564
565 case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
566
567 case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
568
569 case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
570
571 case PHASE_KIND_APPLY_REP: return "APPLY_REP";
572
573 default: return "(unknown)";
574 }
575}
576
577
578static const char *
579setname (uint16_t kind)
580{
581 switch (kind)
582 {
583 case SET_KIND_CURRENT: return "CURRENT";
584
585 case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
586
587 case SET_KIND_NONE: return "NONE";
588
589 default: return "(unknown)";
590 }
591}
592
593
594static const char *
595rfnname (uint16_t kind)
596{
597 switch (kind)
598 {
599 case RFN_KIND_NONE: return "NONE";
600
601 case RFN_KIND_ECHO: return "ECHO";
602
603 case RFN_KIND_CONFIRM: return "CONFIRM";
604
605 default: return "(unknown)";
606 }
607}
608
609
610static const char *
611diffname (uint16_t kind)
612{
613 switch (kind)
614 {
615 case DIFF_KIND_NONE: return "NONE";
616
617 case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
618
619 case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
620
621 case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
622
623 default: return "(unknown)";
624 }
625}
626
627
628#ifdef GNUNET_EXTRA_LOGGING
629
630
631static const char *
632debug_str_element (const struct GNUNET_SET_Element *el)
633{
634 struct GNUNET_HashCode hash;
635
636 GNUNET_SET_element_hash (el, &hash);
637
638 return GNUNET_h2s (&hash);
639}
640
641
642static const char *
643debug_str_task_key (const struct TaskKey *tk)
644{
645 static char buf[256];
646
647 snprintf (buf, sizeof(buf),
648 "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
649 phasename (tk->kind), tk->peer1, tk->peer2,
650 tk->leader, tk->repetition);
651
652 return buf;
653}
654
655
656static const char *
657debug_str_diff_key (const struct DiffKey *dk)
658{
659 static char buf[256];
660
661 snprintf (buf, sizeof(buf),
662 "DiffKey kind=%s, k1=%d, k2=%d",
663 diffname (dk->diff_kind), dk->k1, dk->k2);
664
665 return buf;
666}
667
668
669static const char *
670debug_str_set_key (const struct SetKey *sk)
671{
672 static char buf[256];
673
674 snprintf (buf, sizeof(buf),
675 "SetKey kind=%s, k1=%d, k2=%d",
676 setname (sk->set_kind),
677 sk->k1,
678 sk->k2);
679 return buf;
680}
681
682
683static const char *
684debug_str_rfn_key (const struct RfnKey *rk)
685{
686 static char buf[256];
687
688 snprintf (buf, sizeof(buf),
689 "RfnKey kind=%s, k1=%d, k2=%d",
690 rfnname (rk->rfn_kind),
691 rk->k1,
692 rk->k2);
693 return buf;
694}
695
696
697#endif /* GNUNET_EXTRA_LOGGING */
698
699
700/**
701 * Send the final result set of the consensus to the client, element by
702 * element.
703 *
704 * @param cls closure
705 * @param element the current element, NULL if all elements have been
706 * iterated over
707 * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
708 */
709static int
710send_to_client_iter (void *cls,
711 const struct GNUNET_SET_Element *element)
712{
713 struct TaskEntry *task = (struct TaskEntry *) cls;
714 struct ConsensusSession *session = task->step->session;
715 struct GNUNET_MQ_Envelope *ev;
716
717 if (NULL != element)
718 {
719 struct GNUNET_CONSENSUS_ElementMessage *m;
720 const struct ConsensusElement *ce;
721
722 GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT ==
723 element->element_type);
724 ce = element->data;
725
726 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "marker is %u\n",
727 (unsigned) ce->marker);
728
729 if (0 != ce->marker)
730 return GNUNET_YES;
731
732 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
733 "P%d: sending element %s to client\n",
734 session->local_peer_idx,
735 debug_str_element (element));
736
737 ev = GNUNET_MQ_msg_extra (m,
738 element->size - sizeof(struct ConsensusElement),
739 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
740 m->element_type = ce->payload_type;
741 GNUNET_memcpy (&m[1],
742 &ce[1],
743 element->size - sizeof(struct ConsensusElement));
744 GNUNET_MQ_send (session->client_mq,
745 ev);
746 }
747 else
748 {
749 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
750 "P%d: finished iterating elements for client\n",
751 session->local_peer_idx);
752 ev = GNUNET_MQ_msg_header (
753 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
754 GNUNET_MQ_send (session->client_mq,
755 ev);
756 }
757 return GNUNET_YES;
758}
759
760
761static struct SetEntry *
762lookup_set (struct ConsensusSession *session,
763 const struct SetKey *key)
764{
765 struct GNUNET_HashCode hash;
766
767 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
768 "P%u: looking up set {%s}\n",
769 session->local_peer_idx,
770 debug_str_set_key (key));
771
772 GNUNET_assert (SET_KIND_NONE != key->set_kind);
773 GNUNET_CRYPTO_hash (key,
774 sizeof(struct SetKey),
775 &hash);
776 return GNUNET_CONTAINER_multihashmap_get (session->setmap,
777 &hash);
778}
779
780
781static struct DiffEntry *
782lookup_diff (struct ConsensusSession *session,
783 const struct DiffKey *key)
784{
785 struct GNUNET_HashCode hash;
786
787 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
788 "P%u: looking up diff {%s}\n",
789 session->local_peer_idx,
790 debug_str_diff_key (key));
791 GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
792 GNUNET_CRYPTO_hash (key,
793 sizeof(struct DiffKey),
794 &hash);
795 return GNUNET_CONTAINER_multihashmap_get (session->diffmap,
796 &hash);
797}
798
799
800static struct ReferendumEntry *
801lookup_rfn (struct ConsensusSession *session,
802 const struct RfnKey *key)
803{
804 struct GNUNET_HashCode hash;
805
806 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
807 "P%u: looking up rfn {%s}\n",
808 session->local_peer_idx,
809 debug_str_rfn_key (key));
810 GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
811 GNUNET_CRYPTO_hash (key,
812 sizeof(struct RfnKey),
813 &hash);
814 return GNUNET_CONTAINER_multihashmap_get (session->rfnmap,
815 &hash);
816}
817
818
819static void
820diff_insert (struct DiffEntry *diff,
821 int weight,
822 const struct GNUNET_SET_Element *element)
823{
824 struct DiffElementInfo *di;
825 struct GNUNET_HashCode hash;
826
827 GNUNET_assert ((1 == weight) || (-1 == weight));
828
829 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
830 "diff_insert with element size %u\n",
831 element->size);
832
833 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
834 "hashing element\n");
835
836 GNUNET_SET_element_hash (element, &hash);
837
838 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
839 "hashed element\n");
840
841 di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
842
843 if (NULL == di)
844 {
845 di = GNUNET_new (struct DiffElementInfo);
846 di->element = GNUNET_SET_element_dup (element);
847 GNUNET_assert (GNUNET_OK ==
848 GNUNET_CONTAINER_multihashmap_put (diff->changes,
849 &hash,
850 di,
851 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
852 }
853
854 di->weight = weight;
855}
856
857
858static void
859rfn_commit (struct ReferendumEntry *rfn,
860 uint16_t commit_peer)
861{
862 GNUNET_assert (commit_peer < rfn->num_peers);
863
864 rfn->peer_commited[commit_peer] = GNUNET_YES;
865}
866
867
868static void
869rfn_contest (struct ReferendumEntry *rfn,
870 uint16_t contested_peer)
871{
872 GNUNET_assert (contested_peer < rfn->num_peers);
873
874 rfn->peer_contested[contested_peer] = GNUNET_YES;
875}
876
877
878static uint16_t
879rfn_noncontested (struct ReferendumEntry *rfn)
880{
881 uint16_t ret;
882
883 ret = 0;
884 for (uint16_t i = 0; i < rfn->num_peers; i++)
885 if ((GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO ==
886 rfn->peer_contested[i]))
887 ret++;
888
889 return ret;
890}
891
892
893static void
894rfn_vote (struct ReferendumEntry *rfn,
895 uint16_t voting_peer,
896 enum ReferendumVote vote,
897 const struct GNUNET_SET_Element *element)
898{
899 struct RfnElementInfo *ri;
900 struct GNUNET_HashCode hash;
901
902 GNUNET_assert (voting_peer < rfn->num_peers);
903
904 /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
905 since VOTE_KEEP is implicit in not voting. */
906 GNUNET_assert ((VOTE_ADD == vote) || (VOTE_REMOVE == vote));
907
908 GNUNET_SET_element_hash (element, &hash);
909 ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
910
911 if (NULL == ri)
912 {
913 ri = GNUNET_new (struct RfnElementInfo);
914 ri->element = GNUNET_SET_element_dup (element);
915 ri->votes = GNUNET_new_array (rfn->num_peers, int);
916 GNUNET_assert (GNUNET_OK ==
917 GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
918 &hash, ri,
919 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
920 }
921
922 ri->votes[voting_peer] = GNUNET_YES;
923 ri->proposal = vote;
924}
925
926
927static uint16_t
928task_other_peer (struct TaskEntry *task)
929{
930 uint16_t me = task->step->session->local_peer_idx;
931
932 if (task->key.peer1 == me)
933 return task->key.peer2;
934 return task->key.peer1;
935}
936
937
938static int
939cmp_uint64_t (const void *pa, const void *pb)
940{
941 uint64_t a = *(uint64_t *) pa;
942 uint64_t b = *(uint64_t *) pb;
943
944 if (a == b)
945 return 0;
946 if (a < b)
947 return -1;
948 return 1;
949}
950
951
952/**
953 * Callback for set operation results. Called for each element
954 * in the result set.
955 *
956 * @param cls closure
957 * @param element a result element, only valid if status is #GNUNET_SET_STATUS_OK
958 * @param current_size current set size
959 * @param status see enum GNUNET_SET_Status
960 */
961static void
962set_result_cb (void *cls,
963 const struct GNUNET_SET_Element *element,
964 uint64_t current_size,
965 enum GNUNET_SET_Status status)
966{
967 struct TaskEntry *task = cls;
968 struct ConsensusSession *session = task->step->session;
969 struct SetEntry *output_set = NULL;
970 struct DiffEntry *output_diff = NULL;
971 struct ReferendumEntry *output_rfn = NULL;
972 unsigned int other_idx;
973 struct SetOpCls *setop;
974 const struct ConsensusElement *consensus_element = NULL;
975
976 if (NULL != element)
977 {
978 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
979 "P%u: got element of type %u, status %u\n",
980 session->local_peer_idx,
981 (unsigned) element->element_type,
982 (unsigned) status);
983 GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT ==
984 element->element_type);
985 consensus_element = element->data;
986 }
987
988 setop = &task->cls.setop;
989
990
991 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
992 "P%u: got set result for {%s}, status %u\n",
993 session->local_peer_idx,
994 debug_str_task_key (&task->key),
995 status);
996
997 if (GNUNET_NO == task->is_started)
998 {
999 GNUNET_break_op (0);
1000 return;
1001 }
1002
1003 if (GNUNET_YES == task->is_finished)
1004 {
1005 GNUNET_break_op (0);
1006 return;
1007 }
1008
1009 other_idx = task_other_peer (task);
1010
1011 if (SET_KIND_NONE != setop->output_set.set_kind)
1012 {
1013 output_set = lookup_set (session,
1014 &setop->output_set);
1015 GNUNET_assert (NULL != output_set);
1016 }
1017
1018 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
1019 {
1020 output_diff = lookup_diff (session, &setop->output_diff);
1021 GNUNET_assert (NULL != output_diff);
1022 }
1023
1024 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
1025 {
1026 output_rfn = lookup_rfn (session, &setop->output_rfn);
1027 GNUNET_assert (NULL != output_rfn);
1028 }
1029
1030 if (GNUNET_YES == session->peers_blacklisted[other_idx])
1031 {
1032 /* Peer might have been blacklisted
1033 by a gradecast running in parallel, ignore elements from now */
1034 if (GNUNET_SET_STATUS_ADD_LOCAL == status)
1035 return;
1036 if (GNUNET_SET_STATUS_ADD_REMOTE == status)
1037 return;
1038 }
1039
1040 if ((NULL != consensus_element) && (0 != consensus_element->marker))
1041 {
1042 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1043 "P%u: got some marker\n",
1044 session->local_peer_idx);
1045 if ((GNUNET_YES == setop->transceive_contested) &&
1046 (CONSENSUS_MARKER_CONTESTED == consensus_element->marker))
1047 {
1048 GNUNET_assert (NULL != output_rfn);
1049 rfn_contest (output_rfn, task_other_peer (task));
1050 return;
1051 }
1052
1053 if (CONSENSUS_MARKER_SIZE == consensus_element->marker)
1054 {
1055 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1056 "P%u: got size marker\n",
1057 session->local_peer_idx);
1058
1059
1060 struct ConsensusSizeElement *cse = (void *) consensus_element;
1061
1062 if (cse->sender_index == other_idx)
1063 {
1064 if (NULL == session->first_sizes_received)
1065 session->first_sizes_received = GNUNET_new_array (session->num_peers,
1066 uint64_t);
1067 session->first_sizes_received[other_idx] = GNUNET_ntohll (cse->size);
1068
1069 uint64_t *copy = GNUNET_memdup (session->first_sizes_received,
1070 sizeof(uint64_t) * session->num_peers);
1071 qsort (copy, session->num_peers, sizeof(uint64_t), cmp_uint64_t);
1072 session->lower_bound = copy[session->num_peers / 3 + 1];
1073 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1074 "P%u: lower bound %llu\n",
1075 session->local_peer_idx,
1076 (long long) session->lower_bound);
1077 GNUNET_free (copy);
1078 }
1079 return;
1080 }
1081
1082 return;
1083 }
1084
1085 switch (status)
1086 {
1087 case GNUNET_SET_STATUS_ADD_LOCAL:
1088 GNUNET_assert (NULL != consensus_element);
1089 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1090 "Adding element in Task {%s}\n",
1091 debug_str_task_key (&task->key));
1092 if (NULL != output_set)
1093 {
1094 // FIXME: record pending adds, use callback
1095 GNUNET_SET_add_element (output_set->h,
1096 element,
1097 NULL,
1098 NULL);
1099#ifdef GNUNET_EXTRA_LOGGING
1100 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1101 "P%u: adding element %s into set {%s} of task {%s}\n",
1102 session->local_peer_idx,
1103 debug_str_element (element),
1104 debug_str_set_key (&setop->output_set),
1105 debug_str_task_key (&task->key));
1106#endif
1107 }
1108 if (NULL != output_diff)
1109 {
1110 diff_insert (output_diff, 1, element);
1111#ifdef GNUNET_EXTRA_LOGGING
1112 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1113 "P%u: adding element %s into diff {%s} of task {%s}\n",
1114 session->local_peer_idx,
1115 debug_str_element (element),
1116 debug_str_diff_key (&setop->output_diff),
1117 debug_str_task_key (&task->key));
1118#endif
1119 }
1120 if (NULL != output_rfn)
1121 {
1122 rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
1123#ifdef GNUNET_EXTRA_LOGGING
1124 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1125 "P%u: adding element %s into rfn {%s} of task {%s}\n",
1126 session->local_peer_idx,
1127 debug_str_element (element),
1128 debug_str_rfn_key (&setop->output_rfn),
1129 debug_str_task_key (&task->key));
1130#endif
1131 }
1132 // XXX: add result to structures in task
1133 break;
1134
1135 case GNUNET_SET_STATUS_ADD_REMOTE:
1136 GNUNET_assert (NULL != consensus_element);
1137 if (GNUNET_YES == setop->do_not_remove)
1138 break;
1139 if (CONSENSUS_MARKER_CONTESTED == consensus_element->marker)
1140 break;
1141 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1142 "Removing element in Task {%s}\n",
1143 debug_str_task_key (&task->key));
1144 if (NULL != output_set)
1145 {
1146 // FIXME: record pending adds, use callback
1147 GNUNET_SET_remove_element (output_set->h,
1148 element,
1149 NULL,
1150 NULL);
1151#ifdef GNUNET_EXTRA_LOGGING
1152 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1153 "P%u: removing element %s from set {%s} of task {%s}\n",
1154 session->local_peer_idx,
1155 debug_str_element (element),
1156 debug_str_set_key (&setop->output_set),
1157 debug_str_task_key (&task->key));
1158#endif
1159 }
1160 if (NULL != output_diff)
1161 {
1162 diff_insert (output_diff, -1, element);
1163#ifdef GNUNET_EXTRA_LOGGING
1164 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1165 "P%u: removing element %s from diff {%s} of task {%s}\n",
1166 session->local_peer_idx,
1167 debug_str_element (element),
1168 debug_str_diff_key (&setop->output_diff),
1169 debug_str_task_key (&task->key));
1170#endif
1171 }
1172 if (NULL != output_rfn)
1173 {
1174 rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1175#ifdef GNUNET_EXTRA_LOGGING
1176 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1177 "P%u: removing element %s from rfn {%s} of task {%s}\n",
1178 session->local_peer_idx,
1179 debug_str_element (element),
1180 debug_str_rfn_key (&setop->output_rfn),
1181 debug_str_task_key (&task->key));
1182#endif
1183 }
1184 break;
1185
1186 case GNUNET_SET_STATUS_DONE:
1187 // XXX: check first if any changes to the underlying
1188 // set are still pending
1189 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1190 "P%u: Finishing setop in Task {%s} (%u/%u)\n",
1191 session->local_peer_idx,
1192 debug_str_task_key (&task->key),
1193 (unsigned int) task->step->finished_tasks,
1194 (unsigned int) task->step->tasks_len);
1195 if (NULL != output_rfn)
1196 {
1197 rfn_commit (output_rfn, task_other_peer (task));
1198 }
1199 if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1200 {
1201 session->first_size = current_size;
1202 }
1203 finish_task (task);
1204 break;
1205
1206 case GNUNET_SET_STATUS_FAILURE:
1207 // XXX: cleanup
1208 GNUNET_break_op (0);
1209 finish_task (task);
1210 return;
1211
1212 default:
1213 /* not reached */
1214 GNUNET_assert (0);
1215 }
1216}
1217
1218
1219#ifdef EVIL
1220
1221enum EvilnessType
1222{
1223 EVILNESS_NONE,
1224 EVILNESS_CRAM_ALL,
1225 EVILNESS_CRAM_LEAD,
1226 EVILNESS_CRAM_ECHO,
1227 EVILNESS_SLACK,
1228 EVILNESS_SLACK_A2A,
1229};
1230
1231enum EvilnessSubType
1232{
1233 EVILNESS_SUB_NONE,
1234 EVILNESS_SUB_REPLACEMENT,
1235 EVILNESS_SUB_NO_REPLACEMENT,
1236};
1237
1238struct Evilness
1239{
1240 enum EvilnessType type;
1241 enum EvilnessSubType subtype;
1242 unsigned int num;
1243};
1244
1245
1246static int
1247parse_evilness_cram_subtype (const char *evil_subtype_str,
1248 struct Evilness *evil)
1249{
1250 if (0 == strcmp ("replace", evil_subtype_str))
1251 {
1252 evil->subtype = EVILNESS_SUB_REPLACEMENT;
1253 }
1254 else if (0 == strcmp ("noreplace", evil_subtype_str))
1255 {
1256 evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1257 }
1258 else
1259 {
1260 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1261 "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1262 evil_subtype_str);
1263 return GNUNET_SYSERR;
1264 }
1265 return GNUNET_OK;
1266}
1267
1268
1269static void
1270get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1271{
1272 char *evil_spec;
1273 char *field;
1274 char *evil_type_str = NULL;
1275 char *evil_subtype_str = NULL;
1276
1277 GNUNET_assert (NULL != evil);
1278
1279 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus",
1280 "EVIL_SPEC",
1281 &evil_spec))
1282 {
1283 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1284 "P%u: no evilness\n",
1285 session->local_peer_idx);
1286 evil->type = EVILNESS_NONE;
1287 return;
1288 }
1289 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1290 "P%u: got evilness spec\n",
1291 session->local_peer_idx);
1292
1293 for (field = strtok (evil_spec, "/");
1294 NULL != field;
1295 field = strtok (NULL, "/"))
1296 {
1297 unsigned int peer_num;
1298 unsigned int evil_num;
1299 int ret;
1300
1301 evil_type_str = NULL;
1302 evil_subtype_str = NULL;
1303
1304 ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str,
1305 &evil_subtype_str, &evil_num);
1306
1307 if (ret != 4)
1308 {
1309 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1310 "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1311 field,
1312 ret);
1313 goto not_evil;
1314 }
1315
1316 GNUNET_assert (NULL != evil_type_str);
1317 GNUNET_assert (NULL != evil_subtype_str);
1318
1319 if (peer_num == session->local_peer_idx)
1320 {
1321 if (0 == strcmp ("slack", evil_type_str))
1322 {
1323 evil->type = EVILNESS_SLACK;
1324 }
1325 if (0 == strcmp ("slack-a2a", evil_type_str))
1326 {
1327 evil->type = EVILNESS_SLACK_A2A;
1328 }
1329 else if (0 == strcmp ("cram-all", evil_type_str))
1330 {
1331 evil->type = EVILNESS_CRAM_ALL;
1332 evil->num = evil_num;
1333 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1334 goto not_evil;
1335 }
1336 else if (0 == strcmp ("cram-lead", evil_type_str))
1337 {
1338 evil->type = EVILNESS_CRAM_LEAD;
1339 evil->num = evil_num;
1340 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1341 goto not_evil;
1342 }
1343 else if (0 == strcmp ("cram-echo", evil_type_str))
1344 {
1345 evil->type = EVILNESS_CRAM_ECHO;
1346 evil->num = evil_num;
1347 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1348 goto not_evil;
1349 }
1350 else
1351 {
1352 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1353 "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1354 evil_type_str);
1355 goto not_evil;
1356 }
1357 goto cleanup;
1358 }
1359 /* No GNUNET_free since memory was allocated by libc */
1360 free (evil_type_str);
1361 evil_type_str = NULL;
1362 evil_subtype_str = NULL;
1363 }
1364not_evil:
1365 evil->type = EVILNESS_NONE;
1366cleanup:
1367 GNUNET_free (evil_spec);
1368 /* no GNUNET_free since it wasn't
1369 * allocated with GNUNET_malloc */
1370 if (NULL != evil_type_str)
1371 free (evil_type_str);
1372 if (NULL != evil_subtype_str)
1373 free (evil_subtype_str);
1374}
1375
1376
1377#endif
1378
1379
1380/**
1381 * Commit the appropriate set for a task.
1382 */
1383static void
1384commit_set (struct ConsensusSession *session,
1385 struct TaskEntry *task)
1386{
1387 struct SetEntry *set;
1388 struct SetOpCls *setop = &task->cls.setop;
1389
1390 GNUNET_assert (NULL != setop->op);
1391 set = lookup_set (session, &setop->input_set);
1392 GNUNET_assert (NULL != set);
1393
1394 if ((GNUNET_YES == setop->transceive_contested) && (GNUNET_YES ==
1395 set->is_contested))
1396 {
1397 struct GNUNET_SET_Element element;
1398 struct ConsensusElement ce = { 0 };
1399
1400 ce.marker = CONSENSUS_MARKER_CONTESTED;
1401 element.data = &ce;
1402 element.size = sizeof(struct ConsensusElement);
1403 element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1404 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1405 }
1406
1407 if (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind)
1408 {
1409 struct GNUNET_SET_Element element;
1410 struct ConsensusSizeElement cse = {
1411 .size = 0,
1412 .sender_index = 0
1413 };
1414 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting size marker\n");
1415 cse.ce.marker = CONSENSUS_MARKER_SIZE;
1416 cse.size = GNUNET_htonll (session->first_size);
1417 cse.sender_index = session->local_peer_idx;
1418 element.data = &cse;
1419 element.size = sizeof(struct ConsensusSizeElement);
1420 element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1421 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1422 }
1423
1424#ifdef EVIL
1425 {
1426 struct Evilness evil;
1427
1428 get_evilness (session, &evil);
1429 if (EVILNESS_NONE != evil.type)
1430 {
1431 /* Useful for evaluation */
1432 GNUNET_STATISTICS_set (statistics,
1433 "is evil",
1434 1,
1435 GNUNET_NO);
1436 }
1437 switch (evil.type)
1438 {
1439 case EVILNESS_CRAM_ALL:
1440 case EVILNESS_CRAM_LEAD:
1441 case EVILNESS_CRAM_ECHO:
1442 /* We're not cramming elements in the
1443 all-to-all round, since that would just
1444 add more elements to the result set, but
1445 wouldn't test robustness. */
1446 if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1447 {
1448 GNUNET_SET_commit (setop->op, set->h);
1449 break;
1450 }
1451 if ((EVILNESS_CRAM_LEAD == evil.type) &&
1452 ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) ||
1453 (SET_KIND_CURRENT != set->key.set_kind) ))
1454 {
1455 GNUNET_SET_commit (setop->op, set->h);
1456 break;
1457 }
1458 if ((EVILNESS_CRAM_ECHO == evil.type) && (PHASE_KIND_GRADECAST_ECHO !=
1459 task->key.kind))
1460 {
1461 GNUNET_SET_commit (setop->op, set->h);
1462 break;
1463 }
1464 for (unsigned int i = 0; i < evil.num; i++)
1465 {
1466 struct GNUNET_SET_Element element;
1467 struct ConsensusStuffedElement se = {
1468 .ce.payload_type = 0,
1469 .ce.marker = 0,
1470 };
1471 element.data = &se;
1472 element.size = sizeof(struct ConsensusStuffedElement);
1473 element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1474
1475 if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1476 {
1477 /* Always generate a new element. */
1478 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK,
1479 &se.rand);
1480 }
1481 else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1482 {
1483 /* Always cram the same elements, derived from counter. */
1484 GNUNET_CRYPTO_hash (&i, sizeof(i), &se.rand);
1485 }
1486 else
1487 {
1488 GNUNET_assert (0);
1489 }
1490 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1491#ifdef GNUNET_EXTRA_LOGGING
1492 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1493 "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1494 session->local_peer_idx,
1495 debug_str_element (&element),
1496 debug_str_set_key (&setop->input_set),
1497 debug_str_task_key (&task->key));
1498#endif
1499 }
1500 GNUNET_STATISTICS_update (statistics,
1501 "# stuffed elements",
1502 evil.num,
1503 GNUNET_NO);
1504 GNUNET_SET_commit (setop->op, set->h);
1505 break;
1506
1507 case EVILNESS_SLACK:
1508 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1509 "P%u: evil peer: slacking\n",
1510 (unsigned int) session->local_peer_idx);
1511
1512 /* Do nothing. */
1513 case EVILNESS_SLACK_A2A:
1514 if ((PHASE_KIND_ALL_TO_ALL_2 == task->key.kind) ||
1515 (PHASE_KIND_ALL_TO_ALL == task->key.kind))
1516 {
1517 struct GNUNET_SET_Handle *empty_set;
1518 empty_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
1519 GNUNET_SET_commit (setop->op, empty_set);
1520 GNUNET_SET_destroy (empty_set);
1521 }
1522 else
1523 {
1524 GNUNET_SET_commit (setop->op,
1525 set->h);
1526 }
1527 break;
1528
1529 case EVILNESS_NONE:
1530 GNUNET_SET_commit (setop->op,
1531 set->h);
1532 break;
1533 }
1534 }
1535#else
1536 if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1537 {
1538 GNUNET_SET_commit (setop->op, set->h);
1539 }
1540 else
1541 {
1542 /* For our testcases, we don't want the blacklisted
1543 peers to wait. */
1544 GNUNET_SET_operation_cancel (setop->op);
1545 setop->op = NULL;
1546 finish_task (task);
1547 }
1548#endif
1549}
1550
1551
1552static void
1553put_diff (struct ConsensusSession *session,
1554 struct DiffEntry *diff)
1555{
1556 struct GNUNET_HashCode hash;
1557
1558 GNUNET_CRYPTO_hash (&diff->key,
1559 sizeof(struct DiffKey),
1560 &hash);
1561 GNUNET_assert (GNUNET_OK ==
1562 GNUNET_CONTAINER_multihashmap_put (session->diffmap,
1563 &hash,
1564 diff,
1565 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1566}
1567
1568
1569static void
1570put_set (struct ConsensusSession *session,
1571 struct SetEntry *set)
1572{
1573 struct GNUNET_HashCode hash;
1574
1575 GNUNET_assert (NULL != set->h);
1576 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1577 "Putting set %s\n",
1578 debug_str_set_key (&set->key));
1579 GNUNET_CRYPTO_hash (&set->key,
1580 sizeof(struct SetKey),
1581 &hash);
1582 GNUNET_assert (GNUNET_SYSERR !=
1583 GNUNET_CONTAINER_multihashmap_put (session->setmap,
1584 &hash,
1585 set,
1586 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE));
1587}
1588
1589
1590static void
1591put_rfn (struct ConsensusSession *session,
1592 struct ReferendumEntry *rfn)
1593{
1594 struct GNUNET_HashCode hash;
1595
1596 GNUNET_CRYPTO_hash (&rfn->key, sizeof(struct RfnKey), &hash);
1597 GNUNET_assert (GNUNET_OK ==
1598 GNUNET_CONTAINER_multihashmap_put (session->rfnmap,
1599 &hash,
1600 rfn,
1601 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1602}
1603
1604
1605static void
1606task_cancel_reconcile (struct TaskEntry *task)
1607{
1608 /* not implemented yet */
1609 GNUNET_assert (0);
1610}
1611
1612
1613static void
1614apply_diff_to_rfn (struct DiffEntry *diff,
1615 struct ReferendumEntry *rfn,
1616 uint16_t voting_peer,
1617 uint16_t num_peers)
1618{
1619 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1620 struct DiffElementInfo *di;
1621
1622 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1623
1624 while (GNUNET_YES ==
1625 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1626 NULL,
1627 (const void **) &di))
1628 {
1629 if (di->weight > 0)
1630 {
1631 rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1632 }
1633 if (di->weight < 0)
1634 {
1635 rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1636 }
1637 }
1638
1639 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1640}
1641
1642
1643static struct DiffEntry *
1644diff_create (void)
1645{
1646 struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1647
1648 d->changes = GNUNET_CONTAINER_multihashmap_create (8,
1649 GNUNET_NO);
1650
1651 return d;
1652}
1653
1654
1655#if 0
1656static struct DiffEntry *
1657diff_compose (struct DiffEntry *diff_1,
1658 struct DiffEntry *diff_2)
1659{
1660 struct DiffEntry *diff_new;
1661 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1662 struct DiffElementInfo *di;
1663
1664 diff_new = diff_create ();
1665
1666 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1667 while (GNUNET_YES ==
1668 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1669 NULL,
1670 (const void **) &di))
1671 {
1672 diff_insert (diff_new,
1673 di->weight,
1674 di->element);
1675 }
1676 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1677
1678 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1679 while (GNUNET_YES ==
1680 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1681 NULL,
1682 (const void **) &di))
1683 {
1684 diff_insert (diff_new,
1685 di->weight,
1686 di->element);
1687 }
1688 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1689
1690 return diff_new;
1691}
1692
1693
1694#endif
1695
1696
1697struct ReferendumEntry *
1698rfn_create (uint16_t size)
1699{
1700 struct ReferendumEntry *rfn;
1701
1702 rfn = GNUNET_new (struct ReferendumEntry);
1703 rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1704 rfn->peer_commited = GNUNET_new_array (size, int);
1705 rfn->peer_contested = GNUNET_new_array (size, int);
1706 rfn->num_peers = size;
1707
1708 return rfn;
1709}
1710
1711
1712#if UNUSED
1713static void
1714diff_destroy (struct DiffEntry *diff)
1715{
1716 GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1717 GNUNET_free (diff);
1718}
1719
1720
1721#endif
1722
1723
1724/**
1725 * For a given majority, count what the outcome
1726 * is (add/remove/keep), and give the number
1727 * of peers that voted for this outcome.
1728 */
1729static void
1730rfn_majority (const struct ReferendumEntry *rfn,
1731 const struct RfnElementInfo *ri,
1732 uint16_t *ret_majority,
1733 enum ReferendumVote *ret_vote)
1734{
1735 uint16_t votes_yes = 0;
1736 uint16_t num_commited = 0;
1737
1738 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1739 "Computing rfn majority for element %s of rfn {%s}\n",
1740 debug_str_element (ri->element),
1741 debug_str_rfn_key (&rfn->key));
1742
1743 for (uint16_t i = 0; i < rfn->num_peers; i++)
1744 {
1745 if (GNUNET_NO == rfn->peer_commited[i])
1746 continue;
1747 num_commited++;
1748
1749 if (GNUNET_YES == ri->votes[i])
1750 votes_yes++;
1751 }
1752
1753 if (votes_yes > (num_commited) / 2)
1754 {
1755 *ret_vote = ri->proposal;
1756 *ret_majority = votes_yes;
1757 }
1758 else
1759 {
1760 *ret_vote = VOTE_STAY;
1761 *ret_majority = num_commited - votes_yes;
1762 }
1763}
1764
1765
1766struct SetCopyCls
1767{
1768 struct TaskEntry *task;
1769 struct SetKey dst_set_key;
1770};
1771
1772
1773static void
1774set_copy_cb (void *cls,
1775 struct GNUNET_SET_Handle *copy)
1776{
1777 struct SetCopyCls *scc = cls;
1778 struct TaskEntry *task = scc->task;
1779 struct SetKey dst_set_key = scc->dst_set_key;
1780 struct SetEntry *set;
1781 struct SetHandle *sh = GNUNET_new (struct SetHandle);
1782
1783 sh->h = copy;
1784 GNUNET_CONTAINER_DLL_insert (task->step->session->set_handles_head,
1785 task->step->session->set_handles_tail,
1786 sh);
1787
1788 GNUNET_free (scc);
1789 set = GNUNET_new (struct SetEntry);
1790 set->h = copy;
1791 set->key = dst_set_key;
1792 put_set (task->step->session, set);
1793
1794 task->start (task);
1795}
1796
1797
1798/**
1799 * Call the start function of the given
1800 * task again after we created a copy of the given set.
1801 */
1802static void
1803create_set_copy_for_task (struct TaskEntry *task,
1804 struct SetKey *src_set_key,
1805 struct SetKey *dst_set_key)
1806{
1807 struct SetEntry *src_set;
1808 struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1809
1810 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1811 "Copying set {%s} to {%s} for task {%s}\n",
1812 debug_str_set_key (src_set_key),
1813 debug_str_set_key (dst_set_key),
1814 debug_str_task_key (&task->key));
1815
1816 scc->task = task;
1817 scc->dst_set_key = *dst_set_key;
1818 src_set = lookup_set (task->step->session, src_set_key);
1819 GNUNET_assert (NULL != src_set);
1820 GNUNET_SET_copy_lazy (src_set->h,
1821 set_copy_cb,
1822 scc);
1823}
1824
1825
1826struct SetMutationProgressCls
1827{
1828 int num_pending;
1829 /**
1830 * Task to finish once all changes are through.
1831 */
1832 struct TaskEntry *task;
1833};
1834
1835
1836static void
1837set_mutation_done (void *cls)
1838{
1839 struct SetMutationProgressCls *pc = cls;
1840
1841 GNUNET_assert (pc->num_pending > 0);
1842
1843 pc->num_pending--;
1844
1845 if (0 == pc->num_pending)
1846 {
1847 struct TaskEntry *task = pc->task;
1848 GNUNET_free (pc);
1849 finish_task (task);
1850 }
1851}
1852
1853
1854static void
1855try_finish_step_early (struct Step *step)
1856{
1857 if (GNUNET_YES == step->is_running)
1858 return;
1859 if (GNUNET_YES == step->is_finished)
1860 return;
1861 if (GNUNET_NO == step->early_finishable)
1862 return;
1863
1864 step->is_finished = GNUNET_YES;
1865
1866#ifdef GNUNET_EXTRA_LOGGING
1867 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1868 "Finishing step `%s' early.\n",
1869 step->debug_name);
1870#endif
1871
1872 for (unsigned int i = 0; i < step->subordinates_len; i++)
1873 {
1874 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1875 step->subordinates[i]->pending_prereq--;
1876#ifdef GNUNET_EXTRA_LOGGING
1877 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1878 "Decreased pending_prereq to %u for step `%s'.\n",
1879 (unsigned int) step->subordinates[i]->pending_prereq,
1880 step->subordinates[i]->debug_name);
1881#endif
1882 try_finish_step_early (step->subordinates[i]);
1883 }
1884
1885 // XXX: maybe schedule as task to avoid recursion?
1886 run_ready_steps (step->session);
1887}
1888
1889
1890static void
1891finish_step (struct Step *step)
1892{
1893 GNUNET_assert (step->finished_tasks == step->tasks_len);
1894 GNUNET_assert (GNUNET_YES == step->is_running);
1895 GNUNET_assert (GNUNET_NO == step->is_finished);
1896
1897#ifdef GNUNET_EXTRA_LOGGING
1898 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1899 "All tasks of step `%s' with %u subordinates finished.\n",
1900 step->debug_name,
1901 step->subordinates_len);
1902#endif
1903
1904 for (unsigned int i = 0; i < step->subordinates_len; i++)
1905 {
1906 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1907 step->subordinates[i]->pending_prereq--;
1908#ifdef GNUNET_EXTRA_LOGGING
1909 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1910 "Decreased pending_prereq to %u for step `%s'.\n",
1911 (unsigned int) step->subordinates[i]->pending_prereq,
1912 step->subordinates[i]->debug_name);
1913#endif
1914 }
1915
1916 step->is_finished = GNUNET_YES;
1917
1918 // XXX: maybe schedule as task to avoid recursion?
1919 run_ready_steps (step->session);
1920}
1921
1922
1923/**
1924 * Apply the result from one round of gradecasts (i.e. every peer
1925 * should have gradecasted) to the peer's current set.
1926 *
1927 * @param task the task with context information
1928 */
1929static void
1930task_start_apply_round (struct TaskEntry *task)
1931{
1932 struct ConsensusSession *session = task->step->session;
1933 struct SetKey sk_in;
1934 struct SetKey sk_out;
1935 struct RfnKey rk_in;
1936 struct SetEntry *set_out;
1937 struct ReferendumEntry *rfn_in;
1938 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1939 struct RfnElementInfo *ri;
1940 struct SetMutationProgressCls *progress_cls;
1941 uint16_t worst_majority = UINT16_MAX;
1942
1943 sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1944 rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1945 sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1946
1947 set_out = lookup_set (session, &sk_out);
1948 if (NULL == set_out)
1949 {
1950 create_set_copy_for_task (task,
1951 &sk_in,
1952 &sk_out);
1953 return;
1954 }
1955
1956 rfn_in = lookup_rfn (session, &rk_in);
1957 GNUNET_assert (NULL != rfn_in);
1958
1959 progress_cls = GNUNET_new (struct SetMutationProgressCls);
1960 progress_cls->task = task;
1961
1962 iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1963
1964 while (GNUNET_YES ==
1965 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1966 NULL,
1967 (const void **) &ri))
1968 {
1969 uint16_t majority_num;
1970 enum ReferendumVote majority_vote;
1971
1972 rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1973
1974 if (worst_majority > majority_num)
1975 worst_majority = majority_num;
1976
1977 switch (majority_vote)
1978 {
1979 case VOTE_ADD:
1980 progress_cls->num_pending++;
1981 GNUNET_assert (GNUNET_OK ==
1982 GNUNET_SET_add_element (set_out->h,
1983 ri->element,
1984 &set_mutation_done,
1985 progress_cls));
1986 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1987 "P%u: apply round: adding element %s with %u-majority.\n",
1988 session->local_peer_idx,
1989 debug_str_element (ri->element), majority_num);
1990 break;
1991
1992 case VOTE_REMOVE:
1993 progress_cls->num_pending++;
1994 GNUNET_assert (GNUNET_OK ==
1995 GNUNET_SET_remove_element (set_out->h,
1996 ri->element,
1997 &set_mutation_done,
1998 progress_cls));
1999 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2000 "P%u: apply round: deleting element %s with %u-majority.\n",
2001 session->local_peer_idx,
2002 debug_str_element (ri->element), majority_num);
2003 break;
2004
2005 case VOTE_STAY:
2006 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2007 "P%u: apply round: keeping element %s with %u-majority.\n",
2008 session->local_peer_idx,
2009 debug_str_element (ri->element), majority_num);
2010 // do nothing
2011 break;
2012
2013 default:
2014 GNUNET_assert (0);
2015 break;
2016 }
2017 }
2018
2019 if (0 == progress_cls->num_pending)
2020 {
2021 // call closure right now, no pending ops
2022 GNUNET_free (progress_cls);
2023 finish_task (task);
2024 }
2025
2026 {
2027 uint16_t thresh = (session->num_peers / 3) * 2;
2028
2029 if (worst_majority >= thresh)
2030 {
2031 switch (session->early_stopping)
2032 {
2033 case EARLY_STOPPING_NONE:
2034 session->early_stopping = EARLY_STOPPING_ONE_MORE;
2035 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2036 "P%u: Stopping early (after one more superround)\n",
2037 session->local_peer_idx);
2038 break;
2039
2040 case EARLY_STOPPING_ONE_MORE:
2041 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2042 "P%u: finishing steps due to early finish\n",
2043 session->local_peer_idx);
2044 session->early_stopping = EARLY_STOPPING_DONE;
2045 {
2046 struct Step *step;
2047 for (step = session->steps_head; NULL != step; step = step->next)
2048 try_finish_step_early (step);
2049 }
2050 break;
2051
2052 case EARLY_STOPPING_DONE:
2053 /* We shouldn't be here anymore after early stopping */
2054 GNUNET_break (0);
2055 break;
2056
2057 default:
2058 GNUNET_assert (0);
2059 break;
2060 }
2061 }
2062 else if (EARLY_STOPPING_NONE != session->early_stopping)
2063 {
2064 // Our assumption about the number of bad peers
2065 // has been broken.
2066 GNUNET_break_op (0);
2067 }
2068 else
2069 {
2070 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2071 "P%u: NOT finishing early (majority not good enough)\n",
2072 session->local_peer_idx);
2073 }
2074 }
2075 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2076}
2077
2078
2079static void
2080task_start_grade (struct TaskEntry *task)
2081{
2082 struct ConsensusSession *session = task->step->session;
2083 struct ReferendumEntry *output_rfn;
2084 struct ReferendumEntry *input_rfn;
2085 struct DiffEntry *input_diff;
2086 struct RfnKey rfn_key;
2087 struct DiffKey diff_key;
2088 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
2089 struct RfnElementInfo *ri;
2090 unsigned int gradecast_confidence = 2;
2091
2092 rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
2093 output_rfn = lookup_rfn (session, &rfn_key);
2094 if (NULL == output_rfn)
2095 {
2096 output_rfn = rfn_create (session->num_peers);
2097 output_rfn->key = rfn_key;
2098 put_rfn (session, output_rfn);
2099 }
2100
2101 diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition,
2102 task->key.leader };
2103 input_diff = lookup_diff (session, &diff_key);
2104 GNUNET_assert (NULL != input_diff);
2105
2106 rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition,
2107 task->key.leader };
2108 input_rfn = lookup_rfn (session, &rfn_key);
2109 GNUNET_assert (NULL != input_rfn);
2110
2111 iter = GNUNET_CONTAINER_multihashmap_iterator_create (
2112 input_rfn->rfn_elements);
2113
2114 apply_diff_to_rfn (input_diff, output_rfn, task->key.leader,
2115 session->num_peers);
2116
2117 while (GNUNET_YES ==
2118 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2119 NULL,
2120 (const void **) &ri))
2121 {
2122 uint16_t majority_num;
2123 enum ReferendumVote majority_vote;
2124
2125 // XXX: we need contested votes and non-contested votes here
2126 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2127
2128 if (majority_num <= session->num_peers / 3)
2129 majority_vote = VOTE_REMOVE;
2130
2131 switch (majority_vote)
2132 {
2133 case VOTE_STAY:
2134 break;
2135
2136 case VOTE_ADD:
2137 rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
2138 break;
2139
2140 case VOTE_REMOVE:
2141 rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
2142 break;
2143
2144 default:
2145 GNUNET_assert (0);
2146 break;
2147 }
2148 }
2149 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2150
2151 {
2152 uint16_t noncontested;
2153 noncontested = rfn_noncontested (input_rfn);
2154 if (noncontested < (session->num_peers / 3) * 2)
2155 {
2156 gradecast_confidence = GNUNET_MIN (1, gradecast_confidence);
2157 }
2158 if (noncontested < (session->num_peers / 3) + 1)
2159 {
2160 gradecast_confidence = 0;
2161 }
2162 }
2163
2164 if (gradecast_confidence >= 1)
2165 rfn_commit (output_rfn, task->key.leader);
2166
2167 if (gradecast_confidence <= 1)
2168 session->peers_blacklisted[task->key.leader] = GNUNET_YES;
2169
2170 finish_task (task);
2171}
2172
2173
2174static void
2175task_start_reconcile (struct TaskEntry *task)
2176{
2177 struct SetEntry *input;
2178 struct SetOpCls *setop = &task->cls.setop;
2179 struct ConsensusSession *session = task->step->session;
2180
2181 input = lookup_set (session, &setop->input_set);
2182 GNUNET_assert (NULL != input);
2183 GNUNET_assert (NULL != input->h);
2184
2185 /* We create the outputs for the operation here
2186 (rather than in the set operation callback)
2187 because we want something valid in there, even
2188 if the other peer doesn't talk to us */
2189
2190 if (SET_KIND_NONE != setop->output_set.set_kind)
2191 {
2192 /* If we don't have an existing output set,
2193 we clone the input set. */
2194 if (NULL == lookup_set (session, &setop->output_set))
2195 {
2196 create_set_copy_for_task (task,
2197 &setop->input_set,
2198 &setop->output_set);
2199 return;
2200 }
2201 }
2202
2203 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
2204 {
2205 if (NULL == lookup_rfn (session, &setop->output_rfn))
2206 {
2207 struct ReferendumEntry *rfn;
2208
2209 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2210 "P%u: output rfn <%s> missing, creating.\n",
2211 session->local_peer_idx,
2212 debug_str_rfn_key (&setop->output_rfn));
2213
2214 rfn = rfn_create (session->num_peers);
2215 rfn->key = setop->output_rfn;
2216 put_rfn (session, rfn);
2217 }
2218 }
2219
2220 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
2221 {
2222 if (NULL == lookup_diff (session, &setop->output_diff))
2223 {
2224 struct DiffEntry *diff;
2225
2226 diff = diff_create ();
2227 diff->key = setop->output_diff;
2228 put_diff (session, diff);
2229 }
2230 }
2231
2232 if ((task->key.peer1 == session->local_peer_idx) && (task->key.peer2 ==
2233 session->local_peer_idx))
2234 {
2235 /* XXX: mark the corresponding rfn as committed if necessary */
2236 finish_task (task);
2237 return;
2238 }
2239
2240 if (task->key.peer1 == session->local_peer_idx)
2241 {
2242 struct GNUNET_CONSENSUS_RoundContextMessage rcm;
2243
2244 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2245 "P%u: Looking up set {%s} to run remote union\n",
2246 session->local_peer_idx,
2247 debug_str_set_key (&setop->input_set));
2248 rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
2249 rcm.header.size = htons (sizeof(rcm));
2250 rcm.kind = htons (task->key.kind);
2251 rcm.peer1 = htons (task->key.peer1);
2252 rcm.peer2 = htons (task->key.peer2);
2253 rcm.leader = htons (task->key.leader);
2254 rcm.repetition = htons (task->key.repetition);
2255 rcm.is_contested = htons (0);
2256
2257 GNUNET_assert (NULL == setop->op);
2258 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2259 "P%u: initiating set op with P%u, our set is %s\n",
2260 session->local_peer_idx,
2261 task->key.peer2,
2262 debug_str_set_key (&setop->input_set));
2263
2264 struct GNUNET_SET_Option opts[] = {
2265 { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2266 { GNUNET_SET_OPTION_END },
2267 };
2268
2269 // XXX: maybe this should be done while
2270 // setting up tasks alreays?
2271 setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
2272 &session->global_id,
2273 &rcm.header,
2274 GNUNET_SET_RESULT_SYMMETRIC,
2275 opts,
2276 set_result_cb,
2277 task);
2278
2279 commit_set (session, task);
2280 }
2281 else if (task->key.peer2 == session->local_peer_idx)
2282 {
2283 /* Wait for the other peer to contact us */
2284 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
2285 session->local_peer_idx, task->key.peer1);
2286
2287 if (NULL != setop->op)
2288 {
2289 commit_set (session, task);
2290 }
2291 }
2292 else
2293 {
2294 /* We made an error while constructing the task graph. */
2295 GNUNET_assert (0);
2296 }
2297}
2298
2299
2300static void
2301task_start_eval_echo (struct TaskEntry *task)
2302{
2303 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
2304 struct ReferendumEntry *input_rfn;
2305 struct RfnElementInfo *ri;
2306 struct SetEntry *output_set;
2307 struct SetMutationProgressCls *progress_cls;
2308 struct ConsensusSession *session = task->step->session;
2309 struct SetKey sk_in;
2310 struct SetKey sk_out;
2311 struct RfnKey rk_in;
2312
2313 sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition,
2314 task->key.leader };
2315 sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition,
2316 task->key.leader };
2317 output_set = lookup_set (session, &sk_out);
2318 if (NULL == output_set)
2319 {
2320 create_set_copy_for_task (task,
2321 &sk_in,
2322 &sk_out);
2323 return;
2324 }
2325
2326 {
2327 // FIXME: should be marked as a shallow copy, so
2328 // we can destroy everything correctly
2329 struct SetEntry *last_set = GNUNET_new (struct SetEntry);
2330
2331 last_set->h = output_set->h;
2332 last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2333 put_set (session, last_set);
2334 }
2335
2336 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2337 "Evaluating referendum in Task {%s}\n",
2338 debug_str_task_key (&task->key));
2339
2340 progress_cls = GNUNET_new (struct SetMutationProgressCls);
2341 progress_cls->task = task;
2342 rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition,
2343 task->key.leader };
2344 input_rfn = lookup_rfn (session, &rk_in);
2345 GNUNET_assert (NULL != input_rfn);
2346 iter = GNUNET_CONTAINER_multihashmap_iterator_create (
2347 input_rfn->rfn_elements);
2348 GNUNET_assert (NULL != iter);
2349
2350 while (GNUNET_YES ==
2351 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2352 NULL,
2353 (const void **) &ri))
2354 {
2355 enum ReferendumVote majority_vote;
2356 uint16_t majority_num;
2357
2358 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2359
2360 if (majority_num < session->num_peers / 3)
2361 {
2362 /* It is not the case that all nonfaulty peers
2363 echoed the same value. Since we're doing a set reconciliation, we
2364 can't simply send "nothing" for the value. Thus we mark our 'confirm'
2365 reconciliation as contested. Other peers might not know that the
2366 leader is faulty, thus we still re-distribute in the confirmation
2367 round. *///
2368 output_set->is_contested = GNUNET_YES;
2369 }
2370
2371 switch (majority_vote)
2372 {
2373 case VOTE_ADD:
2374 progress_cls->num_pending++;
2375 GNUNET_assert (GNUNET_OK ==
2376 GNUNET_SET_add_element (output_set->h,
2377 ri->element,
2378 set_mutation_done,
2379 progress_cls));
2380 break;
2381
2382 case VOTE_REMOVE:
2383 progress_cls->num_pending++;
2384 GNUNET_assert (GNUNET_OK ==
2385 GNUNET_SET_remove_element (output_set->h,
2386 ri->element,
2387 set_mutation_done,
2388 progress_cls));
2389 break;
2390
2391 case VOTE_STAY:
2392 /* Nothing to do. */
2393 break;
2394
2395 default:
2396 /* not reached */
2397 GNUNET_assert (0);
2398 }
2399 }
2400
2401 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2402
2403 if (0 == progress_cls->num_pending)
2404 {
2405 // call closure right now, no pending ops
2406 GNUNET_free (progress_cls);
2407 finish_task (task);
2408 }
2409}
2410
2411
2412static void
2413task_start_finish (struct TaskEntry *task)
2414{
2415 struct SetEntry *final_set;
2416 struct ConsensusSession *session = task->step->session;
2417
2418 final_set = lookup_set (session,
2419 &task->cls.finish.input_set);
2420 GNUNET_assert (NULL != final_set);
2421 GNUNET_SET_iterate (final_set->h,
2422 &send_to_client_iter,
2423 task);
2424}
2425
2426
2427static void
2428start_task (struct ConsensusSession *session,
2429 struct TaskEntry *task)
2430{
2431 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2432 "P%u: starting task {%s}\n",
2433 session->local_peer_idx,
2434 debug_str_task_key (&task->key));
2435 GNUNET_assert (GNUNET_NO == task->is_started);
2436 GNUNET_assert (GNUNET_NO == task->is_finished);
2437 GNUNET_assert (NULL != task->start);
2438 task->start (task);
2439 task->is_started = GNUNET_YES;
2440}
2441
2442
2443/*
2444 * Run all steps of the session that don't any
2445 * more dependencies.
2446 */
2447static void
2448run_ready_steps (struct ConsensusSession *session)
2449{
2450 struct Step *step;
2451
2452 step = session->steps_head;
2453
2454 while (NULL != step)
2455 {
2456 if ((GNUNET_NO == step->is_running) && (0 == step->pending_prereq) &&
2457 (GNUNET_NO == step->is_finished))
2458 {
2459 GNUNET_assert (0 == step->finished_tasks);
2460
2461#ifdef GNUNET_EXTRA_LOGGING
2462 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2463 "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2464 session->local_peer_idx,
2465 step->debug_name,
2466 step->round, step->tasks_len, step->subordinates_len);
2467#endif
2468
2469 step->is_running = GNUNET_YES;
2470 for (size_t i = 0; i < step->tasks_len; i++)
2471 start_task (session, step->tasks[i]);
2472
2473 /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2474 if ((step->finished_tasks == step->tasks_len) && (GNUNET_NO ==
2475 step->is_finished))
2476 finish_step (step);
2477
2478 /* Running the next ready steps will be triggered by task completion */
2479 return;
2480 }
2481 step = step->next;
2482 }
2483
2484 return;
2485}
2486
2487
2488static void
2489finish_task (struct TaskEntry *task)
2490{
2491 GNUNET_assert (GNUNET_NO == task->is_finished);
2492 task->is_finished = GNUNET_YES;
2493 task->step->finished_tasks++;
2494 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2495 "P%u: Finishing Task {%s} (now %u/%u tasks finished in step)\n",
2496 task->step->session->local_peer_idx,
2497 debug_str_task_key (&task->key),
2498 (unsigned int) task->step->finished_tasks,
2499 (unsigned int) task->step->tasks_len);
2500
2501 if (task->step->finished_tasks == task->step->tasks_len)
2502 finish_step (task->step);
2503}
2504
2505
2506/**
2507 * Search peer in the list of peers in session.
2508 *
2509 * @param peer peer to find
2510 * @param session session with peer
2511 * @return index of peer, -1 if peer is not in session
2512 */
2513static int
2514get_peer_idx (const struct GNUNET_PeerIdentity *peer,
2515 const struct ConsensusSession *session)
2516{
2517 for (int i = 0; i < session->num_peers; i++)
2518 if (0 == GNUNET_memcmp (peer, &session->peers[i]))
2519 return i;
2520 return -1;
2521}
2522
2523
2524/**
2525 * Compute a global, (hopefully) unique consensus session id,
2526 * from the local id of the consensus session, and the identities of all participants.
2527 * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2528 * exactly the same peers, the global id will be different.
2529 *
2530 * @param session session to generate the global id for
2531 * @param local_session_id local id of the consensus session
2532 */
2533static void
2534compute_global_id (struct ConsensusSession *session,
2535 const struct GNUNET_HashCode *local_session_id)
2536{
2537 const char *salt = "gnunet-service-consensus/session_id";
2538
2539 GNUNET_assert (GNUNET_YES ==
2540 GNUNET_CRYPTO_kdf (&session->global_id,
2541 sizeof(struct GNUNET_HashCode),
2542 salt,
2543 strlen (salt),
2544 session->peers,
2545 session->num_peers * sizeof(struct
2546 GNUNET_PeerIdentity),
2547 local_session_id,
2548 sizeof(struct GNUNET_HashCode),
2549 NULL));
2550}
2551
2552
2553/**
2554 * Compare two peer identities (for qsort()).
2555 *
2556 * @param h1 some peer identity
2557 * @param h2 some peer identity
2558 * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
2559 */
2560static int
2561peer_id_cmp (const void *h1,
2562 const void *h2)
2563{
2564 return memcmp (h1, h2, sizeof(struct GNUNET_PeerIdentity));
2565}
2566
2567
2568/**
2569 * Create the sorted list of peers for the session,
2570 * add the local peer if not in the join message.
2571 *
2572 * @param session session to initialize
2573 * @param join_msg join message with the list of peers participating at the end
2574 */
2575static void
2576initialize_session_peer_list (
2577 struct ConsensusSession *session,
2578 const struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2579{
2580 const struct GNUNET_PeerIdentity *msg_peers
2581 = (const struct GNUNET_PeerIdentity *) &join_msg[1];
2582 int local_peer_in_list;
2583
2584 session->num_peers = ntohl (join_msg->num_peers);
2585
2586 /* Peers in the join message, may or may not include the local peer,
2587 Add it if it is missing. */
2588 local_peer_in_list = GNUNET_NO;
2589 for (unsigned int i = 0; i < session->num_peers; i++)
2590 {
2591 if (0 == GNUNET_memcmp (&msg_peers[i],
2592 &my_peer))
2593 {
2594 local_peer_in_list = GNUNET_YES;
2595 break;
2596 }
2597 }
2598 if (GNUNET_NO == local_peer_in_list)
2599 session->num_peers++;
2600
2601 session->peers = GNUNET_new_array (session->num_peers,
2602 struct GNUNET_PeerIdentity);
2603 if (GNUNET_NO == local_peer_in_list)
2604 session->peers[session->num_peers - 1] = my_peer;
2605 GNUNET_memcpy (session->peers,
2606 msg_peers,
2607 ntohl (join_msg->num_peers)
2608 * sizeof(struct GNUNET_PeerIdentity));
2609 qsort (session->peers,
2610 session->num_peers,
2611 sizeof (struct GNUNET_PeerIdentity),
2612 &peer_id_cmp);
2613}
2614
2615
2616static struct TaskEntry *
2617lookup_task (const struct ConsensusSession *session,
2618 const struct TaskKey *key)
2619{
2620 struct GNUNET_HashCode hash;
2621
2622 GNUNET_CRYPTO_hash (key,
2623 sizeof(struct TaskKey),
2624 &hash);
2625 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2626 "Looking up task hash %s\n",
2627 GNUNET_h2s (&hash));
2628 return GNUNET_CONTAINER_multihashmap_get (session->taskmap,
2629 &hash);
2630}
2631
2632
2633/**
2634 * Called when another peer wants to do a set operation with the
2635 * local peer.
2636 *
2637 * @param cls closure
2638 * @param other_peer the other peer
2639 * @param context_msg message with application specific information from
2640 * the other peer
2641 * @param request request from the other peer, use GNUNET_SET_accept
2642 * to accept it, otherwise the request will be refused
2643 * Note that we don't use a return value here, as it is also
2644 * necessary to specify the set we want to do the operation with,
2645 * which sometimes can be derived from the context message.
2646 * Also necessary to specify the timeout.
2647 */
2648static void
2649set_listen_cb (void *cls,
2650 const struct GNUNET_PeerIdentity *other_peer,
2651 const struct GNUNET_MessageHeader *context_msg,
2652 struct GNUNET_SET_Request *request)
2653{
2654 struct ConsensusSession *session = cls;
2655 struct TaskKey tk;
2656 struct TaskEntry *task;
2657 struct GNUNET_CONSENSUS_RoundContextMessage *cm;
2658
2659 if (NULL == context_msg)
2660 {
2661 GNUNET_break_op (0);
2662 return;
2663 }
2664
2665 if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (
2666 context_msg->type))
2667 {
2668 GNUNET_break_op (0);
2669 return;
2670 }
2671
2672 if (sizeof(struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (
2673 context_msg->size))
2674 {
2675 GNUNET_break_op (0);
2676 return;
2677 }
2678
2679 cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
2680
2681 tk = ((struct TaskKey) {
2682 .kind = ntohs (cm->kind),
2683 .peer1 = ntohs (cm->peer1),
2684 .peer2 = ntohs (cm->peer2),
2685 .repetition = ntohs (cm->repetition),
2686 .leader = ntohs (cm->leader),
2687 });
2688
2689 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2690 session->local_peer_idx, debug_str_task_key (&tk));
2691
2692 task = lookup_task (session, &tk);
2693
2694 if (NULL == task)
2695 {
2696 GNUNET_break_op (0);
2697 return;
2698 }
2699
2700 if (GNUNET_YES == task->is_finished)
2701 {
2702 GNUNET_break_op (0);
2703 return;
2704 }
2705
2706 if (task->key.peer2 != session->local_peer_idx)
2707 {
2708 /* We're being asked, so we must be the 2nd peer. */
2709 GNUNET_break_op (0);
2710 return;
2711 }
2712
2713 GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2714 (task->key.peer2 == session->local_peer_idx)));
2715
2716 struct GNUNET_SET_Option opts[] = {
2717 { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2718 { GNUNET_SET_OPTION_END },
2719 };
2720
2721 task->cls.setop.op = GNUNET_SET_accept (request,
2722 GNUNET_SET_RESULT_SYMMETRIC,
2723 opts,
2724 &set_result_cb,
2725 task);
2726
2727 /* If the task hasn't been started yet,
2728 we wait for that until we commit. */
2729
2730 if (GNUNET_YES == task->is_started)
2731 {
2732 commit_set (session, task);
2733 }
2734}
2735
2736
2737static void
2738put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2739 struct TaskEntry *t)
2740{
2741 struct GNUNET_HashCode round_hash;
2742 struct Step *s;
2743
2744 GNUNET_assert (NULL != t->step);
2745 t = GNUNET_memdup (t, sizeof(struct TaskEntry));
2746 s = t->step;
2747 if (s->tasks_len == s->tasks_cap)
2748 {
2749 unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2750 GNUNET_array_grow (s->tasks,
2751 s->tasks_cap,
2752 target_size);
2753 }
2754
2755#ifdef GNUNET_EXTRA_LOGGING
2756 GNUNET_assert (NULL != s->debug_name);
2757 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2758 debug_str_task_key (&t->key),
2759 s->debug_name);
2760#endif
2761
2762 s->tasks[s->tasks_len] = t;
2763 s->tasks_len++;
2764
2765 GNUNET_CRYPTO_hash (&t->key,
2766 sizeof(struct TaskKey),
2767 &round_hash);
2768 GNUNET_assert (GNUNET_OK ==
2769 GNUNET_CONTAINER_multihashmap_put (taskmap,
2770 &round_hash,
2771 t,
2772 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2773}
2774
2775
2776static void
2777install_step_timeouts (struct ConsensusSession *session)
2778{
2779 /* Given the fully constructed task graph
2780 with rounds for tasks, we can give the tasks timeouts. */
2781
2782 // unsigned int max_round;
2783
2784 /* XXX: implement! */
2785}
2786
2787
2788/*
2789 * Arrange two peers in some canonical order.
2790 */
2791static void
2792arrange_peers (uint16_t *p1,
2793 uint16_t *p2,
2794 uint16_t n)
2795{
2796 uint16_t a;
2797 uint16_t b;
2798
2799 GNUNET_assert (*p1 < n);
2800 GNUNET_assert (*p2 < n);
2801
2802 if (*p1 < *p2)
2803 {
2804 a = *p1;
2805 b = *p2;
2806 }
2807 else
2808 {
2809 a = *p2;
2810 b = *p1;
2811 }
2812
2813 /* For uniformly random *p1, *p2,
2814 this condition is true with 50% chance */
2815 if (((b - a) + n) % n <= n / 2)
2816 {
2817 *p1 = a;
2818 *p2 = b;
2819 }
2820 else
2821 {
2822 *p1 = b;
2823 *p2 = a;
2824 }
2825}
2826
2827
2828/**
2829 * Record @a dep as a dependency of @a step.
2830 */
2831static void
2832step_depend_on (struct Step *step,
2833 struct Step *dep)
2834{
2835 /* We're not checking for cyclic dependencies,
2836 but this is a cheap sanity check. */
2837 GNUNET_assert (step != dep);
2838 GNUNET_assert (NULL != step);
2839 GNUNET_assert (NULL != dep);
2840 GNUNET_assert (dep->round <= step->round);
2841
2842#ifdef GNUNET_EXTRA_LOGGING
2843 /* Make sure we have complete debugging information.
2844 Also checks that we don't screw up too badly
2845 constructing the task graph. */
2846 GNUNET_assert (NULL != step->debug_name);
2847 GNUNET_assert (NULL != dep->debug_name);
2848 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2849 "Making step `%s' depend on `%s'\n",
2850 step->debug_name,
2851 dep->debug_name);
2852#endif
2853
2854 if (dep->subordinates_cap == dep->subordinates_len)
2855 {
2856 unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2857 GNUNET_array_grow (dep->subordinates,
2858 dep->subordinates_cap,
2859 target_size);
2860 }
2861
2862 GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2863
2864 dep->subordinates[dep->subordinates_len] = step;
2865 dep->subordinates_len++;
2866
2867 step->pending_prereq++;
2868}
2869
2870
2871static struct Step *
2872create_step (struct ConsensusSession *session,
2873 int round,
2874 int early_finishable)
2875{
2876 struct Step *step;
2877
2878 step = GNUNET_new (struct Step);
2879 step->session = session;
2880 step->round = round;
2881 step->early_finishable = early_finishable;
2882 GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2883 session->steps_tail,
2884 step);
2885 return step;
2886}
2887
2888
2889/**
2890 * Construct the task graph for a single gradecast.
2891 */
2892static void
2893construct_task_graph_gradecast (struct ConsensusSession *session,
2894 uint16_t rep,
2895 uint16_t lead,
2896 struct Step *step_before,
2897 struct Step *step_after)
2898{
2899 uint16_t n = session->num_peers;
2900 uint16_t me = session->local_peer_idx;
2901 uint16_t p1;
2902 uint16_t p2;
2903 /* The task we're currently setting up. */
2904 struct TaskEntry task;
2905 struct Step *step;
2906 struct Step *prev_step;
2907 uint16_t round;
2908
2909 round = step_before->round + 1;
2910
2911 /* gcast step 1: leader disseminates */
2912 step = create_step (session,
2913 round,
2914 GNUNET_YES);
2915#ifdef GNUNET_EXTRA_LOGGING
2916 GNUNET_asprintf (&step->debug_name,
2917 "disseminate leader %u rep %u",
2918 lead,
2919 rep);
2920#endif
2921 step_depend_on (step,
2922 step_before);
2923
2924 if (lead == me)
2925 {
2926 for (unsigned int k = 0; k < n; k++)
2927 {
2928 if (k == me)
2929 continue;
2930 p1 = me;
2931 p2 = k;
2932 arrange_peers (&p1, &p2, n);
2933 task = ((struct TaskEntry) {
2934 .step = step,
2935 .start = task_start_reconcile,
2936 .cancel = task_cancel_reconcile,
2937 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep,
2938 me },
2939 });
2940 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2941 put_task (session->taskmap, &task);
2942 }
2943 /* We run this task to make sure that the leader
2944 has the stored the SET_KIND_LEADER set of himself,
2945 so it can participate in the rest of the gradecast
2946 without the code having to handle any special cases. */
2947 task = ((struct TaskEntry) {
2948 .step = step,
2949 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2950 .start = task_start_reconcile,
2951 .cancel = task_cancel_reconcile,
2952 });
2953 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2954 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep,
2955 me };
2956 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL,
2957 rep, me };
2958 put_task (session->taskmap, &task);
2959 }
2960 else
2961 {
2962 p1 = me;
2963 p2 = lead;
2964 arrange_peers (&p1, &p2, n);
2965 task = ((struct TaskEntry) {
2966 .step = step,
2967 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep,
2968 lead },
2969 .start = task_start_reconcile,
2970 .cancel = task_cancel_reconcile,
2971 });
2972 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2973 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep,
2974 lead };
2975 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL,
2976 rep, lead };
2977 put_task (session->taskmap, &task);
2978 }
2979
2980 /* gcast phase 2: echo */
2981 prev_step = step;
2982 round += 1;
2983 step = create_step (session,
2984 round,
2985 GNUNET_YES);
2986#ifdef GNUNET_EXTRA_LOGGING
2987 GNUNET_asprintf (&step->debug_name,
2988 "echo leader %u rep %u",
2989 lead,
2990 rep);
2991#endif
2992 step_depend_on (step,
2993 prev_step);
2994
2995 for (unsigned int k = 0; k < n; k++)
2996 {
2997 p1 = k;
2998 p2 = me;
2999 arrange_peers (&p1, &p2, n);
3000 task = ((struct TaskEntry) {
3001 .step = step,
3002 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
3003 .start = task_start_reconcile,
3004 .cancel = task_cancel_reconcile,
3005 });
3006 task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep,
3007 lead };
3008 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
3009 put_task (session->taskmap, &task);
3010 }
3011
3012 prev_step = step;
3013 /* Same round, since step only has local tasks */
3014 step = create_step (session, round, GNUNET_YES);
3015#ifdef GNUNET_EXTRA_LOGGING
3016 GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
3017#endif
3018 step_depend_on (step, prev_step);
3019
3020 arrange_peers (&p1, &p2, n);
3021 task = ((struct TaskEntry) {
3022 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep,
3023 lead },
3024 .step = step,
3025 .start = task_start_eval_echo
3026 });
3027 put_task (session->taskmap, &task);
3028
3029 prev_step = step;
3030 round += 1;
3031 step = create_step (session, round, GNUNET_YES);
3032#ifdef GNUNET_EXTRA_LOGGING
3033 GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
3034#endif
3035 step_depend_on (step, prev_step);
3036
3037 /* gcast phase 3: confirmation and grading */
3038 for (unsigned int k = 0; k < n; k++)
3039 {
3040 p1 = k;
3041 p2 = me;
3042 arrange_peers (&p1, &p2, n);
3043 task = ((struct TaskEntry) {
3044 .step = step,
3045 .start = task_start_reconcile,
3046 .cancel = task_cancel_reconcile,
3047 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep,
3048 lead },
3049 });
3050 task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep,
3051 lead };
3052 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
3053 /* If there was at least one element in the echo round that was
3054 contested (i.e. it had no n-t majority), then we let the other peers
3055 know, and other peers let us know. The contested flag for each peer is
3056 stored in the rfn. */
3057 task.cls.setop.transceive_contested = GNUNET_YES;
3058 put_task (session->taskmap, &task);
3059 }
3060
3061 prev_step = step;
3062 /* Same round, since step only has local tasks */
3063 step = create_step (session, round, GNUNET_YES);
3064#ifdef GNUNET_EXTRA_LOGGING
3065 GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead,
3066 rep);
3067#endif
3068 step_depend_on (step, prev_step);
3069
3070 task = ((struct TaskEntry) {
3071 .step = step,
3072 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep,
3073 lead },
3074 .start = task_start_grade,
3075 });
3076 put_task (session->taskmap, &task);
3077
3078 step_depend_on (step_after, step);
3079}
3080
3081
3082static void
3083construct_task_graph (struct ConsensusSession *session)
3084{
3085 uint16_t n = session->num_peers;
3086 uint16_t t = n / 3;
3087 uint16_t me = session->local_peer_idx;
3088 /* The task we're currently setting up. */
3089 struct TaskEntry task;
3090 /* Current leader */
3091 unsigned int lead;
3092 struct Step *step;
3093 struct Step *prev_step;
3094 unsigned int round = 0;
3095
3096 // XXX: introduce first step,
3097 // where we wait for all insert acks
3098 // from the set service
3099
3100 /* faster but brittle all-to-all */
3101
3102 // XXX: Not implemented yet
3103
3104 /* all-to-all step */
3105
3106 step = create_step (session, round, GNUNET_NO);
3107
3108#ifdef GNUNET_EXTRA_LOGGING
3109 step->debug_name = GNUNET_strdup ("all to all");
3110#endif
3111
3112 for (unsigned int i = 0; i < n; i++)
3113 {
3114 uint16_t p1;
3115 uint16_t p2;
3116
3117 p1 = me;
3118 p2 = i;
3119 arrange_peers (&p1, &p2, n);
3120 task = ((struct TaskEntry) {
3121 .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
3122 .step = step,
3123 .start = task_start_reconcile,
3124 .cancel = task_cancel_reconcile,
3125 });
3126 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
3127 task.cls.setop.output_set = task.cls.setop.input_set;
3128 task.cls.setop.do_not_remove = GNUNET_YES;
3129 put_task (session->taskmap, &task);
3130 }
3131
3132 round += 1;
3133 prev_step = step;
3134 step = create_step (session, round, GNUNET_NO);;
3135#ifdef GNUNET_EXTRA_LOGGING
3136 step->debug_name = GNUNET_strdup ("all to all 2");
3137#endif
3138 step_depend_on (step, prev_step);
3139
3140
3141 for (unsigned int i = 0; i < n; i++)
3142 {
3143 uint16_t p1;
3144 uint16_t p2;
3145
3146 p1 = me;
3147 p2 = i;
3148 arrange_peers (&p1, &p2, n);
3149 task = ((struct TaskEntry) {
3150 .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL_2, p1, p2, -1, -1 },
3151 .step = step,
3152 .start = task_start_reconcile,
3153 .cancel = task_cancel_reconcile,
3154 });
3155 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
3156 task.cls.setop.output_set = task.cls.setop.input_set;
3157 task.cls.setop.do_not_remove = GNUNET_YES;
3158 put_task (session->taskmap, &task);
3159 }
3160
3161 round += 1;
3162
3163 prev_step = step;
3164 step = NULL;
3165
3166
3167 /* Byzantine union */
3168
3169 /* sequential repetitions of the gradecasts */
3170 for (unsigned int i = 0; i < t + 1; i++)
3171 {
3172 struct Step *step_rep_start;
3173 struct Step *step_rep_end;
3174
3175 /* Every repetition is in a separate round. */
3176 step_rep_start = create_step (session, round, GNUNET_YES);
3177#ifdef GNUNET_EXTRA_LOGGING
3178 GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
3179#endif
3180
3181 step_depend_on (step_rep_start, prev_step);
3182
3183 /* gradecast has three rounds */
3184 round += 3;
3185 step_rep_end = create_step (session, round, GNUNET_YES);
3186#ifdef GNUNET_EXTRA_LOGGING
3187 GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
3188#endif
3189
3190 /* parallel gradecasts */
3191 for (lead = 0; lead < n; lead++)
3192 construct_task_graph_gradecast (session, i, lead, step_rep_start,
3193 step_rep_end);
3194
3195 task = ((struct TaskEntry) {
3196 .step = step_rep_end,
3197 .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1 },
3198 .start = task_start_apply_round,
3199 });
3200 put_task (session->taskmap, &task);
3201
3202 prev_step = step_rep_end;
3203 }
3204
3205 /* There is no next gradecast round, thus the final
3206 start step is the overall end step of the gradecasts */
3207 round += 1;
3208 step = create_step (session, round, GNUNET_NO);
3209#ifdef GNUNET_EXTRA_LOGGING
3210 GNUNET_asprintf (&step->debug_name, "finish");
3211#endif
3212 step_depend_on (step, prev_step);
3213
3214 task = ((struct TaskEntry) {
3215 .step = step,
3216 .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
3217 .start = task_start_finish,
3218 });
3219 task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
3220
3221 put_task (session->taskmap, &task);
3222}
3223
3224
3225/**
3226 * Check join message.
3227 *
3228 * @param cls session of client that sent the message
3229 * @param m message sent by the client
3230 * @return #GNUNET_OK if @a m is well-formed
3231 */
3232static int
3233check_client_join (void *cls,
3234 const struct GNUNET_CONSENSUS_JoinMessage *m)
3235{
3236 uint32_t listed_peers = ntohl (m->num_peers);
3237
3238 if ((ntohs (m->header.size) - sizeof(*m)) !=
3239 listed_peers * sizeof(struct GNUNET_PeerIdentity))
3240 {
3241 GNUNET_break (0);
3242 return GNUNET_SYSERR;
3243 }
3244 return GNUNET_OK;
3245}
3246
3247
3248/**
3249 * Called when a client wants to join a consensus session.
3250 *
3251 * @param cls session of client that sent the message
3252 * @param m message sent by the client
3253 */
3254static void
3255handle_client_join (void *cls,
3256 const struct GNUNET_CONSENSUS_JoinMessage *m)
3257{
3258 struct ConsensusSession *session = cls;
3259 struct ConsensusSession *other_session;
3260
3261 initialize_session_peer_list (session,
3262 m);
3263 compute_global_id (session,
3264 &m->session_id);
3265
3266 /* Check if some local client already owns the session.
3267 It is only legal to have a session with an existing global id
3268 if all other sessions with this global id are finished.*/
3269 for (other_session = sessions_head;
3270 NULL != other_session;
3271 other_session = other_session->next)
3272 {
3273 if ( (other_session != session) &&
3274 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id,
3275 &other_session->global_id)) )
3276 break;
3277 }
3278
3279 session->conclude_deadline
3280 = GNUNET_TIME_absolute_ntoh (m->deadline);
3281 session->conclude_start
3282 = GNUNET_TIME_absolute_ntoh (m->start);
3283 session->local_peer_idx = get_peer_idx (&my_peer,
3284 session);
3285 GNUNET_assert (-1 != session->local_peer_idx);
3286
3287 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3288 "Joining consensus session %s containing %u peers as %u with timeout %s\n",
3289 GNUNET_h2s (&m->session_id),
3290 session->num_peers,
3291 session->local_peer_idx,
3292 GNUNET_STRINGS_relative_time_to_string (
3293 GNUNET_TIME_absolute_get_difference (session->conclude_start,
3294 session->conclude_deadline),
3295 GNUNET_YES));
3296
3297 session->set_listener
3298 = GNUNET_SET_listen (cfg,
3299 GNUNET_SET_OPERATION_UNION,
3300 &session->global_id,
3301 &set_listen_cb,
3302 session);
3303
3304 session->setmap = GNUNET_CONTAINER_multihashmap_create (1,
3305 GNUNET_NO);
3306 session->taskmap = GNUNET_CONTAINER_multihashmap_create (1,
3307 GNUNET_NO);
3308 session->diffmap = GNUNET_CONTAINER_multihashmap_create (1,
3309 GNUNET_NO);
3310 session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1,
3311 GNUNET_NO);
3312
3313 {
3314 struct SetEntry *client_set;
3315
3316 client_set = GNUNET_new (struct SetEntry);
3317 client_set->h = GNUNET_SET_create (cfg,
3318 GNUNET_SET_OPERATION_UNION);
3319 struct SetHandle *sh = GNUNET_new (struct SetHandle);
3320 sh->h = client_set->h;
3321 GNUNET_CONTAINER_DLL_insert (session->set_handles_head,
3322 session->set_handles_tail,
3323 sh);
3324 client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
3325 put_set (session,
3326 client_set);
3327 }
3328
3329 session->peers_blacklisted = GNUNET_new_array (session->num_peers,
3330 int);
3331
3332 /* Just construct the task graph,
3333 but don't run anything until the client calls conclude. */
3334 construct_task_graph (session);
3335 GNUNET_SERVICE_client_continue (session->client);
3336}
3337
3338
3339/**
3340 * Called when a client performs an insert operation.
3341 *
3342 * @param cls client handle
3343 * @param msg message sent by the client
3344 * @return #GNUNET_OK (always well-formed)
3345 */
3346static int
3347check_client_insert (void *cls,
3348 const struct GNUNET_CONSENSUS_ElementMessage *msg)
3349{
3350 return GNUNET_OK;
3351}
3352
3353
3354/**
3355 * Called when a client performs an insert operation.
3356 *
3357 * @param cls client handle
3358 * @param msg message sent by the client
3359 */
3360static void
3361handle_client_insert (void *cls,
3362 const struct GNUNET_CONSENSUS_ElementMessage *msg)
3363{
3364 struct ConsensusSession *session = cls;
3365 ssize_t element_size;
3366 struct GNUNET_SET_Handle *initial_set;
3367 struct ConsensusElement *ce;
3368
3369 if (GNUNET_YES == session->conclude_started)
3370 {
3371 GNUNET_break (0);
3372 GNUNET_SERVICE_client_drop (session->client);
3373 return;
3374 }
3375 element_size = ntohs (msg->header.size) - sizeof(*msg);
3376 ce = GNUNET_malloc (sizeof(struct ConsensusElement) + element_size);
3377 GNUNET_memcpy (&ce[1],
3378 &msg[1],
3379 element_size);
3380 ce->payload_type = msg->element_type;
3381
3382 {
3383 struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3384 struct SetEntry *entry;
3385
3386 entry = lookup_set (session,
3387 &key);
3388 GNUNET_assert (NULL != entry);
3389 initial_set = entry->h;
3390 }
3391
3392 session->num_client_insert_pending++;
3393
3394 {
3395 struct GNUNET_SET_Element element = {
3396 .element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT,
3397 .size = sizeof(struct ConsensusElement) + element_size,
3398 .data = ce,
3399 };
3400
3401 GNUNET_SET_add_element (initial_set,
3402 &element,
3403 NULL,
3404 NULL);
3405#ifdef GNUNET_EXTRA_LOGGING
3406 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3407 "P%u: element %s added\n",
3408 session->local_peer_idx,
3409 debug_str_element (&element));
3410#endif
3411 }
3412 GNUNET_free (ce);
3413 GNUNET_SERVICE_client_continue (session->client);
3414}
3415
3416
3417/**
3418 * Called when a client performs the conclude operation.
3419 *
3420 * @param cls client handle
3421 * @param message message sent by the client
3422 */
3423static void
3424handle_client_conclude (void *cls,
3425 const struct GNUNET_MessageHeader *message)
3426{
3427 struct ConsensusSession *session = cls;
3428
3429 if (GNUNET_YES == session->conclude_started)
3430 {
3431 /* conclude started twice */
3432 GNUNET_break (0);
3433 GNUNET_SERVICE_client_drop (session->client);
3434 return;
3435 }
3436 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3437 "conclude requested\n");
3438 session->conclude_started = GNUNET_YES;
3439 install_step_timeouts (session);
3440 run_ready_steps (session);
3441 GNUNET_SERVICE_client_continue (session->client);
3442}
3443
3444
3445/**
3446 * Called to clean up, after a shutdown has been requested.
3447 *
3448 * @param cls closure
3449 */
3450static void
3451shutdown_task (void *cls)
3452{
3453 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3454 "shutting down\n");
3455 GNUNET_STATISTICS_destroy (statistics,
3456 GNUNET_NO);
3457 statistics = NULL;
3458}
3459
3460
3461/**
3462 * Start processing consensus requests.
3463 *
3464 * @param cls closure
3465 * @param c configuration to use
3466 * @param service the initialized service
3467 */
3468static void
3469run (void *cls,
3470 const struct GNUNET_CONFIGURATION_Handle *c,
3471 struct GNUNET_SERVICE_Handle *service)
3472{
3473 cfg = c;
3474 if (GNUNET_OK !=
3475 GNUNET_CRYPTO_get_peer_identity (cfg,
3476 &my_peer))
3477 {
3478 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3479 "Could not retrieve host identity\n");
3480 GNUNET_SCHEDULER_shutdown ();
3481 return;
3482 }
3483 statistics = GNUNET_STATISTICS_create ("consensus",
3484 cfg);
3485 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
3486 NULL);
3487}
3488
3489
3490/**
3491 * Callback called when a client connects to the service.
3492 *
3493 * @param cls closure for the service
3494 * @param c the new client that connected to the service
3495 * @param mq the message queue used to send messages to the client
3496 * @return @a c
3497 */
3498static void *
3499client_connect_cb (void *cls,
3500 struct GNUNET_SERVICE_Client *c,
3501 struct GNUNET_MQ_Handle *mq)
3502{
3503 struct ConsensusSession *session = GNUNET_new (struct ConsensusSession);
3504
3505 session->client = c;
3506 session->client_mq = mq;
3507 GNUNET_CONTAINER_DLL_insert (sessions_head,
3508 sessions_tail,
3509 session);
3510 return session;
3511}
3512
3513
3514/**
3515 * Callback called when a client disconnected from the service
3516 *
3517 * @param cls closure for the service
3518 * @param c the client that disconnected
3519 * @param internal_cls should be equal to @a c
3520 */
3521static void
3522client_disconnect_cb (void *cls,
3523 struct GNUNET_SERVICE_Client *c,
3524 void *internal_cls)
3525{
3526 struct ConsensusSession *session = internal_cls;
3527
3528 if (NULL != session->set_listener)
3529 {
3530 GNUNET_SET_listen_cancel (session->set_listener);
3531 session->set_listener = NULL;
3532 }
3533 GNUNET_CONTAINER_DLL_remove (sessions_head,
3534 sessions_tail,
3535 session);
3536 while (session->set_handles_head)
3537 {
3538 struct SetHandle *sh = session->set_handles_head;
3539
3540 session->set_handles_head = sh->next;
3541 GNUNET_SET_destroy (sh->h);
3542 GNUNET_free (sh);
3543 }
3544 GNUNET_free (session);
3545}
3546
3547
3548/**
3549 * Define "main" method using service macro.
3550 */
3551GNUNET_SERVICE_MAIN (
3552 "consensus",
3553 GNUNET_SERVICE_OPTION_NONE,
3554 &run,
3555 &client_connect_cb,
3556 &client_disconnect_cb,
3557 NULL,
3558 GNUNET_MQ_hd_fixed_size (client_conclude,
3559 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3560 struct GNUNET_MessageHeader,
3561 NULL),
3562 GNUNET_MQ_hd_var_size (client_insert,
3563 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT,
3564 struct GNUNET_CONSENSUS_ElementMessage,
3565 NULL),
3566 GNUNET_MQ_hd_var_size (client_join,
3567 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN,
3568 struct GNUNET_CONSENSUS_JoinMessage,
3569 NULL),
3570 GNUNET_MQ_handler_end ());
3571
3572/* end of gnunet-service-consensus.c */
diff --git a/src/consensus/plugin_block_consensus.c b/src/consensus/plugin_block_consensus.c
deleted file mode 100644
index cdac12ed5..000000000
--- a/src/consensus/plugin_block_consensus.c
+++ /dev/null
@@ -1,137 +0,0 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2017 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file consensus/plugin_block_consensus.c
23 * @brief consensus block, either nested block or marker
24 * @author Christian Grothoff
25 */
26
27#include "platform.h"
28#include "consensus_protocol.h"
29#include "gnunet_block_plugin.h"
30#include "gnunet_block_group_lib.h"
31
32
33/**
34 * Function called to validate a reply or a request. For
35 * request evaluation, simply pass "NULL" for the reply_block.
36 *
37 * @param cls closure
38 * @param ctx context
39 * @param type block type
40 * @param group block group to use
41 * @param eo control flags
42 * @param query original query (hash)
43 * @param xquery extrended query data (can be NULL, depending on type)
44 * @param xquery_size number of bytes in xquery
45 * @param reply_block response to validate
46 * @param reply_block_size number of bytes in reply block
47 * @return characterization of result
48 */
49static enum GNUNET_BLOCK_EvaluationResult
50block_plugin_consensus_evaluate (void *cls,
51 struct GNUNET_BLOCK_Context *ctx,
52 enum GNUNET_BLOCK_Type type,
53 struct GNUNET_BLOCK_Group *group,
54 enum GNUNET_BLOCK_EvaluationOptions eo,
55 const struct GNUNET_HashCode *query,
56 const void *xquery,
57 size_t xquery_size,
58 const void *reply_block,
59 size_t reply_block_size)
60{
61 const struct ConsensusElement *ce = reply_block;
62
63 if (reply_block_size < sizeof(struct ConsensusElement))
64 return GNUNET_BLOCK_EVALUATION_RESULT_INVALID;
65 if ( (0 != ce->marker) ||
66 (0 == ce->payload_type) )
67 return GNUNET_BLOCK_EVALUATION_OK_MORE;
68
69 return GNUNET_BLOCK_evaluate (ctx,
70 type,
71 group,
72 eo,
73 query,
74 xquery,
75 xquery_size,
76 &ce[1],
77 reply_block_size
78 - sizeof(struct ConsensusElement));
79}
80
81
82/**
83 * Function called to obtain the key for a block.
84 *
85 * @param cls closure
86 * @param type block type
87 * @param block block to get the key for
88 * @param block_size number of bytes in block
89 * @param key set to the key (query) for the given block
90 * @return #GNUNET_OK on success, #GNUNET_SYSERR if type not supported
91 * (or if extracting a key from a block of this type does not work)
92 */
93static int
94block_plugin_consensus_get_key (void *cls,
95 enum GNUNET_BLOCK_Type type,
96 const void *block,
97 size_t block_size,
98 struct GNUNET_HashCode *key)
99{
100 return GNUNET_SYSERR;
101}
102
103
104/**
105 * Entry point for the plugin.
106 */
107void *
108libgnunet_plugin_block_consensus_init (void *cls)
109{
110 static enum GNUNET_BLOCK_Type types[] = {
111 GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT,
112 GNUNET_BLOCK_TYPE_ANY /* end of list */
113 };
114 struct GNUNET_BLOCK_PluginFunctions *api;
115
116 api = GNUNET_new (struct GNUNET_BLOCK_PluginFunctions);
117 api->evaluate = &block_plugin_consensus_evaluate;
118 api->get_key = &block_plugin_consensus_get_key;
119 api->types = types;
120 return api;
121}
122
123
124/**
125 * Exit point from the plugin.
126 */
127void *
128libgnunet_plugin_block_consensus_done (void *cls)
129{
130 struct GNUNET_BLOCK_PluginFunctions *api = cls;
131
132 GNUNET_free (api);
133 return NULL;
134}
135
136
137/* end of plugin_block_consensus.c */
diff --git a/src/consensus/test_consensus.conf b/src/consensus/test_consensus.conf
deleted file mode 100644
index 67b366405..000000000
--- a/src/consensus/test_consensus.conf
+++ /dev/null
@@ -1,87 +0,0 @@
1[PATHS]
2GNUNET_TEST_HOME = $GNUNET_TMP/test-consensus/
3
4[consensus]
5#OPTIONS = -L INFO
6BINARY = gnunet-service-evil-consensus
7
8#PREFIX = valgrind
9
10#EVIL_SPEC = 0;cram-all;noreplace;5
11#EVIL_SPEC = 0;cram;5/1;cram;5
12#EVIL_SPEC = 0;cram;5/1;cram;5/2;cram;5
13#EVIL_SPEC = 0;cram;5/1;cram;5/2;cram;5/3;cram;5
14
15
16[arm]
17RESOURCE_DIAGNOSTICS = resource.log.${PEERID:-master}
18
19[core]
20IMMEDIATE_START = YES
21
22[revocation]
23IMMEDIATE_START = NO
24
25[fs]
26IMMEDIATE_START = NO
27
28[gns]
29IMMEDIATE_START = NO
30
31[zonemaster]
32IMMEDIATE_START = NO
33
34[hostlist]
35IMMEDIATE_START = NO
36
37[cadet]
38#PREFIX = valgrind
39
40[transport]
41PLUGINS = unix
42OPTIONS = -LERROR
43
44[set]
45#OPTIONS = -L INFO
46#PREFIX = valgrind --leak-check=full
47#PREFIX = valgrind
48
49[testbed]
50OVERLAY_TOPOLOGY = CLIQUE
51MAX_PARALLEL_OPERATIONS = 1000
52MAX_PARALLEL_TOPOLOGY_CONFIG_OPERATIONS = 100
53OPERATION_TIMEOUT = 60 s
54MAX_OPEN_FDS = 4096
55
56[hostlist]
57START_ON_DEMAND = NO
58
59[fs]
60START_ON_DEMAND = NO
61
62[revocation]
63START_ON_DEMAND = NO
64
65[nat]
66# Use addresses from the local network interfaces (including loopback, but also others)
67USE_LOCALADDR = YES
68
69# Disable IPv6 support
70DISABLEV6 = NO
71
72# Do we use addresses from localhost address ranges? (::1, 127.0.0.0/8)
73RETURN_LOCAL_ADDRESSES = YES
74
75[nse]
76WORKBITS=0
77
78[ats]
79WAN_QUOTA_IN = unlimited
80WAN_QUOTA_OUT = unlimited
81
82[core]
83USE_EPHEMERAL_KEYS = NO
84
85[rps]
86START_ON_DEMAND = NO
87IMMEDIATE_START = NO
diff --git a/src/consensus/test_consensus_api.c b/src/consensus/test_consensus_api.c
deleted file mode 100644
index 235a67484..000000000
--- a/src/consensus/test_consensus_api.c
+++ /dev/null
@@ -1,120 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2012 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file consensus/test_consensus_api.c
23 * @brief testcase for consensus_api.c
24 */
25#include "platform.h"
26#include "gnunet_util_lib.h"
27#include "gnunet_consensus_service.h"
28#include "gnunet_testing_lib.h"
29
30
31static struct GNUNET_CONSENSUS_Handle *consensus;
32
33static struct GNUNET_HashCode session_id;
34
35static unsigned int elements_received;
36
37
38static void
39conclude_done (void *cls)
40{
41 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude over\n");
42 if (2 != elements_received)
43 GNUNET_assert (0);
44 GNUNET_SCHEDULER_shutdown ();
45}
46
47
48static void
49on_new_element (void *cls,
50 const struct GNUNET_SET_Element *element)
51{
52 elements_received++;
53}
54
55
56static void
57insert_done (void *cls, int success)
58{
59 /* make sure cb is only called once */
60 static int called = GNUNET_NO;
61
62 GNUNET_assert (GNUNET_NO == called);
63 called = GNUNET_YES;
64 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "insert done\n");
65 GNUNET_CONSENSUS_conclude (consensus, &conclude_done, NULL);
66}
67
68
69/**
70 * Signature of the main function of a task.
71 *
72 * @param cls closure
73 */
74static void
75on_shutdown (void *cls)
76{
77 if (NULL != consensus)
78 {
79 GNUNET_CONSENSUS_destroy (consensus);
80 consensus = NULL;
81 }
82}
83
84
85static void
86run (void *cls,
87 const struct GNUNET_CONFIGURATION_Handle *cfg,
88 struct GNUNET_TESTING_Peer *peer)
89{
90 char *str = "foo";
91
92 struct GNUNET_SET_Element el1 = { 4, 0, "foo" };
93 struct GNUNET_SET_Element el2 = { 5, 0, "quux" };
94
95 GNUNET_log_setup ("test_consensus_api",
96 "INFO",
97 NULL);
98 GNUNET_SCHEDULER_add_shutdown (&on_shutdown, NULL);
99
100 GNUNET_CRYPTO_hash (str, strlen (str), &session_id);
101 consensus = GNUNET_CONSENSUS_create (cfg, 0, NULL, &session_id,
102 GNUNET_TIME_relative_to_absolute (
103 GNUNET_TIME_UNIT_SECONDS),
104 GNUNET_TIME_relative_to_absolute (
105 GNUNET_TIME_UNIT_MINUTES),
106 on_new_element, &consensus);
107 GNUNET_assert (consensus != NULL);
108
109 GNUNET_CONSENSUS_insert (consensus, &el1, NULL, &consensus);
110 GNUNET_CONSENSUS_insert (consensus, &el2, &insert_done, &consensus);
111}
112
113
114int
115main (int argc, char **argv)
116{
117 return GNUNET_TESTING_peer_run ("test_consensus_api",
118 "test_consensus.conf",
119 &run, NULL);
120}