diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-06-03 10:53:49 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-06-03 10:53:49 +0000 |
commit | 68403fa780bf94ace2ebc13c2c09463cbbc0b57c (patch) | |
tree | 3442e4f25de90eab67c4f9813cb6e433c50b7482 /src/consensus | |
parent | fae7f583f2e11cac15fefcbefef64287ab6915d3 (diff) | |
download | gnunet-68403fa780bf94ace2ebc13c2c09463cbbc0b57c.tar.gz gnunet-68403fa780bf94ace2ebc13c2c09463cbbc0b57c.zip |
- conclude for SET
- consensus with SET
Diffstat (limited to 'src/consensus')
-rw-r--r-- | src/consensus/Makefile.am | 4 | ||||
-rw-r--r-- | src/consensus/consensus_api.c | 6 | ||||
-rw-r--r-- | src/consensus/consensus_protocol.h | 11 | ||||
-rw-r--r-- | src/consensus/gnunet-consensus.c | 6 | ||||
-rw-r--r-- | src/consensus/gnunet-service-consensus.c | 407 | ||||
-rw-r--r-- | src/consensus/test_consensus.conf | 8 |
6 files changed, 256 insertions, 186 deletions
diff --git a/src/consensus/Makefile.am b/src/consensus/Makefile.am index a0edb1d65..914fbdef8 100644 --- a/src/consensus/Makefile.am +++ b/src/consensus/Makefile.am | |||
@@ -61,6 +61,8 @@ gnunet_service_consensus_LDADD = \ | |||
61 | $(top_builddir)/src/mesh/libgnunetmesh.la \ | 61 | $(top_builddir)/src/mesh/libgnunetmesh.la \ |
62 | $(top_builddir)/src/set/libgnunetset.la \ | 62 | $(top_builddir)/src/set/libgnunetset.la \ |
63 | $(GN_LIBINTL) | 63 | $(GN_LIBINTL) |
64 | gnunet_service_consensus_DEPENDENCIES = \ | ||
65 | $(top_builddir)/src/set/libgnunetset.la | ||
64 | 66 | ||
65 | gnunet_service_evil_consensus_SOURCES = \ | 67 | gnunet_service_evil_consensus_SOURCES = \ |
66 | gnunet-service-consensus.c | 68 | gnunet-service-consensus.c |
@@ -71,6 +73,8 @@ gnunet_service_evil_consensus_LDADD = \ | |||
71 | $(top_builddir)/src/mesh/libgnunetmesh.la \ | 73 | $(top_builddir)/src/mesh/libgnunetmesh.la \ |
72 | $(top_builddir)/src/set/libgnunetset.la \ | 74 | $(top_builddir)/src/set/libgnunetset.la \ |
73 | $(GN_LIBINTL) | 75 | $(GN_LIBINTL) |
76 | gnunet_service_evil_consensus_DEPENDENCIES = \ | ||
77 | $(top_builddir)/src/set/libgnunetset.la | ||
74 | gnunet_service_evil_consensus_CFLAGS = -DEVIL | 78 | gnunet_service_evil_consensus_CFLAGS = -DEVIL |
75 | 79 | ||
76 | libgnunetconsensus_la_SOURCES = \ | 80 | libgnunetconsensus_la_SOURCES = \ |
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index 635a610ca..e3ddb4913 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c | |||
@@ -236,9 +236,9 @@ send_next (struct GNUNET_CONSENSUS_Handle *consensus) | |||
236 | */ | 236 | */ |
237 | static void | 237 | static void |
238 | handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, | 238 | handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, |
239 | struct GNUNET_CONSENSUS_ElementMessage *msg) | 239 | struct GNUNET_CONSENSUS_ElementMessage *msg) |
240 | { | 240 | { |
241 | struct GNUNET_CONSENSUS_Element element; | 241 | struct GNUNET_SET_Element element; |
242 | 242 | ||
243 | LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n"); | 243 | LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n"); |
244 | 244 | ||
@@ -424,7 +424,7 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
424 | */ | 424 | */ |
425 | void | 425 | void |
426 | GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, | 426 | GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, |
427 | const struct GNUNET_CONSENSUS_Element *element, | 427 | const struct GNUNET_SET_Element *element, |
428 | GNUNET_CONSENSUS_InsertDoneCallback idc, | 428 | GNUNET_CONSENSUS_InsertDoneCallback idc, |
429 | void *idc_cls) | 429 | void *idc_cls) |
430 | { | 430 | { |
diff --git a/src/consensus/consensus_protocol.h b/src/consensus/consensus_protocol.h index af4d74100..4e0673c7d 100644 --- a/src/consensus/consensus_protocol.h +++ b/src/consensus/consensus_protocol.h | |||
@@ -38,12 +38,15 @@ GNUNET_NETWORK_STRUCT_BEGIN | |||
38 | /** | 38 | /** |
39 | * Sent as context message for set reconciliation. | 39 | * Sent as context message for set reconciliation. |
40 | */ | 40 | */ |
41 | struct ConsensusRoundMessage | 41 | struct GNUNET_CONSENSUS_RoundContextMessage |
42 | { | 42 | { |
43 | /** | ||
44 | * Type: GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT | ||
45 | */ | ||
43 | struct GNUNET_MessageHeader header; | 46 | struct GNUNET_MessageHeader header; |
44 | uint8_t round; | 47 | uint32_t round; |
45 | uint8_t exp_round; | 48 | uint32_t exp_round; |
46 | uint8_t exp_subround; | 49 | uint32_t exp_subround; |
47 | }; | 50 | }; |
48 | 51 | ||
49 | GNUNET_NETWORK_STRUCT_END | 52 | GNUNET_NETWORK_STRUCT_END |
diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c index d8c1b14ee..60db8c61a 100644 --- a/src/consensus/gnunet-consensus.c +++ b/src/consensus/gnunet-consensus.c | |||
@@ -133,7 +133,7 @@ do_consensus () | |||
133 | { | 133 | { |
134 | int j; | 134 | int j; |
135 | struct GNUNET_HashCode *val; | 135 | struct GNUNET_HashCode *val; |
136 | struct GNUNET_CONSENSUS_Element *element; | 136 | struct GNUNET_SET_Element *element; |
137 | generate_indices(unique_indices); | 137 | generate_indices(unique_indices); |
138 | 138 | ||
139 | val = GNUNET_malloc (sizeof *val); | 139 | val = GNUNET_malloc (sizeof *val); |
@@ -151,6 +151,8 @@ do_consensus () | |||
151 | } | 151 | } |
152 | } | 152 | } |
153 | 153 | ||
154 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "all elements inserted, calling conclude\n"); | ||
155 | |||
154 | for (i = 0; i < num_peers; i++) | 156 | for (i = 0; i < num_peers; i++) |
155 | GNUNET_CONSENSUS_conclude (consensus_handles[i], conclude_timeout, conclude_cb, consensus_handles[i]); | 157 | GNUNET_CONSENSUS_conclude (consensus_handles[i], conclude_timeout, conclude_cb, consensus_handles[i]); |
156 | } | 158 | } |
@@ -194,7 +196,7 @@ connect_complete (void *cls, | |||
194 | 196 | ||
195 | static void | 197 | static void |
196 | new_element_cb (void *cls, | 198 | new_element_cb (void *cls, |
197 | const struct GNUNET_CONSENSUS_Element *element) | 199 | const struct GNUNET_SET_Element *element) |
198 | { | 200 | { |
199 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n"); | 201 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n"); |
200 | } | 202 | } |
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index 44edeb215..21123b24f 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet | 2 | This file is part of GNUnet |
3 | (C) 2012 Christian Grothoff (and other contributing authors) | 3 | (C) 2012, 2013 Christian Grothoff (and other contributing authors) |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -43,9 +43,9 @@ | |||
43 | 43 | ||
44 | 44 | ||
45 | /** | 45 | /** |
46 | * Number of exponential rounds, used in the inventory and completion round. | 46 | * Number of exponential rounds, used in the exp and completion round. |
47 | */ | 47 | */ |
48 | #define NUM_EXP_ROUNDS (4) | 48 | #define NUM_EXP_ROUNDS 4 |
49 | 49 | ||
50 | /* forward declarations */ | 50 | /* forward declarations */ |
51 | 51 | ||
@@ -155,17 +155,17 @@ struct ConsensusSession | |||
155 | * Permutation of peers for the current round, | 155 | * Permutation of peers for the current round, |
156 | * maps logical index (for current round) to physical index (location in info array) | 156 | * maps logical index (for current round) to physical index (location in info array) |
157 | */ | 157 | */ |
158 | int *shuffle; | 158 | uint32_t *shuffle; |
159 | 159 | ||
160 | /** | 160 | /** |
161 | * Current round of the exponential scheme. | 161 | * Current round of the exponential scheme. |
162 | */ | 162 | */ |
163 | int exp_round; | 163 | uint32_t exp_round; |
164 | 164 | ||
165 | /** | 165 | /** |
166 | * Current sub-round of the exponential scheme. | 166 | * Current sub-round of the exponential scheme. |
167 | */ | 167 | */ |
168 | int exp_subround; | 168 | uint32_t exp_subround; |
169 | 169 | ||
170 | /** | 170 | /** |
171 | * The partner for the current exp-round | 171 | * The partner for the current exp-round |
@@ -201,17 +201,6 @@ struct ConsensusPeerInformation | |||
201 | struct GNUNET_PeerIdentity peer_id; | 201 | struct GNUNET_PeerIdentity peer_id; |
202 | 202 | ||
203 | /** | 203 | /** |
204 | * Do we connect to the peer, or does the peer connect to us? | ||
205 | * Only valid for all-to-all phases | ||
206 | */ | ||
207 | int is_outgoing; | ||
208 | |||
209 | /** | ||
210 | * Did we receive/send a consensus hello? | ||
211 | */ | ||
212 | int hello; | ||
213 | |||
214 | /** | ||
215 | * Back-reference to the consensus session, | 204 | * Back-reference to the consensus session, |
216 | * to that ConsensusPeerInformation can be used as a closure | 205 | * to that ConsensusPeerInformation can be used as a closure |
217 | */ | 206 | */ |
@@ -223,22 +212,14 @@ struct ConsensusPeerInformation | |||
223 | int exp_subround_finished; | 212 | int exp_subround_finished; |
224 | 213 | ||
225 | /** | 214 | /** |
226 | * GNUNET_YES if we synced inventory with this peer; | 215 | * Set operation we are currently executing with this peer. |
227 | * GNUNET_NO otherwise. | ||
228 | */ | ||
229 | int inventory_synced; | ||
230 | |||
231 | /** | ||
232 | * Round this peer seems to be in, according to the last SE we got. | ||
233 | * Necessary to store this, as we sometimes need to respond to a request from an | ||
234 | * older round, while we are already in the next round. | ||
235 | */ | 216 | */ |
236 | enum ConsensusRound apparent_round; | 217 | struct GNUNET_SET_OperationHandle *set_op; |
237 | 218 | ||
238 | /** | 219 | /** |
239 | * Set operation we are currently executing with this peer. | 220 | * Has conclude been called on the set_op? |
240 | */ | 221 | */ |
241 | struct GNUNET_SET_OperationHandle *set_op; | 222 | int set_op_concluded; |
242 | }; | 223 | }; |
243 | 224 | ||
244 | 225 | ||
@@ -268,9 +249,8 @@ static struct GNUNET_SERVER_Handle *srv; | |||
268 | static struct GNUNET_PeerIdentity my_peer; | 249 | static struct GNUNET_PeerIdentity my_peer; |
269 | 250 | ||
270 | 251 | ||
271 | /* | ||
272 | static int | 252 | static int |
273 | exp_subround_finished (const struct ConsensusSession *session) | 253 | have_exp_subround_finished (const struct ConsensusSession *session) |
274 | { | 254 | { |
275 | int not_finished; | 255 | int not_finished; |
276 | not_finished = 0; | 256 | not_finished = 0; |
@@ -284,25 +264,9 @@ exp_subround_finished (const struct ConsensusSession *session) | |||
284 | return GNUNET_YES; | 264 | return GNUNET_YES; |
285 | return GNUNET_NO; | 265 | return GNUNET_NO; |
286 | } | 266 | } |
287 | */ | ||
288 | 267 | ||
289 | 268 | ||
290 | 269 | ||
291 | /* | ||
292 | static int | ||
293 | inventory_round_finished (struct ConsensusSession *session) | ||
294 | { | ||
295 | int i; | ||
296 | int finished; | ||
297 | finished = 0; | ||
298 | for (i = 0; i < session->num_peers; i++) | ||
299 | if (GNUNET_YES == session->info[i].inventory_synced) | ||
300 | finished++; | ||
301 | if (finished >= (session->num_peers / 2)) | ||
302 | return GNUNET_YES; | ||
303 | return GNUNET_NO; | ||
304 | } | ||
305 | */ | ||
306 | 270 | ||
307 | /** | 271 | /** |
308 | * Destroy a session, free all resources associated with it. | 272 | * Destroy a session, free all resources associated with it. |
@@ -341,39 +305,6 @@ destroy_session (struct ConsensusSession *session) | |||
341 | 305 | ||
342 | 306 | ||
343 | /** | 307 | /** |
344 | * Start the inventory round, contact all peers we are supposed to contact. | ||
345 | * | ||
346 | * @param session the current session | ||
347 | */ | ||
348 | static void | ||
349 | start_inventory (struct ConsensusSession *session) | ||
350 | { | ||
351 | int i; | ||
352 | int last; | ||
353 | |||
354 | for (i = 0; i < session->num_peers; i++) | ||
355 | session->info[i].is_outgoing = GNUNET_NO; | ||
356 | |||
357 | last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; | ||
358 | i = (session->local_peer_idx + 1) % session->num_peers; | ||
359 | while (i != last) | ||
360 | { | ||
361 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i); | ||
362 | session->info[i].is_outgoing = GNUNET_YES; | ||
363 | // embrace_peer (&session->info[i]); | ||
364 | i = (i + 1) % session->num_peers; | ||
365 | } | ||
366 | // tie-breaker for even number of peers | ||
367 | if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last)) | ||
368 | { | ||
369 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i); | ||
370 | session->info[last].is_outgoing = GNUNET_YES; | ||
371 | // embrace_peer (&session->info[last]); | ||
372 | } | ||
373 | } | ||
374 | |||
375 | |||
376 | /** | ||
377 | * Start the next round. | 308 | * Start the next round. |
378 | * This function can be invoked as a timeout task, or called manually (tc will be NULL then). | 309 | * This function can be invoked as a timeout task, or called manually (tc will be NULL then). |
379 | * | 310 | * |
@@ -407,27 +338,8 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
407 | subround_over (session, NULL); | 338 | subround_over (session, NULL); |
408 | break; | 339 | break; |
409 | case CONSENSUS_ROUND_EXCHANGE: | 340 | case CONSENSUS_ROUND_EXCHANGE: |
410 | /* handle two peers specially */ | 341 | /* FIXME: send all elements to client */ |
411 | if (session->num_peers <= 2) | ||
412 | { | ||
413 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: 2-peer consensus done\n", session->local_peer_idx); | ||
414 | //GNUNET_CONTAINER_multihashmap_iterate (session->values, send_client_elements_iter, session); | ||
415 | //send_client_conclude_done (session); | ||
416 | session->current_round = CONSENSUS_ROUND_FINISH; | ||
417 | return; | ||
418 | } | ||
419 | session->current_round = CONSENSUS_ROUND_INVENTORY; | ||
420 | start_inventory (session); | ||
421 | break; | ||
422 | case CONSENSUS_ROUND_INVENTORY: | ||
423 | session->current_round = CONSENSUS_ROUND_COMPLETION; | ||
424 | session->exp_round = 0; | ||
425 | subround_over (session, NULL); | ||
426 | break; | ||
427 | case CONSENSUS_ROUND_COMPLETION: | ||
428 | session->current_round = CONSENSUS_ROUND_FINISH; | 342 | session->current_round = CONSENSUS_ROUND_FINISH; |
429 | //send_client_conclude_done (session); | ||
430 | break; | ||
431 | default: | 343 | default: |
432 | GNUNET_assert (0); | 344 | GNUNET_assert (0); |
433 | } | 345 | } |
@@ -435,68 +347,91 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
435 | 347 | ||
436 | 348 | ||
437 | /** | 349 | /** |
438 | * Adapt the shuffle of the session for the current round. | 350 | * Create a new permutation for the session's peers in session->shuffle. |
351 | * Uses a Fisher-Yates shuffle with pseudo-randomness coming from | ||
352 | * both the global session id and the current round index. | ||
353 | * | ||
354 | * @param session the session to create the new permutation for | ||
439 | */ | 355 | */ |
440 | static void | 356 | static void |
441 | shuffle (struct ConsensusSession *session) | 357 | shuffle (struct ConsensusSession *session) |
442 | { | 358 | { |
443 | /* adapted from random_permute in util/crypto_random.c */ | 359 | uint32_t i; |
444 | /* FIXME | 360 | uint32_t randomness[session->num_peers-1]; |
445 | unsigned int *ret; | 361 | |
446 | unsigned int i; | 362 | if (NULL == session->shuffle) |
447 | unsigned int tmp; | 363 | session->shuffle = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle)); |
448 | uint32_t x; | 364 | |
365 | GNUNET_CRYPTO_kdf (randomness, sizeof (randomness), &session->exp_round, sizeof (uint32_t), | ||
366 | &session->global_id, sizeof (struct GNUNET_HashCode)); | ||
367 | |||
368 | for (i = 0; i < session->num_peers; i++) | ||
369 | session->shuffle[i] = i; | ||
449 | 370 | ||
450 | GNUNET_assert (n > 0); | 371 | for (i = session->num_peers - 1; i > 0; i--) |
451 | ret = GNUNET_malloc (n * sizeof (unsigned int)); | ||
452 | for (i = 0; i < n; i++) | ||
453 | ret[i] = i; | ||
454 | for (i = n - 1; i > 0; i--) | ||
455 | { | 372 | { |
456 | x = GNUNET_CRYPTO_random_u32 (mode, i + 1); | 373 | uint32_t x; |
457 | tmp = ret[x]; | 374 | uint32_t tmp; |
458 | ret[x] = ret[i]; | 375 | x = randomness[i-1]; |
459 | ret[i] = tmp; | 376 | tmp = session->shuffle[x]; |
377 | session->shuffle[x] = session->shuffle[i]; | ||
378 | session->shuffle[i] = tmp; | ||
460 | } | 379 | } |
461 | */ | ||
462 | } | 380 | } |
463 | 381 | ||
464 | 382 | ||
465 | /** | 383 | /** |
466 | * Find and set the partner_incoming and partner_outgoing of our peer, | 384 | * Find and set the partner_incoming and partner_outgoing of our peer, |
467 | * one of them may not exist in most cases. | 385 | * one of them may not exist (and thus set to NULL) if the number of peers |
386 | * in the session is not a power of two. | ||
468 | * | 387 | * |
469 | * @param session the consensus session | 388 | * @param session the consensus session |
470 | */ | 389 | */ |
471 | static void | 390 | static void |
472 | find_partners (struct ConsensusSession *session) | 391 | find_partners (struct ConsensusSession *session) |
473 | { | 392 | { |
474 | int mark[session->num_peers]; | 393 | int arc; |
475 | int i; | 394 | int partner_idx; |
476 | 395 | int largest_arc; | |
477 | memset (mark, 0, session->num_peers * sizeof (int)); | 396 | int num_ghosts; |
478 | session->partner_incoming = session->partner_outgoing = NULL; | 397 | |
479 | for (i = 0; i < session->num_peers; i++) | 398 | /* distance to neighboring peer in current subround */ |
399 | arc = 1 << session->exp_subround; | ||
400 | partner_idx = (session->local_peer_idx + arc) % session->num_peers; | ||
401 | largest_arc = 1; | ||
402 | while (largest_arc < session->num_peers) | ||
403 | largest_arc <<= 1; | ||
404 | num_ghosts = largest_arc - session->num_peers; | ||
405 | |||
406 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "num ghosts: %d\n", num_ghosts); | ||
407 | |||
408 | if (0 == (session->local_peer_idx & arc)) | ||
480 | { | 409 | { |
481 | int arc; | 410 | /* we are outgoing */ |
482 | if (0 != mark[i]) | 411 | session->partner_outgoing = &session->info[session->shuffle[partner_idx]]; |
483 | continue; | 412 | /* are we a 'ghost' of a peer that would exist if |
484 | arc = (i + (1 << session->exp_subround)) % session->num_peers; | 413 | * the number of peers was a power of two, and thus have to partner |
485 | mark[i] = mark[arc] = 1; | 414 | * with an additional peer? |
486 | GNUNET_assert (i != arc); | 415 | */ |
487 | if (i == session->local_peer_idx) | 416 | if (session->local_peer_idx < num_ghosts) |
488 | { | 417 | { |
489 | GNUNET_assert (NULL == session->partner_outgoing); | 418 | int ghost_partner_idx; |
490 | session->partner_outgoing = &session->info[session->shuffle[arc]]; | 419 | ghost_partner_idx = (session->local_peer_idx - arc) % session->num_peers; |
491 | session->partner_outgoing->exp_subround_finished = GNUNET_NO; | 420 | /* platform dependent; modulo sometimes returns negative values */ |
421 | if (ghost_partner_idx < 0) | ||
422 | ghost_partner_idx += arc; | ||
423 | session->partner_incoming = &session->info[session->shuffle[ghost_partner_idx]]; | ||
492 | } | 424 | } |
493 | if (arc == session->local_peer_idx) | 425 | else |
494 | { | 426 | { |
495 | GNUNET_assert (NULL == session->partner_incoming); | 427 | session->partner_incoming = NULL; |
496 | session->partner_incoming = &session->info[session->shuffle[i]]; | ||
497 | session->partner_incoming->exp_subround_finished = GNUNET_NO; | ||
498 | } | 428 | } |
499 | } | 429 | } |
430 | else | ||
431 | { | ||
432 | session->partner_outgoing = NULL; | ||
433 | session->partner_incoming = &session->info[session->shuffle[partner_idx]]; | ||
434 | } | ||
500 | } | 435 | } |
501 | 436 | ||
502 | 437 | ||
@@ -508,11 +443,42 @@ find_partners (struct ConsensusSession *session) | |||
508 | * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK | 443 | * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK |
509 | * @param status see enum GNUNET_SET_Status | 444 | * @param status see enum GNUNET_SET_Status |
510 | */ | 445 | */ |
511 | static void set_result_cb (void *cls, | 446 | static void |
512 | const struct GNUNET_SET_Element *element, | 447 | set_result_cb (void *cls, |
513 | enum GNUNET_SET_Status status) | 448 | const struct GNUNET_SET_Element *element, |
449 | enum GNUNET_SET_Status status) | ||
514 | { | 450 | { |
515 | /* FIXME */ | 451 | struct ConsensusPeerInformation *cpi = cls; |
452 | |||
453 | switch (status) | ||
454 | { | ||
455 | case GNUNET_SET_STATUS_OK: | ||
456 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: element\n"); | ||
457 | break; | ||
458 | case GNUNET_SET_STATUS_FAILURE: | ||
459 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: failure\n"); | ||
460 | break; | ||
461 | case GNUNET_SET_STATUS_HALF_DONE: | ||
462 | case GNUNET_SET_STATUS_DONE: | ||
463 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: done\n"); | ||
464 | cpi->exp_subround_finished = GNUNET_YES; | ||
465 | if (have_exp_subround_finished (cpi->session) == GNUNET_YES) | ||
466 | subround_over (cpi->session, NULL); | ||
467 | return; | ||
468 | default: | ||
469 | GNUNET_break (0); | ||
470 | return; | ||
471 | } | ||
472 | |||
473 | switch (cpi->session->current_round) | ||
474 | { | ||
475 | case CONSENSUS_ROUND_EXCHANGE: | ||
476 | GNUNET_SET_add_element (cpi->session->element_set, element, NULL, NULL); | ||
477 | break; | ||
478 | default: | ||
479 | GNUNET_break (0); | ||
480 | return; | ||
481 | } | ||
516 | } | 482 | } |
517 | 483 | ||
518 | 484 | ||
@@ -540,14 +506,6 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
540 | GNUNET_SCHEDULER_cancel (session->round_timeout_tid); | 506 | GNUNET_SCHEDULER_cancel (session->round_timeout_tid); |
541 | session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; | 507 | session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; |
542 | } | 508 | } |
543 | /* check if we are done with the log phase, 2-peer consensus only does one log round */ | ||
544 | if ( (session->exp_round == NUM_EXP_ROUNDS) || | ||
545 | ((session->num_peers == 2) && (session->exp_round == 1))) | ||
546 | { | ||
547 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: exp-round over\n", session->local_peer_idx); | ||
548 | round_over (session, NULL); | ||
549 | return; | ||
550 | } | ||
551 | if (session->exp_round == 0) | 509 | if (session->exp_round == 0) |
552 | { | 510 | { |
553 | /* initialize everything for the log-rounds */ | 511 | /* initialize everything for the log-rounds */ |
@@ -575,18 +533,27 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
575 | 533 | ||
576 | if (NULL != session->partner_outgoing) | 534 | if (NULL != session->partner_outgoing) |
577 | { | 535 | { |
536 | struct GNUNET_CONSENSUS_RoundContextMessage *msg; | ||
537 | msg = GNUNET_new (struct GNUNET_CONSENSUS_RoundContextMessage); | ||
538 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT); | ||
539 | msg->header.size = htons (sizeof *msg); | ||
540 | msg->round = htonl (session->current_round); | ||
541 | msg->exp_round = htonl (session->exp_round); | ||
542 | msg->exp_subround = htonl (session->exp_subround); | ||
543 | |||
578 | if (NULL != session->partner_outgoing->set_op) | 544 | if (NULL != session->partner_outgoing->set_op) |
545 | { | ||
579 | GNUNET_SET_operation_cancel (session->partner_outgoing->set_op); | 546 | GNUNET_SET_operation_cancel (session->partner_outgoing->set_op); |
547 | } | ||
580 | session->partner_outgoing->set_op = | 548 | session->partner_outgoing->set_op = |
581 | GNUNET_SET_evaluate (session->element_set, | 549 | GNUNET_SET_evaluate (&session->partner_outgoing->peer_id, |
582 | &session->partner_outgoing->peer_id, | ||
583 | &session->global_id, | 550 | &session->global_id, |
584 | NULL, /* FIXME */ | 551 | (struct GNUNET_MessageHeader *) msg, |
585 | 0, /* FIXME */ | 552 | 0, /* FIXME */ |
586 | GNUNET_SET_RESULT_ADDED, | 553 | GNUNET_SET_RESULT_ADDED, |
587 | set_result_cb, session); | 554 | set_result_cb, session->partner_outgoing); |
588 | 555 | GNUNET_SET_conclude (session->partner_outgoing->set_op, session->element_set); | |
589 | 556 | session->partner_outgoing->set_op_concluded = GNUNET_YES; | |
590 | } | 557 | } |
591 | 558 | ||
592 | #ifdef GNUNET_EXTRA_LOGGING | 559 | #ifdef GNUNET_EXTRA_LOGGING |
@@ -642,6 +609,8 @@ compute_global_id (struct ConsensusSession *session, const struct GNUNET_HashCod | |||
642 | int i; | 609 | int i; |
643 | struct GNUNET_HashCode tmp; | 610 | struct GNUNET_HashCode tmp; |
644 | 611 | ||
612 | /* FIXME: use kdf? */ | ||
613 | |||
645 | session->global_id = *session_id; | 614 | session->global_id = *session_id; |
646 | for (i = 0; i < session->num_peers; ++i) | 615 | for (i = 0; i < session->num_peers; ++i) |
647 | { | 616 | { |
@@ -727,9 +696,6 @@ initialize_session_peer_list (struct ConsensusSession *session, | |||
727 | } | 696 | } |
728 | 697 | ||
729 | 698 | ||
730 | |||
731 | |||
732 | |||
733 | /** | 699 | /** |
734 | * Called when another peer wants to do a set operation with the | 700 | * Called when another peer wants to do a set operation with the |
735 | * local peer. | 701 | * local peer. |
@@ -750,7 +716,67 @@ set_listen_cb (void *cls, | |||
750 | const struct GNUNET_MessageHeader *context_msg, | 716 | const struct GNUNET_MessageHeader *context_msg, |
751 | struct GNUNET_SET_Request *request) | 717 | struct GNUNET_SET_Request *request) |
752 | { | 718 | { |
753 | /* FIXME */ | 719 | struct ConsensusSession *session = cls; |
720 | struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg; | ||
721 | struct ConsensusPeerInformation *cpi; | ||
722 | int index; | ||
723 | |||
724 | if (NULL == context_msg) | ||
725 | { | ||
726 | GNUNET_break_op (0); | ||
727 | return; | ||
728 | } | ||
729 | |||
730 | index = get_peer_idx (other_peer, session); | ||
731 | |||
732 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "result from %s\n", GNUNET_h2s (&other_peer->hashPubKey)); | ||
733 | |||
734 | if (index < 0) | ||
735 | { | ||
736 | GNUNET_break_op (0); | ||
737 | return; | ||
738 | } | ||
739 | |||
740 | cpi = &session->info[index]; | ||
741 | |||
742 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d got result from P%d\n", session->local_peer_idx, index); | ||
743 | |||
744 | switch (session->current_round) | ||
745 | { | ||
746 | case CONSENSUS_ROUND_EXCHANGE: | ||
747 | if (ntohl (msg->round) != CONSENSUS_ROUND_EXCHANGE) | ||
748 | { | ||
749 | GNUNET_break_op (0); | ||
750 | return; | ||
751 | } | ||
752 | if (ntohl (msg->exp_round) < session->exp_round) | ||
753 | { | ||
754 | GNUNET_break_op (0); | ||
755 | return; | ||
756 | } | ||
757 | if (ntohl (msg->exp_subround) < session->exp_subround) | ||
758 | { | ||
759 | GNUNET_break_op (0); | ||
760 | return; | ||
761 | } | ||
762 | if (NULL != cpi->set_op) | ||
763 | GNUNET_SET_operation_cancel (cpi->set_op); | ||
764 | cpi->set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, | ||
765 | set_result_cb, &session->info[index]); | ||
766 | if (ntohl (msg->exp_subround) == session->exp_subround) | ||
767 | { | ||
768 | cpi->set_op_concluded = GNUNET_YES; | ||
769 | GNUNET_SET_conclude (cpi->set_op, session->element_set); | ||
770 | } | ||
771 | else | ||
772 | { | ||
773 | cpi->set_op_concluded = GNUNET_NO; | ||
774 | } | ||
775 | break; | ||
776 | default: | ||
777 | GNUNET_break_op (0); | ||
778 | return; | ||
779 | } | ||
754 | } | 780 | } |
755 | 781 | ||
756 | 782 | ||
@@ -769,7 +795,9 @@ initialize_session (struct ConsensusSession *session, | |||
769 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers); | 795 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers); |
770 | compute_global_id (session, &join_msg->session_id); | 796 | compute_global_id (session, &join_msg->session_id); |
771 | 797 | ||
772 | /* check if some local client already owns the session. */ | 798 | /* check if some local client already owns the session. |
799 | * it is only legal to have a session with an existing global id | ||
800 | * if all other sessions with this global id are finished.*/ | ||
773 | other_session = sessions_head; | 801 | other_session = sessions_head; |
774 | while (NULL != other_session) | 802 | while (NULL != other_session) |
775 | { | 803 | { |
@@ -789,6 +817,8 @@ initialize_session (struct ConsensusSession *session, | |||
789 | 817 | ||
790 | session->local_peer_idx = get_peer_idx (&my_peer, session); | 818 | session->local_peer_idx = get_peer_idx (&my_peer, session); |
791 | GNUNET_assert (-1 != session->local_peer_idx); | 819 | GNUNET_assert (-1 != session->local_peer_idx); |
820 | session->element_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); | ||
821 | GNUNET_assert (NULL != session->element_set); | ||
792 | session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION, | 822 | session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION, |
793 | &session->global_id, | 823 | &session->global_id, |
794 | set_listen_cb, session); | 824 | set_listen_cb, session); |
@@ -827,6 +857,8 @@ client_join (void *cls, | |||
827 | { | 857 | { |
828 | struct ConsensusSession *session; | 858 | struct ConsensusSession *session; |
829 | 859 | ||
860 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join message sent by client\n"); | ||
861 | |||
830 | session = get_session_by_client (client); | 862 | session = get_session_by_client (client); |
831 | if (NULL != session) | 863 | if (NULL != session) |
832 | { | 864 | { |
@@ -835,9 +867,13 @@ client_join (void *cls, | |||
835 | return; | 867 | return; |
836 | } | 868 | } |
837 | session = GNUNET_new (struct ConsensusSession); | 869 | session = GNUNET_new (struct ConsensusSession); |
870 | session->client = client; | ||
838 | GNUNET_SERVER_client_keep (client); | 871 | GNUNET_SERVER_client_keep (client); |
839 | GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); | 872 | GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); |
840 | initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m); | 873 | initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m); |
874 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
875 | |||
876 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join done\n"); | ||
841 | } | 877 | } |
842 | 878 | ||
843 | 879 | ||
@@ -858,12 +894,7 @@ client_insert (void *cls, | |||
858 | struct GNUNET_SET_Element *element; | 894 | struct GNUNET_SET_Element *element; |
859 | ssize_t element_size; | 895 | ssize_t element_size; |
860 | 896 | ||
861 | session = sessions_head; | 897 | session = get_session_by_client (client); |
862 | while (NULL != session) | ||
863 | { | ||
864 | if (session->client == client) | ||
865 | break; | ||
866 | } | ||
867 | 898 | ||
868 | if (NULL == session) | 899 | if (NULL == session) |
869 | { | 900 | { |
@@ -886,6 +917,7 @@ client_insert (void *cls, | |||
886 | GNUNET_break (0); | 917 | GNUNET_break (0); |
887 | return; | 918 | return; |
888 | } | 919 | } |
920 | |||
889 | element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size); | 921 | element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size); |
890 | element->type = msg->element_type; | 922 | element->type = msg->element_type; |
891 | element->size = element_size; | 923 | element->size = element_size; |
@@ -893,6 +925,8 @@ client_insert (void *cls, | |||
893 | element->data = &element[1]; | 925 | element->data = &element[1]; |
894 | GNUNET_SET_add_element (session->element_set, element, NULL, NULL); | 926 | GNUNET_SET_add_element (session->element_set, element, NULL, NULL); |
895 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 927 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
928 | |||
929 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: element added\n", session->local_peer_idx); | ||
896 | } | 930 | } |
897 | 931 | ||
898 | 932 | ||
@@ -911,10 +945,10 @@ client_conclude (void *cls, | |||
911 | struct ConsensusSession *session; | 945 | struct ConsensusSession *session; |
912 | struct GNUNET_CONSENSUS_ConcludeMessage *cmsg; | 946 | struct GNUNET_CONSENSUS_ConcludeMessage *cmsg; |
913 | 947 | ||
914 | cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message; | ||
915 | 948 | ||
949 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n"); | ||
950 | cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message; | ||
916 | session = get_session_by_client (client); | 951 | session = get_session_by_client (client); |
917 | |||
918 | if (NULL == session) | 952 | if (NULL == session) |
919 | { | 953 | { |
920 | /* client not found */ | 954 | /* client not found */ |
@@ -922,14 +956,12 @@ client_conclude (void *cls, | |||
922 | GNUNET_SERVER_client_disconnect (client); | 956 | GNUNET_SERVER_client_disconnect (client); |
923 | return; | 957 | return; |
924 | } | 958 | } |
925 | |||
926 | if (CONSENSUS_ROUND_BEGIN != session->current_round) | 959 | if (CONSENSUS_ROUND_BEGIN != session->current_round) |
927 | { | 960 | { |
928 | /* client requested conclude twice */ | 961 | /* client requested conclude twice */ |
929 | GNUNET_break (0); | 962 | GNUNET_break (0); |
930 | return; | 963 | return; |
931 | } | 964 | } |
932 | |||
933 | if (session->num_peers <= 1) | 965 | if (session->num_peers <= 1) |
934 | { | 966 | { |
935 | //send_client_conclude_done (session); | 967 | //send_client_conclude_done (session); |
@@ -937,7 +969,7 @@ client_conclude (void *cls, | |||
937 | else | 969 | else |
938 | { | 970 | { |
939 | session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout); | 971 | session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout); |
940 | /* the 'begin' round is over, start with the next, real round */ | 972 | /* the 'begin' round is over, start with the next, actual round */ |
941 | round_over (session, NULL); | 973 | round_over (session, NULL); |
942 | } | 974 | } |
943 | 975 | ||
@@ -964,6 +996,29 @@ shutdown_task (void *cls, | |||
964 | 996 | ||
965 | 997 | ||
966 | /** | 998 | /** |
999 | * Clean up after a client after it is | ||
1000 | * disconnected (either by us or by itself) | ||
1001 | * | ||
1002 | * @param cls closure, unused | ||
1003 | * @param client the client to clean up after | ||
1004 | */ | ||
1005 | void | ||
1006 | handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | ||
1007 | { | ||
1008 | struct ConsensusSession *session; | ||
1009 | |||
1010 | session = get_session_by_client (client); | ||
1011 | if (NULL == session) | ||
1012 | return; | ||
1013 | if ((CONSENSUS_ROUND_BEGIN == session->current_round) || | ||
1014 | (CONSENSUS_ROUND_FINISH == session->current_round)) | ||
1015 | destroy_session (session); | ||
1016 | else | ||
1017 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, but waiting for consensus to finish\n"); | ||
1018 | } | ||
1019 | |||
1020 | |||
1021 | /** | ||
967 | * Start processing consensus requests. | 1022 | * Start processing consensus requests. |
968 | * | 1023 | * |
969 | * @param cls closure | 1024 | * @param cls closure |
@@ -971,13 +1026,14 @@ shutdown_task (void *cls, | |||
971 | * @param c configuration to use | 1026 | * @param c configuration to use |
972 | */ | 1027 | */ |
973 | static void | 1028 | static void |
974 | run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) | 1029 | run (void *cls, struct GNUNET_SERVER_Handle *server, |
1030 | const struct GNUNET_CONFIGURATION_Handle *c) | ||
975 | { | 1031 | { |
976 | static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { | 1032 | static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { |
977 | {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0}, | ||
978 | {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, | ||
979 | {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, | 1033 | {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, |
980 | sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)}, | 1034 | sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)}, |
1035 | {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, | ||
1036 | {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0}, | ||
981 | {NULL, NULL, 0, 0} | 1037 | {NULL, NULL, 0, 0} |
982 | }; | 1038 | }; |
983 | 1039 | ||
@@ -992,6 +1048,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU | |||
992 | } | 1048 | } |
993 | GNUNET_SERVER_add_handlers (server, server_handlers); | 1049 | GNUNET_SERVER_add_handlers (server, server_handlers); |
994 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); | 1050 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); |
1051 | GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL); | ||
995 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n"); | 1052 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n"); |
996 | } | 1053 | } |
997 | 1054 | ||
diff --git a/src/consensus/test_consensus.conf b/src/consensus/test_consensus.conf index 437636c99..37facf84e 100644 --- a/src/consensus/test_consensus.conf +++ b/src/consensus/test_consensus.conf | |||
@@ -5,7 +5,7 @@ HOSTNAME = localhost | |||
5 | HOME = $SERVICEHOME | 5 | HOME = $SERVICEHOME |
6 | BINARY = gnunet-service-consensus | 6 | BINARY = gnunet-service-consensus |
7 | #PREFIX = gdbserver :12345 | 7 | #PREFIX = gdbserver :12345 |
8 | PREFIX = valgrind --leak-check=full | 8 | PREFIX = valgrind |
9 | ACCEPT_FROM = 127.0.0.1; | 9 | ACCEPT_FROM = 127.0.0.1; |
10 | ACCEPT_FROM6 = ::1; | 10 | ACCEPT_FROM6 = ::1; |
11 | UNIXPATH = /tmp/gnunet-service-consensus.sock | 11 | UNIXPATH = /tmp/gnunet-service-consensus.sock |
@@ -19,7 +19,11 @@ OPTIONS = -LERROR | |||
19 | 19 | ||
20 | 20 | ||
21 | [arm] | 21 | [arm] |
22 | DEFAULTSERVICES = core consensus | 22 | DEFAULTSERVICES = core consensus set |
23 | |||
24 | [set] | ||
25 | OPTIONS = -L INFO | ||
26 | PREFIX = valgrind | ||
23 | 27 | ||
24 | 28 | ||
25 | [testbed] | 29 | [testbed] |