aboutsummaryrefslogtreecommitdiff
path: root/src/consensus/gnunet-service-consensus.c
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2015-09-22 01:20:28 +0000
committerFlorian Dold <florian.dold@gmail.com>2015-09-22 01:20:28 +0000
commitafc7d17dc798216e89506fb4e66aabb7e0f0077b (patch)
tree786d44d7852795f9f20549466f39683e32e6abea /src/consensus/gnunet-service-consensus.c
parenta8bb50dcefdab19f4990995ded586b237501b2c1 (diff)
downloadgnunet-afc7d17dc798216e89506fb4e66aabb7e0f0077b.tar.gz
gnunet-afc7d17dc798216e89506fb4e66aabb7e0f0077b.zip
work on CONSENSUS and SET
- byzantine consensus work in progress - fix SET generation handling
Diffstat (limited to 'src/consensus/gnunet-service-consensus.c')
-rw-r--r--src/consensus/gnunet-service-consensus.c2465
1 files changed, 1799 insertions, 666 deletions
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
index acae74eaf..424be7a71 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -34,234 +34,410 @@
34#include "consensus.h" 34#include "consensus.h"
35 35
36 36
37/**
38 * Log macro that prefixes the local peer and the peer we are in contact with.
39 *
40 * @param kind log level
41 * @param cpi ConsensusPeerInformation of the partner peer
42 * @param m log message
43 */
44#define LOG_PP(kind, cpi, m,...) GNUNET_log (kind, "P%d for P%d: " m, \
45 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info),##__VA_ARGS__)
46 37
38GNUNET_NETWORK_STRUCT_BEGIN
47 39
48/** 40/**
49 * Number of exponential rounds, used in the exp and completion round. 41 * Tuple of integers that together
42 * identify a task uniquely.
50 */ 43 */
51#define NUM_EXP_REPETITIONS 4 44struct TaskKey {
52 45 /**
53 46 * A value from 'enum PhaseKind'.
54/* forward declarations */ 47 */
55 48 uint16_t kind GNUNET_PACKED;
56/* mutual recursion with struct ConsensusSession */
57struct ConsensusPeerInformation;
58
59/* mutual recursion with round_over */
60static void
61subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
62 49
50 /**
51 * Number of the first peer
52 * in canonical order.
53 */
54 int16_t peer1 GNUNET_PACKED;
63 55
64/**
65 * Describes the current round a consensus session is in.
66 */
67enum ConsensusRound
68{
69 /** 56 /**
70 * Not started the protocol yet. 57 * Number of the second peer in canonical order.
71 */ 58 */
72 CONSENSUS_ROUND_BEGIN=0, 59 int16_t peer2 GNUNET_PACKED;
60
73 /** 61 /**
74 * Distribution of elements with the exponential scheme. 62 * Repetition of the gradecast phase.
75 */ 63 */
76 CONSENSUS_ROUND_EXCHANGE, 64 int16_t repetition GNUNET_PACKED;
65
77 /** 66 /**
78 * Collect and distribute missing values. 67 * Leader in the gradecast phase.
68 *
69 * Can be different from both peer1 and peer2.
79 */ 70 */
80 CONSENSUS_ROUND_COMPLETION, 71 int16_t leader GNUNET_PACKED;
72};
73
74
75enum ReferendumVote
76{
77 VOTE_NONE = 0,
78 VOTE_ADD = 1,
79 VOTE_REMOVE = 2,
80 VOTE_CONTESTED = 3
81};
82
83
84struct SetKey
85{
86 int set_kind GNUNET_PACKED;
87 int k1 GNUNET_PACKED;
88 int k2 GNUNET_PACKED;
89};
90
91
92struct SetEntry
93{
94 struct SetKey key;
95 struct GNUNET_SET_Handle *h;
81 /** 96 /**
82 * Consensus concluded. After timeout and finished communication with client, 97 * GNUNET_YES if the set resulted
83 * consensus session will be destroyed. 98 * from applying a referendum with contested
99 * elements.
84 */ 100 */
85 CONSENSUS_ROUND_FINISH 101 int is_contested;
86}; 102};
87 103
88 104
89/** 105struct DiffKey
90 * Information about the current round. 106{
91 */ 107 int diff_kind GNUNET_PACKED;
92struct RoundInfo 108 int k1 GNUNET_PACKED;
109 int k2 GNUNET_PACKED;
110};
111
112struct RfnKey
113{
114 int rfn_kind GNUNET_PACKED;
115 int k1 GNUNET_PACKED;
116 int k2 GNUNET_PACKED;
117};
118
119
120GNUNET_NETWORK_STRUCT_END
121
122enum PhaseKind
123{
124 PHASE_KIND_ALL_TO_ALL,
125 PHASE_KIND_GRADECAST_LEADER,
126 PHASE_KIND_GRADECAST_ECHO,
127 PHASE_KIND_GRADECAST_ECHO_GRADE,
128 PHASE_KIND_GRADECAST_CONFIRM,
129 PHASE_KIND_GRADECAST_CONFIRM_GRADE,
130 PHASE_KIND_GRADECAST_APPLY_RESULT,
131 PHASE_KIND_FINISH,
132};
133
134
135enum ActionType
93{ 136{
94 /** 137 /**
95 * The current main round. 138 * Do a set reconciliation with another peer (or via looback).
96 */ 139 */
97 enum ConsensusRound round; 140 ACTION_RECONCILE,
98 /** 141 /**
99 * The current exp round repetition, valid if 142 * Apply a referendum with a threshold
100 * the main round is an exp round. 143 * to a set and/or a diff.
101 */ 144 */
102 uint32_t exp_repetition; 145 ACTION_EVAL_RFN,
103 /** 146 /**
104 * The current exp subround, valid if 147 * Apply a diff to a set.
105 * the main round is an exp round.
106 */ 148 */
107 uint32_t exp_subround; 149 ACTION_APPLY_DIFF,
150 ACTION_FINISH,
151};
152
153enum SetKind
154{
155 SET_KIND_NONE = 0,
156 SET_KIND_CURRENT,
157 SET_KIND_LEADER,
158 SET_KIND_ECHO_RESULT,
108}; 159};
109 160
161enum DiffKind
162{
163 DIFF_KIND_NONE = 0,
164 DIFF_KIND_LEADER,
165 DIFF_KIND_GRADECAST_RESULT,
166};
110 167
111/** 168enum RfnKind
112 * A consensus session consists of one local client and the remote authorities. 169{
170 RFN_KIND_NONE = 0,
171 RFN_KIND_ECHO,
172 RFN_KIND_CONFIRM,
173};
174
175
176/*
177 * Node in the consensus task graph.
113 */ 178 */
114struct ConsensusSession 179struct TaskEntry
115{ 180{
181 struct TaskKey key;
182
183 struct Step *step;
184
185 int is_running;
186
187 int is_finished;
188
189 enum ActionType action;
190
191 struct SetKey input_set;
192 struct DiffKey input_diff;
193 struct RfnKey input_rfn;
194 struct SetKey output_set;
195 struct DiffKey output_diff;
196 struct RfnKey output_rfn;
197
116 /** 198 /**
117 * Consensus sessions are kept in a DLL. 199 * Threshold when evaluating referendums.
118 */ 200 */
119 struct ConsensusSession *next; 201 uint16_t threshold;
120 202
121 /** 203 /**
122 * Consensus sessions are kept in a DLL. 204 * Operation that is running for this task.
123 */ 205 */
124 struct ConsensusSession *prev; 206 struct GNUNET_SET_OperationHandle *op;
125 207
126 /** 208 struct GNUNET_SET_Handle *commited_set;
127 * Global consensus identification, computed 209};
128 * from the session id and participating authorities.
129 */
130 struct GNUNET_HashCode global_id;
131 210
211
212struct Step
213{
132 /** 214 /**
133 * Client that inhabits the session 215 * All steps of one session are in a
216 * linked list for easier deallocation.
134 */ 217 */
135 struct GNUNET_SERVER_Client *client; 218 struct Step *prev;
136 219
137 /** 220 /**
138 * Queued messages to the client. 221 * All steps of one session are in a
222 * linked list for easier deallocation.
139 */ 223 */
140 struct GNUNET_MQ_Handle *client_mq; 224 struct Step *next;
141 225
142 /** 226 struct ConsensusSession *session;
143 * Time when the conclusion of the consensus should begin. 227
228 struct TaskEntry **tasks;
229 unsigned int tasks_len;
230 unsigned int tasks_cap;
231
232 unsigned int finished_tasks;
233
234 /*
235 * Tasks that have this task as dependency.
236 *
237 * We store pointers to subordinates rather
238 * than to prerequisites since it makes
239 * tracking the readiness of a task easier.
144 */ 240 */
145 struct GNUNET_TIME_Absolute conclude_start; 241 struct Step **subordinates;
242 unsigned int subordinates_len;
243 unsigned int subordinates_cap;
146 244
147 /** 245 /**
148 * Timeout for all rounds together, single rounds will schedule a timeout task 246 * Counter for the prerequisites of
149 * with a fraction of the conclude timeout. 247 * this step.
150 * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
151 */ 248 */
152 struct GNUNET_TIME_Absolute conclude_deadline; 249 size_t pending_prereq;
153 250
154 /** 251 /*
155 * Timeout task identifier for the current round or subround. 252 * Task that will run this step despite
253 * any pending prerequisites.
156 */ 254 */
157 struct GNUNET_SCHEDULER_Task * round_timeout_tid; 255 struct GNUNET_SCHEDULER_Task *timeout_task;
158 256
159 /** 257 unsigned int is_running;
160 * Number of other peers in the consensus. 258
259 unsigned int is_finished;
260
261 /*
262 * Round that this step should start.
263 * If not all prerequisites have run,
264 * the task will run anyway.
161 */ 265 */
162 unsigned int num_peers; 266 unsigned int start_round;
163 267
164 /** 268 /*
165 * Information about the other peers, 269 * Number of rounds this step occupies.
166 * their state, etc. 270 *
271 * Some steps are more expensive, and thus
272 * are allocated more rounds.
167 */ 273 */
168 struct ConsensusPeerInformation *info; 274 unsigned int num_rounds;
169 275
170 /** 276 /**
171 * Index of the local peer in the peers array 277 * Human-readable name for
278 * the task, used for debugging.
172 */ 279 */
173 unsigned int local_peer_idx; 280 char *debug_name;
281};
174 282
175 /** 283struct RfnPeerInfo
176 * Current round 284{
285 /* Peers can propose changes,
286 * but they are only accepted once
287 * the whole set operation is done. */
288 int is_commited;
289};
290
291struct RfnElementInfo
292{
293 struct GNUNET_SET_Element *element;
294
295 /*
296 * Vote (or VOTE_NONE) from every peer
297 * in the session about the element.
177 */ 298 */
178 enum ConsensusRound current_round; 299 int *votes;
300};
179 301
180 /** 302
181 * Permutation of peers for the current round, 303struct ReferendumEntry
304{
305 struct RfnKey key;
306
307 /*
308 * Elements where there is at least one proposed change.
309 *
310 * Maps the hash of the GNUNET_SET_Element
311 * to 'struct RfnElementInfo'.
182 */ 312 */
183 uint32_t *shuffle; 313 struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
184 314
185 /** 315 /**
186 * Inverse permutation of peers for the current round, 316 * Stores, for every peer in the session,
317 * whether the peer finished the whole referendum.
318 *
319 * Votes from peers are only counted if they're
320 * marked as commited (#GNUNET_YES) in the referendum.
321 *
322 * Otherwise (#GNUNET_NO), the requested changes are
323 * not counted for majority votes or thresholds.
187 */ 324 */
188 uint32_t *shuffle_inv; 325 int *peer_commited;
326};
327
328
329struct DiffElementInfo
330{
331 struct GNUNET_SET_Element *element;
189 332
190 /** 333 /**
191 * Current round of the exponential scheme. 334 * Positive weight for 'add', negative
335 * weights for 'remove'.
192 */ 336 */
193 uint32_t exp_repetition; 337 int weight;
338};
339
340
341/**
342 * Weighted diff.
343 */
344struct DiffEntry
345{
346 struct DiffKey key;
347 struct GNUNET_CONTAINER_MultiHashMap *changes;
348};
349
194 350
351
352/**
353 * A consensus session consists of one local client and the remote authorities.
354 */
355struct ConsensusSession
356{
195 /** 357 /**
196 * Current sub-round of the exponential scheme. 358 * Consensus sessions are kept in a DLL.
197 */ 359 */
198 uint32_t exp_subround; 360 struct ConsensusSession *next;
199 361
200 /** 362 /**
201 * The partner for the current exp-round. 363 * Consensus sessions are kept in a DLL.
202 * The local peer will initiate the set reconciliation with the
203 * outgoing peer.
204 */ 364 */
205 struct ConsensusPeerInformation *partner_outgoing; 365 struct ConsensusSession *prev;
366
367 unsigned int num_client_insert_pending;
368
369 struct GNUNET_CONTAINER_MultiHashMap *setmap;
370 struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
371 struct GNUNET_CONTAINER_MultiHashMap *diffmap;
206 372
207 /** 373 /**
208 * The partner for the current exp-round 374 * Array of peers with length 'num_peers'.
209 * The incoming peer will initiate the set reconciliation with 375 */
210 * the incoming peer. 376 int *peers_ignored;
377
378 /*
379 * Mapping from (hashed) TaskKey to TaskEntry.
380 *
381 * We map the application_id for a round to the task that should be
382 * executed, so we don't have to go through all task whenever we get
383 * an incoming set op request.
211 */ 384 */
212 struct ConsensusPeerInformation *partner_incoming; 385 struct GNUNET_CONTAINER_MultiHashMap *taskmap;
386
387 struct Step *steps_head;
388 struct Step *steps_tail;
389
390 int conclude_started;
391
392 int conclude_done;
213 393
214 /** 394 /**
215 * The consensus set of this session. 395 * Global consensus identification, computed
216 */ 396 * from the session id and participating authorities.
217 struct GNUNET_SET_Handle *element_set; 397 */
398 struct GNUNET_HashCode global_id;
218 399
219 /** 400 /**
220 * Listener for requests from other peers. 401 * Client that inhabits the session
221 * Uses the session's global id as app id.
222 */ 402 */
223 struct GNUNET_SET_ListenHandle *set_listener; 403 struct GNUNET_SERVER_Client *client;
224};
225
226 404
227/**
228 * Information about a peer that is in a consensus session.
229 */
230struct ConsensusPeerInformation
231{
232 /** 405 /**
233 * Peer identitty of the peer in the consensus session 406 * Queued messages to the client.
234 */ 407 */
235 struct GNUNET_PeerIdentity peer_id; 408 struct GNUNET_MQ_Handle *client_mq;
236 409
237 /** 410 /**
238 * Back-reference to the consensus session, 411 * Time when the conclusion of the consensus should begin.
239 * to that ConsensusPeerInformation can be used as a closure
240 */ 412 */
241 struct ConsensusSession *session; 413 struct GNUNET_TIME_Absolute conclude_start;
242 414
243 /** 415 /**
244 * Have we finished the set operation for this (sub-)round? 416 * Timeout for all rounds together, single rounds will schedule a timeout task
417 * with a fraction of the conclude timeout.
418 * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
245 */ 419 */
246 int set_op_finished; 420 struct GNUNET_TIME_Absolute conclude_deadline;
421
422 struct GNUNET_PeerIdentity *peers;
247 423
248 /** 424 /**
249 * Set operation we are currently executing with this peer. 425 * Number of other peers in the consensus.
250 */ 426 */
251 struct GNUNET_SET_OperationHandle *set_op; 427 unsigned int num_peers;
252 428
253 /** 429 /**
254 * Set operation we are planning on executing with this peer. 430 * Index of the local peer in the peers array
255 */ 431 */
256 struct GNUNET_SET_OperationHandle *delayed_set_op; 432 unsigned int local_peer_idx;
257 433
258 /** 434 /**
259 * Info about the round of the delayed set operation. 435 * Listener for requests from other peers.
436 * Uses the session's global id as app id.
260 */ 437 */
261 struct RoundInfo delayed_round_info; 438 struct GNUNET_SET_ListenHandle *set_listener;
262}; 439};
263 440
264
265/** 441/**
266 * Linked list of sessions this peer participates in. 442 * Linked list of sessions this peer participates in.
267 */ 443 */
@@ -288,31 +464,123 @@ static struct GNUNET_SERVER_Handle *srv;
288static struct GNUNET_PeerIdentity my_peer; 464static struct GNUNET_PeerIdentity my_peer;
289 465
290 466
291/** 467static void
292 * Check if the current subround has finished. 468finish_task (struct TaskEntry *task);
293 * Must only be called when an exp-round is the current round. 469
294 * 470static void
295 * @param session session to check for exp-round completion 471run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task);
296 * @return GNUNET_YES if the subround has finished, 472
297 * GNUNET_NO if not 473static void
298 */ 474run_task_eval_rfn (struct ConsensusSession *session, struct TaskEntry *task);
299static int 475
300have_exp_subround_finished (const struct ConsensusSession *session) 476static void
301{ 477run_task_apply_diff (struct ConsensusSession *session, struct TaskEntry *task);
302 int not_finished; 478
303 479static void
304 GNUNET_assert (CONSENSUS_ROUND_EXCHANGE == session->current_round); 480run_ready_steps (struct ConsensusSession *session);
305 481
306 not_finished = 0; 482static const char *
307 if ( (NULL != session->partner_outgoing) && 483phasename (uint16_t phase)
308 (GNUNET_NO == session->partner_outgoing->set_op_finished) ) 484{
309 not_finished++; 485 switch (phase)
310 if ( (NULL != session->partner_incoming) && 486 {
311 (GNUNET_NO == session->partner_incoming->set_op_finished) ) 487 case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
312 not_finished++; 488 case PHASE_KIND_FINISH: return "FINISH";
313 if (0 == not_finished) 489 case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
314 return GNUNET_YES; 490 case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
315 return GNUNET_NO; 491 case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
492 case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
493 case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
494 case PHASE_KIND_GRADECAST_APPLY_RESULT: return "GRADECAST_APPLY_RESULT";
495 default: return "(unknown)";
496 }
497}
498
499
500static const char *
501setname (uint16_t kind)
502{
503 switch (kind)
504 {
505 case SET_KIND_CURRENT: return "CURRENT";
506 case SET_KIND_LEADER: return "LEADER";
507 case SET_KIND_NONE: return "NONE";
508 default: return "(unknown)";
509 }
510}
511
512static const char *
513rfnname (uint16_t kind)
514{
515 switch (kind)
516 {
517 case RFN_KIND_NONE: return "NONE";
518 case RFN_KIND_ECHO: return "ECHO";
519 case RFN_KIND_CONFIRM: return "CONFIRM";
520 default: return "(unknown)";
521 }
522}
523
524static const char *
525diffname (uint16_t kind)
526{
527 switch (kind)
528 {
529 case DIFF_KIND_NONE: return "NONE";
530 case DIFF_KIND_LEADER: return "LEADER";
531 case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
532 default: return "(unknown)";
533 }
534}
535
536static const char *
537debug_str_task_key (struct TaskKey *tk)
538{
539 static char buf[256];
540
541 snprintf (buf, sizeof (buf),
542 "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
543 phasename (tk->kind), tk->peer1, tk->peer2,
544 tk->leader, tk->repetition);
545
546 return buf;
547}
548
549static const char *
550debug_str_diff_key (struct DiffKey *dk)
551{
552 static char buf[256];
553
554 snprintf (buf, sizeof (buf),
555 "DiffKey kind=%s, k1=%d, k2=%d",
556 diffname (dk->diff_kind), dk->k1, dk->k2);
557
558 return buf;
559}
560
561static const char *
562debug_str_set_key (struct SetKey *sk)
563{
564 static char buf[256];
565
566 snprintf (buf, sizeof (buf),
567 "SetKey kind=%s, k1=%d, k2=%d",
568 setname (sk->set_kind), sk->k1, sk->k2);
569
570 return buf;
571}
572
573
574static const char *
575debug_str_rfn_key (struct RfnKey *rk)
576{
577 static char buf[256];
578
579 snprintf (buf, sizeof (buf),
580 "RfnKey kind=%s, k1=%d, k2=%d",
581 rfnname (rk->rfn_kind), rk->k1, rk->k2);
582
583 return buf;
316} 584}
317 585
318 586
@@ -325,11 +593,6 @@ static void
325destroy_session (struct ConsensusSession *session) 593destroy_session (struct ConsensusSession *session)
326{ 594{
327 GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session); 595 GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
328 if (NULL != session->element_set)
329 {
330 GNUNET_SET_destroy (session->element_set);
331 session->element_set = NULL;
332 }
333 if (NULL != session->set_listener) 596 if (NULL != session->set_listener)
334 { 597 {
335 GNUNET_SET_listen_cancel (session->set_listener); 598 GNUNET_SET_listen_cancel (session->set_listener);
@@ -345,38 +608,13 @@ destroy_session (struct ConsensusSession *session)
345 GNUNET_SERVER_client_disconnect (session->client); 608 GNUNET_SERVER_client_disconnect (session->client);
346 session->client = NULL; 609 session->client = NULL;
347 } 610 }
348 if (NULL != session->shuffle)
349 {
350 GNUNET_free (session->shuffle);
351 session->shuffle = NULL;
352 }
353 if (NULL != session->shuffle_inv)
354 {
355 GNUNET_free (session->shuffle_inv);
356 session->shuffle_inv = NULL;
357 }
358 if (NULL != session->info)
359 {
360 int i;
361 for (i = 0; i < session->num_peers; i++)
362 {
363 struct ConsensusPeerInformation *cpi;
364 cpi = &session->info[i];
365 if (NULL != cpi->set_op)
366 {
367 GNUNET_SET_operation_cancel (cpi->set_op);
368 cpi->set_op = NULL;
369 }
370 }
371 GNUNET_free (session->info);
372 session->info = NULL;
373 }
374 GNUNET_free (session); 611 GNUNET_free (session);
375} 612}
376 613
377 614
378/** 615/**
379 * Iterator for set elements. [FIXME: bad comment] 616 * Send the final result set of the consensus to the client, element by
617 * element.
380 * 618 *
381 * @param cls closure 619 * @param cls closure
382 * @param element the current element, NULL if all elements have been 620 * @param element the current element, NULL if all elements have been
@@ -387,7 +625,8 @@ static int
387send_to_client_iter (void *cls, 625send_to_client_iter (void *cls,
388 const struct GNUNET_SET_Element *element) 626 const struct GNUNET_SET_Element *element)
389{ 627{
390 struct ConsensusSession *session = cls; 628 struct TaskEntry *task = (struct TaskEntry *) cls;
629 struct ConsensusSession *session = task->step->session;
391 struct GNUNET_MQ_Envelope *ev; 630 struct GNUNET_MQ_Envelope *ev;
392 631
393 if (NULL != element) 632 if (NULL != element)
@@ -417,185 +656,109 @@ send_to_client_iter (void *cls,
417 656
418 657
419/** 658/**
420 * Start the next round. 659 * Callback for set operation results. Called for each element
421 * This function can be invoked as a timeout task, or called manually (tc will be NULL then). 660 * in the result set.
422 * 661 *
423 * @param cls the session 662 * @param cls closure
424 * @param tc task context, for when this task is invoked by the scheduler, 663 * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
425 * NULL if invoked for another reason 664 * @param status see enum GNUNET_SET_Status
426 */ 665 */
427static void 666static void
428round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 667set_result_cb_loop (void *cls,
668 const struct GNUNET_SET_Element *element,
669 enum GNUNET_SET_Status status)
429{ 670{
430 struct ConsensusSession *session; 671 /* Nothing to do here.
431 unsigned int i; 672 This is the callback for looped local set operations, everything is
432 int res; 673 handled by the first callback */
674
675 struct TaskEntry *task = cls;
676 struct ConsensusSession *session = task->step->session;
677
678 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
679 "P%u: skipping looped set result for {%s}, status %u\n",
680 session->local_peer_idx,
681 debug_str_task_key (&task->key),
682 status);
683}
433 684
434 /* don't kick off next round if we're shutting down */
435 if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
436 return;
437 685
438 session = cls; 686static struct SetEntry *
439 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: round over\n", session->local_peer_idx); 687lookup_set (struct ConsensusSession *session, struct SetKey *key)
688{
689 struct GNUNET_HashCode hash;
440 690
441 if (tc != NULL) 691 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
442 session->round_timeout_tid = NULL; 692 "P%u: looking up set {%s}\n",
693 session->local_peer_idx,
694 debug_str_set_key (key));
443 695
444 if (session->round_timeout_tid != NULL) 696 GNUNET_assert (SET_KIND_NONE != key->set_kind);
445 { 697 GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash);
446 GNUNET_SCHEDULER_cancel (session->round_timeout_tid); 698 return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
447 session->round_timeout_tid = NULL; 699}
448 }
449 700
450 for (i = 0; i < session->num_peers; i++)
451 {
452 if (NULL != session->info[i].set_op)
453 {
454 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: canceling stray op with P%d\n",
455 session->local_peer_idx, i);
456 GNUNET_SET_operation_cancel (session->info[i].set_op);
457 session->info[i].set_op = NULL;
458 }
459 /* we're in the new round, nothing finished yet */
460 session->info[i].set_op_finished = GNUNET_NO;
461 }
462 701
463 switch (session->current_round) 702static struct DiffEntry *
464 { 703lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
465 case CONSENSUS_ROUND_BEGIN: 704{
466 session->current_round = CONSENSUS_ROUND_EXCHANGE; 705 struct GNUNET_HashCode hash;
467 session->exp_repetition = 0; 706
468 subround_over (session, NULL); 707 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
469 break; 708 "P%u: looking up diff {%s}\n",
470 case CONSENSUS_ROUND_EXCHANGE: 709 session->local_peer_idx,
471 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: finished, sending elements to client\n", 710 debug_str_diff_key (key));
472 session->local_peer_idx); 711
473 session->current_round = CONSENSUS_ROUND_FINISH; 712 GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
474 res = GNUNET_SET_iterate (session->element_set, send_to_client_iter, session); 713 GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash);
475 if (GNUNET_SYSERR == res) 714 return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
476 {
477 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "can't iterate set: set invalid\n");
478 }
479 else if (GNUNET_NO == res)
480 {
481 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "can't iterate set: iterator already active\n");
482 }
483 break;
484 default:
485 GNUNET_assert (0);
486 }
487} 715}
488 716
489 717
490/** 718static struct ReferendumEntry *
491 * Create a new permutation for the session's peers in session->shuffle. 719lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
492 * Uses a Fisher-Yates shuffle with pseudo-randomness coming from
493 * both the global session id and the current round index.
494 *
495 * @param session the session to create the new permutation for
496 */
497static void
498shuffle (struct ConsensusSession *session)
499{ 720{
500 uint32_t i; 721 struct GNUNET_HashCode hash;
501 uint32_t randomness[session->num_peers-1];
502
503 if (NULL == session->shuffle)
504 session->shuffle = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle));
505 if (NULL == session->shuffle_inv)
506 session->shuffle_inv = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle_inv));
507 722
508 GNUNET_CRYPTO_kdf (randomness, sizeof (randomness), 723 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
509 &session->exp_repetition, sizeof (uint32_t), 724 "P%u: looking up rfn {%s}\n",
510 &session->global_id, sizeof (struct GNUNET_HashCode), 725 session->local_peer_idx,
511 NULL); 726 debug_str_rfn_key (key));
512 727
513 for (i = 0; i < session->num_peers; i++) 728 GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
514 session->shuffle[i] = i; 729 GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash);
730 return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
731}
515 732
516 for (i = session->num_peers - 1; i > 0; i--)
517 {
518 uint32_t x;
519 uint32_t tmp;
520 x = randomness[i-1] % session->num_peers;
521 tmp = session->shuffle[x];
522 session->shuffle[x] = session->shuffle[i];
523 session->shuffle[i] = tmp;
524 }
525 733
526 /* create the inverse */ 734static void
527 for (i = 0; i < session->num_peers; i++) 735diff_insert (struct DiffEntry *diff,
528 session->shuffle_inv[session->shuffle[i]] = i; 736 int weight,
737 const struct GNUNET_SET_Element *element)
738{
739 GNUNET_assert (0);
529} 740}
530 741
531 742
532/**
533 * Find and set the partner_incoming and partner_outgoing of our peer,
534 * one of them may not exist (and thus set to NULL) if the number of peers
535 * in the session is not a power of two.
536 *
537 * @param session the consensus session
538 */
539static void 743static void
540find_partners (struct ConsensusSession *session) 744rfn_vote (struct ReferendumEntry *rfn,
541{ 745 uint16_t voting_peer,
542 unsigned int arc; 746 uint16_t num_peers,
543 unsigned int num_ghosts; 747 int vote,
544 unsigned int largest_arc; 748 const struct GNUNET_SET_Element *element)
545 int partner_idx; 749{
546 750 GNUNET_assert (voting_peer < num_peers);
547 /* shuffled local index */ 751 GNUNET_assert (0);
548 int my_idx = session->shuffle[session->local_peer_idx];
549
550 /* distance to neighboring peer in current subround */
551 arc = 1 << session->exp_subround;
552 largest_arc = 1;
553 while (largest_arc < session->num_peers)
554 largest_arc <<= 1;
555 num_ghosts = largest_arc - session->num_peers;
556 // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "largest arc: %u\n", largest_arc);
557 // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "arc: %u\n", arc);
558 // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "num ghosts: %u\n", num_ghosts);
559
560 if (0 == (my_idx & arc))
561 {
562 /* we are outgoing */
563 partner_idx = (my_idx + arc) % session->num_peers;
564 session->partner_outgoing = &session->info[session->shuffle_inv[partner_idx]];
565 GNUNET_assert (GNUNET_NO == session->partner_outgoing->set_op_finished);
566 /* are we a 'ghost' of a peer that would exist if
567 * the number of peers was a power of two, and thus have to partner
568 * with an additional peer?
569 */
570 if (my_idx < num_ghosts)
571 {
572 int ghost_partner_idx;
573 // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "my index %d, arc %d, peers %u\n", my_idx, arc, session->num_peers);
574 ghost_partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
575 /* platform dependent; modulo sometimes returns negative values */
576 if (ghost_partner_idx < 0)
577 ghost_partner_idx += session->num_peers;
578 /* we only need to have a ghost partner if the partner is outgoing */
579 if (0 == (ghost_partner_idx & arc))
580 {
581 // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ghost partner is %d\n", ghost_partner_idx);
582 session->partner_incoming = &session->info[session->shuffle_inv[ghost_partner_idx]];
583 GNUNET_assert (GNUNET_NO == session->partner_incoming->set_op_finished);
584 return;
585 }
586 }
587 session->partner_incoming = NULL;
588 return;
589 }
590 /* we only have an incoming connection */
591 partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
592 if (partner_idx < 0)
593 partner_idx += session->num_peers;
594 session->partner_outgoing = NULL;
595 session->partner_incoming = &session->info[session->shuffle_inv[partner_idx]];
596 GNUNET_assert (GNUNET_NO == session->partner_incoming->set_op_finished);
597} 752}
598 753
754uint16_t
755task_other_peer (struct TaskEntry *task)
756{
757 uint16_t me = task->step->session->local_peer_idx;
758 if (task->key.peer1 == me)
759 return task->key.peer2;
760 return task->key.peer1;
761}
599 762
600/** 763/**
601 * Callback for set operation results. Called for each element 764 * Callback for set operation results. Called for each element
@@ -610,267 +773,775 @@ set_result_cb (void *cls,
610 const struct GNUNET_SET_Element *element, 773 const struct GNUNET_SET_Element *element,
611 enum GNUNET_SET_Status status) 774 enum GNUNET_SET_Status status)
612{ 775{
613 struct ConsensusPeerInformation *cpi = cls; 776 struct TaskEntry *task = cls;
614 unsigned int remote_idx = cpi - cpi->session->info; 777 struct ConsensusSession *session = task->step->session;
615 unsigned int local_idx = cpi->session->local_peer_idx; 778 struct SetEntry *output_set = NULL;
779 struct DiffEntry *output_diff = NULL;
780 struct ReferendumEntry *output_rfn = NULL;
781 unsigned int other_idx;
782
783 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
784 "P%u: got set result for {%s}, status %u\n",
785 session->local_peer_idx,
786 debug_str_task_key (&task->key),
787 status);
788
789 if (GNUNET_NO == task->is_running)
790 {
791 GNUNET_break_op (0);
792 return;
793 }
616 794
617 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u with status %u\n", 795 if (GNUNET_YES == task->is_finished)
618 local_idx, remote_idx, (unsigned int) status); 796 {
797 GNUNET_break_op (0);
798 return;
799 }
800
801 if (task->key.peer1 == session->local_peer_idx)
802 other_idx = task->key.peer2;
803 else if (task->key.peer2 == session->local_peer_idx)
804 other_idx = task->key.peer1;
805 else
806 {
807 /* error in task graph construction */
808 GNUNET_assert (0);
809 }
619 810
620 GNUNET_assert ((cpi == cpi->session->partner_outgoing) || 811 if (SET_KIND_NONE != task->output_set.set_kind)
621 (cpi == cpi->session->partner_incoming)); 812 output_set = lookup_set (session, &task->output_set);
813
814 if (DIFF_KIND_NONE != task->output_diff.diff_kind)
815 output_diff = lookup_diff (session, &task->output_diff);
816
817 if (RFN_KIND_NONE != task->output_rfn.rfn_kind)
818 output_rfn = lookup_rfn (session, &task->output_rfn);
819
820 if (GNUNET_YES == session->peers_ignored[other_idx])
821 {
822 /* We should have never started or commited to an operation
823 with an ignored peer. */
824 GNUNET_break (0);
825 return;
826 }
622 827
623 switch (status) 828 switch (status)
624 { 829 {
830 // case GNUNET_SET_STATUS_MISSING_LOCAL:
625 case GNUNET_SET_STATUS_OK: 831 case GNUNET_SET_STATUS_OK:
626 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: element\n", 832 if (NULL != output_set)
627 local_idx, remote_idx);
628 break;
629 case GNUNET_SET_STATUS_FAILURE:
630 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: failure\n",
631 local_idx, remote_idx);
632 cpi->set_op = NULL;
633 return;
634 case GNUNET_SET_STATUS_HALF_DONE:
635 case GNUNET_SET_STATUS_DONE:
636 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: done\n",
637 local_idx, remote_idx);
638 cpi->set_op_finished = GNUNET_YES;
639 cpi->set_op = NULL;
640 if (have_exp_subround_finished (cpi->session) == GNUNET_YES)
641 { 833 {
642 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: all reconciliations of subround done\n", 834 // FIXME: record pending adds, use callback
643 local_idx); 835 GNUNET_SET_add_element (output_set->h,
644 subround_over (cpi->session, NULL); 836 element,
837 NULL,
838 NULL);
839
645 } 840 }
646 else 841 if (NULL != output_diff)
647 { 842 {
648 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting for further set results\n", 843 diff_insert (output_diff, 1, element);
649 local_idx);
650 } 844 }
651 return; 845 if (NULL != output_rfn)
652 default: 846 {
653 GNUNET_break (0); 847 rfn_vote (output_rfn, task_other_peer (task), session->num_peers, VOTE_ADD, element);
654 return; 848 }
655 } 849 // XXX: add result to structures in task
656
657 switch (cpi->session->current_round)
658 {
659 case CONSENSUS_ROUND_COMPLETION:
660 case CONSENSUS_ROUND_EXCHANGE:
661 GNUNET_SET_add_element (cpi->session->element_set, element, NULL, NULL);
662 break; 850 break;
663 default: 851 //case GNUNET_SET_STATUS_MISSING_REMOTE:
852 // // XXX: add result to structures in task
853 // break;
854 case GNUNET_SET_STATUS_DONE:
855 // XXX: check first if any changes to the underlying
856 // set are still pending
857 // XXX: commit other peer in referendum
858 finish_task (task);
859 break;
860 case GNUNET_SET_STATUS_FAILURE:
861 // XXX: cleanup
664 GNUNET_break (0); 862 GNUNET_break (0);
665 return; 863 return;
864 default:
865 /* not reached */
866 GNUNET_assert (0);
666 } 867 }
667} 868}
668 869
669 870
871
670/** 872/**
671 * Compare the round the session is in with the round of the given context message. 873 * Commit the appropriate set for a
672 * 874 * task.
673 * @param session a consensus session
674 * @param ri a round context message
675 * @return 0 if it's the same round, -1 if the session is in an earlier round,
676 * 1 if the session is in a later round
677 */ 875 */
678static int 876static void
679rounds_compare (struct ConsensusSession *session, 877commit_set (struct ConsensusSession *session,
680 struct RoundInfo* ri) 878 struct TaskEntry *task)
681{ 879{
682 if (session->current_round < ri->round) 880 struct SetEntry *set;
683 return -1; 881
684 if (session->current_round > ri->round) 882 GNUNET_assert (NULL != task->op);
685 return 1; 883 set = lookup_set (session, &task->input_set);
686 if (session->current_round == CONSENSUS_ROUND_EXCHANGE) 884 GNUNET_assert (NULL != set);
687 { 885 GNUNET_SET_commit (task->op, set->h);
688 if (session->exp_repetition < ri->exp_repetition)
689 return -1;
690 if (session->exp_repetition > ri->exp_repetition)
691 return 1;
692 if (session->exp_subround < ri->exp_subround)
693 return -1;
694 if (session->exp_subround > ri->exp_subround)
695 return 1;
696 return 0;
697 }
698 /* other rounds have no subrounds / repetitions to compare */
699 return 0;
700} 886}
701 887
702 888
703/**
704 * Do the next subround in the exp-scheme.
705 * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
706 *
707 * @param cls the session
708 * @param tc task context, for when this task is invoked by the scheduler,
709 * NULL if invoked for another reason
710 */
711static void 889static void
712subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 890put_diff (struct ConsensusSession *session,
891 struct DiffEntry *diff)
713{ 892{
714 struct ConsensusSession *session; 893 struct GNUNET_HashCode hash;
715 struct GNUNET_TIME_Relative subround_timeout;
716 int i;
717 894
718 /* don't kick off next subround if we're shutting down */ 895 GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
719 if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) 896 GNUNET_assert (GNUNET_OK ==
720 return; 897 GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
898 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
899}
900
901static void
902put_set (struct ConsensusSession *session,
903 struct SetEntry *set)
904{
905 struct GNUNET_HashCode hash;
721 906
722 session = cls; 907 GNUNET_assert (NULL != set->h);
908
909 GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
910 GNUNET_assert (GNUNET_OK ==
911 GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
912 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
913}
914
915
916static void
917put_rfn (struct ConsensusSession *session,
918 struct ReferendumEntry *rfn)
919{
920 struct GNUNET_HashCode hash;
921
922 GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
923 GNUNET_assert (GNUNET_OK ==
924 GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
925 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
926}
927
928
929
930static void
931output_cloned_cb (void *cls, struct GNUNET_SET_Handle *copy)
932{
933 struct TaskEntry *task = (struct TaskEntry *) cls;
934 struct ConsensusSession *session = task->step->session;
935 struct SetEntry *set = GNUNET_new (struct SetEntry);
936
937 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
938 "P%u: Received lazy copy, storing output set %s\n",
939 session->local_peer_idx, debug_str_set_key (&task->output_set));
940
941 set->key = task->output_set;
942 set->h = copy;
943 put_set (task->step->session, set);
944 run_task_remote_union (task->step->session, task);
945}
946
947
948static void
949run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task)
950{
951 struct SetEntry *input;
723 952
724 GNUNET_assert (CONSENSUS_ROUND_EXCHANGE == session->current_round); 953 input = lookup_set (session, &task->input_set);
954 GNUNET_assert (NULL != input);
955 GNUNET_assert (NULL != input->h);
725 956
726 if (tc != NULL) 957 /* We create the outputs for the operation here
958 (rather than in the set operation callback)
959 because we want something valid in there, even
960 if the other peer doesn't talk to us */
961
962 if (SET_KIND_NONE != task->output_set.set_kind)
727 { 963 {
728 session->round_timeout_tid = NULL; 964 /* If we don't have an existing output set,
729 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "P%u: consensus subround timed out\n", 965 we clone the input set. */
730 session->local_peer_idx); 966 if (NULL == lookup_set (session, &task->output_set))
967 {
968 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
969 "Output set missing, copying from input set\n");
970 /* Since the cloning is asynchronous,
971 we'll retry the current function once the copy
972 has been provided by the SET service. */
973 GNUNET_SET_copy_lazy (input->h, output_cloned_cb, task);
974 return;
975 }
731 } 976 }
732 977
733 /* cancel timeout */ 978 if (RFN_KIND_NONE != task->output_rfn.rfn_kind)
734 if (session->round_timeout_tid != NULL)
735 { 979 {
736 GNUNET_SCHEDULER_cancel (session->round_timeout_tid); 980 if (NULL == lookup_rfn (session, &task->output_rfn))
737 session->round_timeout_tid = NULL; 981 {
982 struct ReferendumEntry *rfn;
983
984 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
985 "P%u: output rfn <%s> missing, creating.\n",
986 session->local_peer_idx,
987 debug_str_rfn_key (&task->output_rfn));
988
989 rfn = GNUNET_new (struct ReferendumEntry);
990 rfn->key = task->output_rfn;
991 rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
992 rfn->peer_commited = GNUNET_new_array (session->num_peers, int);
993 put_rfn (session, rfn);
994 }
738 } 995 }
739 996
740 for (i = 0; i < session->num_peers; i++) 997 if (task->key.peer1 == session->local_peer_idx)
741 { 998 {
742 if (NULL != session->info[i].set_op) 999 struct GNUNET_CONSENSUS_RoundContextMessage rcm = { 0 };
1000
1001 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1002 "P%u: Looking up set {%s} to run remote union\n",
1003 session->local_peer_idx,
1004 debug_str_set_key (&task->input_set));
1005
1006 rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
1007 rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
1008
1009 rcm.kind = htons (task->key.kind);
1010 rcm.peer1 = htons (task->key.peer1);
1011 rcm.peer2 = htons (task->key.peer2);
1012 rcm.leader = htons (task->key.leader);
1013 rcm.repetition = htons (task->key.repetition);
1014
1015 GNUNET_assert (NULL == task->op);
1016 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
1017 session->local_peer_idx, task->key.peer2, debug_str_set_key (&task->input_set));
1018
1019 // XXX: maybe this should be done while
1020 // setting up tasks alreays?
1021 task->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
1022 &session->global_id,
1023 &rcm.header,
1024 GNUNET_SET_RESULT_ADDED, /* XXX: will be obsolete soon */
1025 set_result_cb,
1026 task);
1027
1028 /* Referendums must be materialized as a set before */
1029 GNUNET_assert (RFN_KIND_NONE == task->input_rfn.rfn_kind);
1030
1031 if (GNUNET_OK != GNUNET_SET_commit (task->op, input->h))
743 { 1032 {
744 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: canceling stray op with P%d\n", 1033 GNUNET_break (0);
745 session->local_peer_idx, i); 1034 /* XXX: cleanup? */
746 GNUNET_SET_operation_cancel (session->info[i].set_op); 1035 return;
747 session->info[i].set_op = NULL;
748 } 1036 }
749 /* we're in the new round, nothing finished yet */
750 session->info[i].set_op_finished = GNUNET_NO;
751 } 1037 }
1038 else if (task->key.peer2 == session->local_peer_idx)
1039 {
1040 /* Wait for the other peer to contact us */
1041 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
1042 session->local_peer_idx, task->key.peer1);
752 1043
753 if (session->exp_repetition >= NUM_EXP_REPETITIONS) 1044 if (NULL != task->op)
1045 {
1046 GNUNET_assert (NULL == task->commited_set);
1047 commit_set (session, task);
1048 }
1049 }
1050 else
754 { 1051 {
755 round_over (session, NULL); 1052 /* We made an error while constructing the task graph. */
756 return; 1053 GNUNET_assert (0);
1054 }
1055}
1056
1057
1058static int
1059rfn_majority (uint16_t num_peers,
1060 struct ReferendumEntry *rfn,
1061 struct RfnElementInfo *ri,
1062 uint16_t threshold)
1063{
1064 unsigned int votes_add = 0;
1065 unsigned int votes_remove = 0;
1066 unsigned int num_commited = 0;
1067 unsigned int maj_thresh;
1068 unsigned int nv;
1069 unsigned int tv;
1070 unsigned int i;
1071
1072 for (i = 0; i < num_peers; i++)
1073 {
1074 if (GNUNET_NO == rfn->peer_commited[i])
1075 continue;
1076 num_commited++;
1077 if (ri->votes[i] == VOTE_ADD)
1078 votes_add++;
1079 if (ri->votes[i] == VOTE_REMOVE)
1080 votes_remove++;
757 } 1081 }
758 1082
759 if (session->exp_repetition == 0) 1083 /* Threshold to reach a majority among
1084 submitted votes, may not be enough for the
1085 global threshold. */
1086 maj_thresh = (num_commited + 1) / 2;
1087 /* Vote are relative to our local set, so it can only be
1088 either all add or all remove */
1089 GNUNET_assert ( (0 == votes_add) || (0 == votes_remove) );
1090
1091 if (votes_add > 0)
760 { 1092 {
761 /* initialize everything for the log-rounds */ 1093 nv = votes_add;
762 session->exp_repetition = 1; 1094 tv = VOTE_ADD;
763 session->exp_subround = 0;
764 if (NULL == session->shuffle)
765 session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers);
766 if (NULL == session->shuffle_inv)
767 session->shuffle_inv = GNUNET_malloc ((sizeof (int)) * session->num_peers);
768 for (i = 0; i < session->num_peers; i++)
769 session->shuffle[i] = session->shuffle_inv[i] = i;
770 } 1095 }
771 else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers))) 1096 else if (votes_remove > 0)
772 { 1097 {
773 /* subrounds done, start new log-round */ 1098 nv = votes_remove;
774 session->exp_repetition++; 1099 tv = VOTE_REMOVE;
775 session->exp_subround = 0;
776 shuffle (session);
777 } 1100 }
778 else 1101 else
779 { 1102 {
780 session->exp_subround++; 1103 nv = 0;
1104 tv = VOTE_NONE;
781 } 1105 }
782 1106
783 subround_timeout = 1107 if ( (nv >= maj_thresh) && (nv >= threshold) )
784 GNUNET_TIME_relative_divide (GNUNET_TIME_absolute_get_difference (session->conclude_start, session->conclude_deadline), 1108 return tv;
785 2 * NUM_EXP_REPETITIONS * ((int) ceil (log2 (session->num_peers))));
786 1109
787 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "subround timeout: %u ms\n", subround_timeout.rel_value_us / 1000); 1110 if ( ((num_commited - nv) >= maj_thresh) && ((num_commited - nv) >= threshold) )
1111 return VOTE_NONE;
788 1112
789 session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (subround_timeout, subround_over, session); 1113 return VOTE_CONTESTED;
1114}
790 1115
791 /* determine the incoming and outgoing partner */
792 find_partners (session);
793 1116
794 GNUNET_assert (session->partner_outgoing != &session->info[session->local_peer_idx]); 1117struct SetChangeProgressCls
795 GNUNET_assert (session->partner_incoming != &session->info[session->local_peer_idx]); 1118{
1119 int num_pending;
1120 struct TaskEntry *task;
1121};
1122
1123
1124static void
1125eval_rfn_done (struct TaskEntry *task)
1126{
1127 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1128 "P%u: EVAL_REFERENDUM done for task {%s}\n",
1129 task->step->session->local_peer_idx, debug_str_task_key (&task->key));
1130
1131 finish_task (task);
1132}
796 1133
797 /* initiate set operation with the outgoing partner */ 1134
798 if (NULL != session->partner_outgoing) 1135static void
1136eval_rfn_progress (void *cls)
1137{
1138 struct SetChangeProgressCls *erc = cls;
1139
1140 GNUNET_assert (erc->num_pending > 0);
1141
1142 erc->num_pending--;
1143
1144 if (0 == erc->num_pending)
799 { 1145 {
800 struct GNUNET_CONSENSUS_RoundContextMessage *msg; 1146 struct TaskEntry *task = erc->task;
801 msg = GNUNET_new (struct GNUNET_CONSENSUS_RoundContextMessage); 1147 GNUNET_free (erc);
802 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT); 1148 eval_rfn_done (task);
803 msg->header.size = htons (sizeof *msg); 1149 }
804 msg->round = htonl (session->current_round); 1150}
805 msg->exp_repetition = htonl (session->exp_repetition); 1151
806 msg->exp_subround = htonl (session->exp_subround); 1152
1153static void
1154eval_rfn_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1155{
1156 struct TaskEntry *task = (struct TaskEntry *) cls;
1157 struct ConsensusSession *session = task->step->session;
1158 struct SetEntry *set;
807 1159
808 if (NULL != session->partner_outgoing->set_op) 1160 set = GNUNET_new (struct SetEntry);
1161 set->h = copy;
1162 set->key = task->output_set;
1163
1164 put_set (session, set);
1165
1166 run_task_eval_rfn (session, task);
1167}
1168
1169
1170/**
1171 * Take an input set and an input referendum,
1172 * apply the referendum with a threshold to the input
1173 * set and store the result in the output set and/or output diff.
1174 */
1175static void
1176run_task_eval_rfn (struct ConsensusSession *session, struct TaskEntry *task)
1177{
1178 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1179 struct ReferendumEntry *input_rfn;
1180 struct RfnElementInfo *ri;
1181 struct SetEntry *output_set = NULL;
1182 struct DiffEntry *output_diff = NULL;
1183 struct SetChangeProgressCls *progress_cls;
1184
1185 /* Have at least one output */
1186 GNUNET_assert ( (task->output_set.set_kind != SET_KIND_NONE) ||
1187 (task->output_diff.diff_kind != DIFF_KIND_NONE));
1188
1189 /* Not allowed as output */
1190 GNUNET_assert ( (task->output_rfn.rfn_kind == RFN_KIND_NONE));
1191
1192 if (SET_KIND_NONE != task->output_set.set_kind)
1193 {
1194 /* We have a set output, thus the output set must
1195 exist or copy it from the input set */
1196 output_set = lookup_set (session, &task->output_set);
1197 if (NULL == output_set)
809 { 1198 {
810 GNUNET_break (0); 1199 struct SetEntry *input_set;
811 GNUNET_SET_operation_cancel (session->partner_outgoing->set_op); 1200
1201 input_set = lookup_set (session, &task->input_set);
1202 GNUNET_assert (NULL != input_set);
1203 GNUNET_SET_copy_lazy (input_set->h,
1204 eval_rfn_copy_cb,
1205 task);
1206 /* We'll be called again, this time with the
1207 set ready. */
1208 return;
812 } 1209 }
813 session->partner_outgoing->set_op = 1210 }
814 GNUNET_SET_prepare (&session->partner_outgoing->peer_id, 1211
815 &session->global_id, 1212 if (DIFF_KIND_NONE != task->output_diff.diff_kind)
816 (struct GNUNET_MessageHeader *) msg, 1213 {
817 GNUNET_SET_RESULT_ADDED, 1214 output_diff = lookup_diff (session, &task->output_diff);
818 set_result_cb, session->partner_outgoing); 1215 if (NULL == output_diff)
819 GNUNET_free (msg);
820 if (GNUNET_OK != GNUNET_SET_commit (session->partner_outgoing->set_op, session->element_set))
821 { 1216 {
822 GNUNET_break (0); 1217 output_diff = GNUNET_new (struct DiffEntry);
823 session->partner_outgoing->set_op = NULL; 1218 output_diff->key = task->output_diff;
824 session->partner_outgoing->set_op_finished = GNUNET_YES; 1219 output_diff->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1220 put_diff (session, output_diff);
825 } 1221 }
826 } 1222 }
827 1223
828 /* commit to the delayed set operation */ 1224 progress_cls = GNUNET_new (struct SetChangeProgressCls);
829 if ((NULL != session->partner_incoming) && (NULL != session->partner_incoming->delayed_set_op)) 1225
830 { 1226 input_rfn = lookup_rfn (session, &task->input_rfn);
831 int cmp = rounds_compare (session, &session->partner_incoming->delayed_round_info); 1227
1228 GNUNET_assert (NULL != input_rfn);
832 1229
833 if (NULL != session->partner_incoming->set_op) 1230 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
1231 GNUNET_assert (NULL != iter);
1232
1233 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
1234 {
1235 int majority_vote = rfn_majority (session->num_peers, input_rfn, ri, task->threshold);
1236 switch (majority_vote)
834 { 1237 {
835 GNUNET_break (0); 1238 case VOTE_ADD:
836 GNUNET_SET_operation_cancel (session->partner_incoming->set_op); 1239 if (NULL != output_set)
837 session->partner_incoming->set_op = NULL; 1240 {
1241 progress_cls->num_pending++;
1242 GNUNET_assert (GNUNET_OK ==
1243 GNUNET_SET_add_element (output_set->h,
1244 ri->element,
1245 eval_rfn_progress,
1246 progress_cls));
1247 }
1248 if (NULL != output_diff)
1249 {
1250 diff_insert (output_diff, 1, ri->element);
1251 }
1252 break;
1253 case VOTE_CONTESTED:
1254 if (NULL != output_set)
1255 output_set->is_contested = GNUNET_YES;
1256 /* fallthrough */
1257 case VOTE_REMOVE:
1258 if (NULL != output_set)
1259 {
1260 progress_cls->num_pending++;
1261 GNUNET_assert (GNUNET_OK ==
1262 GNUNET_SET_remove_element (output_set->h,
1263 ri->element,
1264 eval_rfn_progress,
1265 progress_cls));
1266 }
1267 if (NULL != output_diff)
1268 {
1269 diff_insert (output_diff, -1, ri->element);
1270 }
1271 break;
1272 case VOTE_NONE:
1273 /* Nothing to do. */
1274 break;
1275 default:
1276 /* not reached */
1277 GNUNET_assert (0);
838 } 1278 }
839 if (cmp == 0) 1279 }
1280 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1281
1282 if (progress_cls->num_pending == 0)
1283 {
1284 // call closure right now, no pending ops
1285 GNUNET_free (progress_cls);
1286 eval_rfn_done (task);
1287 }
1288}
1289
1290
1291static void
1292apply_diff_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1293{
1294 struct TaskEntry *task = (struct TaskEntry *) cls;
1295 struct ConsensusSession *session = task->step->session;
1296 struct SetEntry *set;
1297
1298 set = GNUNET_new (struct SetEntry);
1299 set->h = copy;
1300 set->key = task->output_set;
1301
1302 put_set (session, set);
1303
1304 run_task_apply_diff (session, task);
1305}
1306
1307
1308static void
1309apply_diff_done (struct TaskEntry *task)
1310{
1311 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1312 "P%u: APPLY_DIFF done for task {%s}\n",
1313 task->step->session->local_peer_idx, debug_str_task_key (&task->key));
1314 finish_task (task);
1315}
1316
1317
1318static void
1319apply_diff_progress (void *cls)
1320{
1321 struct SetChangeProgressCls *erc = cls;
1322
1323 GNUNET_assert (erc->num_pending > 0);
1324
1325 erc->num_pending--;
1326
1327 if (0 == erc->num_pending)
1328 {
1329 struct TaskEntry *task = erc->task;
1330 GNUNET_free (erc);
1331 apply_diff_done (task);
1332 }
1333}
1334
1335
1336static void
1337run_task_apply_diff (struct ConsensusSession *session, struct TaskEntry *task)
1338{
1339 struct SetEntry *output_set;
1340 struct DiffEntry *input_diff;
1341 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1342 struct DiffElementInfo *di;
1343 struct SetChangeProgressCls *progress_cls;
1344
1345 GNUNET_assert (task->output_set.set_kind != SET_KIND_NONE);
1346 GNUNET_assert (task->input_diff.diff_kind != DIFF_KIND_NONE);
1347
1348 input_diff = lookup_diff (session, &task->input_diff);
1349
1350 GNUNET_assert (NULL != input_diff);
1351
1352 output_set = lookup_set (session, &task->output_set);
1353
1354 if (NULL == output_set)
1355 {
1356 struct SetEntry *input_set;
1357
1358 input_set = lookup_set (session, &task->input_set);
1359 GNUNET_assert (NULL != input_set);
1360 GNUNET_SET_copy_lazy (input_set->h,
1361 apply_diff_copy_cb,
1362 task);
1363 /* We'll be called again, this time with the
1364 set ready. */
1365 return;
1366 }
1367
1368 progress_cls = GNUNET_new (struct SetChangeProgressCls);
1369
1370 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_diff->changes);
1371
1372 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1373 {
1374 if (di->weight > 0)
840 { 1375 {
841 if (GNUNET_OK != GNUNET_SET_commit (session->partner_incoming->delayed_set_op, session->element_set)) 1376 progress_cls->num_pending++;
842 { 1377 GNUNET_assert (GNUNET_OK ==
843 GNUNET_break (0); 1378 GNUNET_SET_remove_element (output_set->h,
844 } 1379 di->element,
845 session->partner_incoming->set_op = session->partner_incoming->delayed_set_op; 1380 apply_diff_progress,
846 session->partner_incoming->delayed_set_op = NULL; 1381 progress_cls));
847 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d resumed delayed round with P%d\n",
848 session->local_peer_idx, (int) (session->partner_incoming - session->info));
849 } 1382 }
850 else 1383 else if (di->weight < 0)
851 { 1384 {
852 /* this should not happen -- a round has been skipped! */ 1385 progress_cls->num_pending++;
853 GNUNET_break_op (0); 1386 GNUNET_assert (GNUNET_OK ==
1387 GNUNET_SET_add_element (output_set->h,
1388 di->element,
1389 apply_diff_progress,
1390 progress_cls));
854 } 1391 }
855 } 1392 }
856 1393
1394 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1395
1396 if (progress_cls->num_pending == 0)
1397 {
1398 // call closure right now, no pending ops
1399 GNUNET_free (progress_cls);
1400 apply_diff_done (task);
1401 }
1402}
1403
1404
1405static void
1406run_task_finish (struct ConsensusSession *session, struct TaskEntry *task)
1407{
1408 struct SetEntry *final_set;
1409
1410 final_set = lookup_set (session, &task->input_set);
1411
1412 GNUNET_assert (NULL != final_set);
1413
1414
1415 GNUNET_SET_iterate (final_set->h,
1416 send_to_client_iter,
1417 task);
1418}
1419
1420static void
1421run_task (struct ConsensusSession *session, struct TaskEntry *task)
1422{
1423 GNUNET_assert (GNUNET_NO == task->is_running);
1424 GNUNET_assert (GNUNET_NO == task->is_finished);
1425
1426
1427 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
1428
1429 switch (task->action)
1430 {
1431 case ACTION_RECONCILE:
1432 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_RECONCILE task\n", session->local_peer_idx);
1433 run_task_remote_union (session, task);
1434 break;
1435 case ACTION_EVAL_RFN:
1436 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_EVAL_RFN task\n", session->local_peer_idx);
1437 run_task_eval_rfn (session, task);
1438 break;
1439 case ACTION_APPLY_DIFF:
1440 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_APPLY_DIFF task\n", session->local_peer_idx);
1441 run_task_apply_diff (session, task);
1442 break;
1443 case ACTION_FINISH:
1444 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_FINISH task\n", session->local_peer_idx);
1445 run_task_finish (session, task);
1446 break;
1447 default:
1448 /* not reached */
1449 GNUNET_assert (0);
1450 }
1451 task->is_running = GNUNET_YES;
1452}
1453
1454
1455static void finish_step (struct Step *step)
1456{
1457 unsigned int i;
1458
1459 GNUNET_assert (step->finished_tasks == step->tasks_len);
1460 GNUNET_assert (GNUNET_YES == step->is_running);
1461 GNUNET_assert (GNUNET_NO == step->is_finished);
1462
857#ifdef GNUNET_EXTRA_LOGGING 1463#ifdef GNUNET_EXTRA_LOGGING
1464 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1465 "All tasks of step `%s' with %u subordinates finished.\n",
1466 step->debug_name,
1467 step->subordinates_len);
1468#endif
1469
1470 for (i = 0; i < step->subordinates_len; i++)
858 { 1471 {
859 int in; 1472 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
860 int out; 1473 step->subordinates[i]->pending_prereq--;
861 if (session->partner_outgoing == NULL) 1474#ifdef GNUNET_EXTRA_LOGGING
862 out = -1; 1475 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
863 else 1476 "Decreased pending_prereq to %u for step `%s'.\n",
864 out = (int) (session->partner_outgoing - session->info); 1477 step->subordinates[i]->pending_prereq,
865 if (session->partner_incoming == NULL) 1478 step->subordinates[i]->debug_name);
866 in = -1; 1479
867 else 1480#endif
868 in = (int) (session->partner_incoming - session->info); 1481 }
869 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx, 1482
870 session->exp_repetition, session->exp_subround, in, out); 1483 step->is_finished = GNUNET_YES;
871 }
872#endif /* GNUNET_EXTRA_LOGGING */
873 1484
1485 // XXX: maybe schedule as task to avoid recursion?
1486 run_ready_steps (step->session);
1487}
1488
1489
1490/*
1491 * Run all steps of the session that don't any
1492 * more dependencies.
1493 */
1494static void
1495run_ready_steps (struct ConsensusSession *session)
1496{
1497 struct Step *step;
1498
1499 step = session->steps_head;
1500
1501 while (NULL != step)
1502 {
1503 if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) )
1504 {
1505 size_t i;
1506
1507 GNUNET_assert (0 == step->finished_tasks);
1508
1509#ifdef GNUNET_EXTRA_LOGGING
1510 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d:%d with %d tasks and %d subordinates\n",
1511 session->local_peer_idx,
1512 step->debug_name,
1513 step->start_round, step->num_rounds, step->tasks_len, step->subordinates_len);
1514#endif
1515
1516 step->is_running = GNUNET_YES;
1517 for (i = 0; i < step->tasks_len; i++)
1518 run_task (session, step->tasks[i]);
1519
1520 /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
1521 if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
1522 finish_step (step);
1523
1524 /* Running the next ready steps will be triggered by task completion */
1525 return;
1526 }
1527 step = step->next;
1528 }
1529
1530 return;
1531}
1532
1533
1534
1535static void
1536finish_task (struct TaskEntry *task)
1537{
1538 GNUNET_assert (GNUNET_NO == task->is_finished);
1539 task->is_finished = GNUNET_YES;
1540
1541 task->step->finished_tasks++;
1542
1543 if (task->step->finished_tasks == task->step->tasks_len)
1544 finish_step (task->step);
874} 1545}
875 1546
876 1547
@@ -886,7 +1557,7 @@ get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSess
886{ 1557{
887 int i; 1558 int i;
888 for (i = 0; i < session->num_peers; i++) 1559 for (i = 0; i < session->num_peers; i++)
889 if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer)) 1560 if (0 == memcmp (peer, &session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
890 return i; 1561 return i;
891 return -1; 1562 return -1;
892} 1563}
@@ -899,27 +1570,24 @@ get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSess
899 * exactly the same peers, the global id will be different. 1570 * exactly the same peers, the global id will be different.
900 * 1571 *
901 * @param session session to generate the global id for 1572 * @param session session to generate the global id for
902 * @param session_id local id of the consensus session 1573 * @param local_session_id local id of the consensus session
903 */ 1574 */
904static void 1575static void
905compute_global_id (struct ConsensusSession *session, 1576compute_global_id (struct ConsensusSession *session,
906 const struct GNUNET_HashCode *session_id) 1577 const struct GNUNET_HashCode *local_session_id)
907{ 1578{
908 int i; 1579 const char *salt = "gnunet-service-consensus/session_id";
909 struct GNUNET_HashCode tmp; 1580
910 struct GNUNET_HashCode phash; 1581 GNUNET_assert (GNUNET_YES ==
911 1582 GNUNET_CRYPTO_kdf (&session->global_id,
912 /* FIXME: use kdf? */ 1583 sizeof (struct GNUNET_HashCode),
913 1584 salt,
914 session->global_id = *session_id; 1585 strlen (salt),
915 for (i = 0; i < session->num_peers; ++i) 1586 session->peers,
916 { 1587 session->num_peers * sizeof (struct GNUNET_PeerIdentity),
917 GNUNET_CRYPTO_hash (&session->info[i].peer_id, sizeof (struct GNUNET_PeerIdentity), &phash); 1588 local_session_id,
918 GNUNET_CRYPTO_hash_xor (&session->global_id, &phash, &tmp); 1589 sizeof (struct GNUNET_HashCode),
919 session->global_id = tmp; 1590 NULL));
920 GNUNET_CRYPTO_hash (&session->global_id, sizeof (struct GNUNET_PeerIdentity), &tmp);
921 session->global_id = tmp;
922 }
923} 1591}
924 1592
925 1593
@@ -948,7 +1616,6 @@ initialize_session_peer_list (struct ConsensusSession *session,
948 unsigned int local_peer_in_list; 1616 unsigned int local_peer_in_list;
949 uint32_t listed_peers; 1617 uint32_t listed_peers;
950 const struct GNUNET_PeerIdentity *msg_peers; 1618 const struct GNUNET_PeerIdentity *msg_peers;
951 struct GNUNET_PeerIdentity *peers;
952 unsigned int i; 1619 unsigned int i;
953 1620
954 GNUNET_assert (NULL != join_msg); 1621 GNUNET_assert (NULL != join_msg);
@@ -973,25 +1640,27 @@ initialize_session_peer_list (struct ConsensusSession *session,
973 if (GNUNET_NO == local_peer_in_list) 1640 if (GNUNET_NO == local_peer_in_list)
974 session->num_peers++; 1641 session->num_peers++;
975 1642
976 peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity)); 1643 session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
977 1644
978 if (GNUNET_NO == local_peer_in_list) 1645 if (GNUNET_NO == local_peer_in_list)
979 peers[session->num_peers - 1] = my_peer; 1646 session->peers[session->num_peers - 1] = my_peer;
980 1647
981 memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity)); 1648 memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
982 qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &peer_id_cmp); 1649 qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &peer_id_cmp);
1650}
983 1651
984 session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
985 1652
986 for (i = 0; i < session->num_peers; ++i) 1653static struct TaskEntry *
987 { 1654lookup_task (struct ConsensusSession *session,
988 /* initialize back-references, so consensus peer information can 1655 struct TaskKey *key)
989 * be used as closure */ 1656{
990 session->info[i].session = session; 1657 struct GNUNET_HashCode hash;
991 session->info[i].peer_id = peers[i]; 1658
992 }
993 1659
994 GNUNET_free (peers); 1660 GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash);
1661 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
1662 GNUNET_h2s (&hash));
1663 return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
995} 1664}
996 1665
997 1666
@@ -1017,12 +1686,10 @@ set_listen_cb (void *cls,
1017 struct GNUNET_SET_Request *request) 1686 struct GNUNET_SET_Request *request)
1018{ 1687{
1019 struct ConsensusSession *session = cls; 1688 struct ConsensusSession *session = cls;
1020 struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg; 1689 struct TaskKey tk;
1021 struct ConsensusPeerInformation *cpi; 1690 struct TaskEntry *task;
1022 struct GNUNET_SET_OperationHandle *set_op; 1691 struct GNUNET_CONSENSUS_RoundContextMessage *cm;
1023 struct RoundInfo round_info; 1692 GNUNET_SET_ResultIterator my_result_cb;
1024 int index;
1025 int cmp;
1026 1693
1027 if (NULL == context_msg) 1694 if (NULL == context_msg)
1028 { 1695 {
@@ -1030,85 +1697,524 @@ set_listen_cb (void *cls,
1030 return; 1697 return;
1031 } 1698 }
1032 1699
1033 index = get_peer_idx (other_peer, session); 1700 if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type))
1701 {
1702 GNUNET_break_op (0);
1703 return;
1704 }
1034 1705
1035 if (index < 0) 1706 if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size))
1036 { 1707 {
1037 GNUNET_break_op (0); 1708 GNUNET_break_op (0);
1038 return; 1709 return;
1039 } 1710 }
1040 1711
1041 round_info.round = ntohl (msg->round); 1712 cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
1042 round_info.exp_repetition = ntohl (msg->exp_repetition); 1713
1043 round_info.exp_subround = ntohl (msg->exp_subround); 1714 tk = ((struct TaskKey) {
1715 .kind = ntohs (cm->kind),
1716 .peer1 = ntohs (cm->peer1),
1717 .peer2 = ntohs (cm->peer2),
1718 .repetition = ntohs (cm->repetition),
1719 .leader = ntohs (cm->leader),
1720 });
1044 1721
1045 cpi = &session->info[index]; 1722 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
1723 session->local_peer_idx, debug_str_task_key (&tk));
1046 1724
1047 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d got set request from P%d\n", session->local_peer_idx, index); 1725 task = lookup_task (session, &tk);
1048 1726
1049 switch (session->current_round) 1727 if (NULL == task)
1050 { 1728 {
1051 case CONSENSUS_ROUND_BEGIN: 1729 GNUNET_break_op (0);
1052 /* we're in the begin round, so requests for the exchange round may 1730 return;
1053 * come in, they will be delayed for now! */ 1731 }
1054 case CONSENSUS_ROUND_EXCHANGE: 1732
1055 cmp = rounds_compare (session, &round_info); 1733 if (ACTION_RECONCILE != task->action)
1056 if (cmp > 0) 1734 {
1057 { 1735 GNUNET_break_op (0);
1058 /* the other peer is too late */ 1736 return;
1059 LOG_PP (GNUNET_ERROR_TYPE_DEBUG, cpi, "too late for the current round\n"); 1737 }
1060 return; 1738
1061 } 1739 if (GNUNET_YES == task->is_finished)
1062 /* kill old request, if any. this is legal, 1740 {
1063 * as the other peer would not make a new request if it would want to 1741 GNUNET_break_op (0);
1064 * complete the old one! */ 1742 return;
1065 if (NULL != cpi->set_op) 1743 }
1066 { 1744
1067 LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got new request from same peer, canceling old one\n"); 1745 if (task->key.peer2 != session->local_peer_idx)
1068 GNUNET_SET_operation_cancel (cpi->set_op); 1746 {
1069 cpi->set_op = NULL; 1747 /* We're being asked, so we must be thne 2nd peer. */
1070 } 1748 GNUNET_break_op (0);
1071 set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, 1749 return;
1072 set_result_cb, &session->info[index]); 1750 }
1073 if (cmp == 0) 1751
1074 { 1752 if (task->key.peer1 == task->key.peer2)
1075 /* we're in exactly the right round for the incoming request */ 1753 my_result_cb = set_result_cb_loop;
1076 if (cpi != cpi->session->partner_incoming) 1754 else
1077 { 1755 my_result_cb = set_result_cb;
1078 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "P%u: got request from %u (with matching round), " 1756
1079 "but incoming partner is %d\n", cpi->session->local_peer_idx, cpi - cpi->session->info, 1757 task->op = GNUNET_SET_accept (request,
1080 ((NULL == cpi->session->partner_incoming) ? -1 : (cpi->session->partner_incoming - cpi->session->info))); 1758 GNUNET_SET_RESULT_ADDED, /* XXX: obsolete soon */
1081 GNUNET_SET_operation_cancel (set_op); 1759 my_result_cb,
1082 return; 1760 task);
1083 } 1761
1084 cpi->set_op = set_op; 1762 /* If the task hasn't been started yet,
1085 if (GNUNET_OK != GNUNET_SET_commit (set_op, session->element_set)) 1763 we wait for that until we commit. */
1086 { 1764
1087 GNUNET_break (0); 1765 if (GNUNET_YES == task->is_running)
1088 } 1766 {
1089 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d commited to set request from P%d\n", session->local_peer_idx, index); 1767 commit_set (session, task);
1090 } 1768 }
1091 else 1769}
1092 { 1770
1093 /* we still have wait until we have finished the current round, 1771
1094 * as the other peer's round is larger */ 1772
1095 cpi->delayed_set_op = set_op; 1773static void
1096 cpi->delayed_round_info = round_info; 1774put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
1097 /* The current setop is finished, as we canceled the current setop above. */ 1775 struct TaskEntry *t)
1098 cpi->set_op_finished = GNUNET_YES; 1776{
1099 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d delaying set request from P%d\n", session->local_peer_idx, index); 1777 struct GNUNET_HashCode round_hash;
1100 } 1778 struct Step *s;
1101 break; 1779
1102 default: 1780 GNUNET_assert (NULL != t->step);
1103 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "P%d got unexpected set request in round %d from P%d\n", 1781
1104 session->local_peer_idx, session->current_round, index); 1782 t = GNUNET_memdup (t, sizeof (struct TaskEntry));
1105 GNUNET_break_op (0); 1783
1106 return; 1784 s = t->step;
1785
1786 if (s->tasks_len == s->tasks_cap)
1787 {
1788 unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
1789 GNUNET_array_grow (s->tasks,
1790 s->tasks_cap,
1791 target_size);
1792 }
1793
1794#ifdef GNUNET_EXTRA_LOGGING
1795 GNUNET_assert (NULL != s->debug_name);
1796 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
1797 debug_str_task_key (&t->key),
1798 s->debug_name);
1799#endif
1800
1801 s->tasks[s->tasks_len] = t;
1802 s->tasks_len++;
1803
1804 GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash);
1805 GNUNET_assert (GNUNET_OK ==
1806 GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
1807 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1808}
1809
1810
1811static void
1812install_step_timeouts (struct ConsensusSession *session)
1813{
1814 /* Given the fully constructed task graph
1815 with rounds for tasks, we can give the tasks timeouts. */
1816
1817 /* XXX: implement! */
1818}
1819
1820
1821
1822/*
1823 * Arrange two peers in some canonical order.
1824 */
1825static void
1826arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
1827{
1828 uint16_t a;
1829 uint16_t b;
1830
1831 GNUNET_assert (*p1 < n);
1832 GNUNET_assert (*p2 < n);
1833
1834 if (*p1 < *p2)
1835 {
1836 a = *p1;
1837 b = *p2;
1838 }
1839 else
1840 {
1841 a = *p2;
1842 b = *p1;
1843 }
1844
1845 /* For uniformly random *p1, *p2,
1846 this condition is true with 50% chance */
1847 if (((b - a) + n) % n <= n / 2)
1848 {
1849 *p1 = a;
1850 *p2 = b;
1851 }
1852 else
1853 {
1854 *p1 = b;
1855 *p2 = a;
1107 } 1856 }
1108} 1857}
1109 1858
1110 1859
1111/** 1860/**
1861 * Record @a dep as a dependency of @step.
1862 */
1863static void
1864step_depend_on (struct Step *step, struct Step *dep)
1865{
1866 /* We're not checking for cyclic dependencies,
1867 but this is a cheap sanity check. */
1868 GNUNET_assert (step != dep);
1869 GNUNET_assert (NULL != step);
1870 GNUNET_assert (NULL != dep);
1871 // XXX: make rounds work
1872 //GNUNET_assert (dep->start_round <= step->start_round);
1873
1874#ifdef GNUNET_EXTRA_LOGGING
1875 /* Make sure we have complete debugging information.
1876 Also checks that we don't screw up too badly
1877 constructing the task graph. */
1878 GNUNET_assert (NULL != step->debug_name);
1879 GNUNET_assert (NULL != dep->debug_name);
1880 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1881 "Making step `%s' depend on `%s'\n",
1882 step->debug_name,
1883 dep->debug_name);
1884#endif
1885
1886 if (dep->subordinates_cap == dep->subordinates_len)
1887 {
1888 unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
1889 GNUNET_array_grow (dep->subordinates,
1890 dep->subordinates_cap,
1891 target_size);
1892 }
1893
1894 GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
1895
1896 dep->subordinates[dep->subordinates_len] = step;
1897 dep->subordinates_len++;
1898
1899 step->pending_prereq++;
1900}
1901
1902
1903static struct Step *
1904create_step (struct ConsensusSession *session, int start_round, int num_rounds)
1905{
1906 struct Step *step;
1907 step = GNUNET_new (struct Step);
1908 step->session = session;
1909 step->start_round = start_round;
1910 step->num_rounds = num_rounds;
1911 GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
1912 session->steps_tail,
1913 step);
1914 return step;
1915}
1916
1917
1918/**
1919 * Construct the task graph for a single
1920 * gradecast.
1921 */
1922static void
1923construct_task_graph_gradecast (struct ConsensusSession *session,
1924 uint16_t rep,
1925 uint16_t lead,
1926 struct Step *step_before,
1927 struct Step *step_after)
1928{
1929 uint16_t n = session->num_peers;
1930 uint16_t t = n / 3;
1931
1932 uint16_t me = session->local_peer_idx;
1933
1934 uint16_t p1;
1935 uint16_t p2;
1936
1937 /* The task we're currently setting up. */
1938 struct TaskEntry task;
1939
1940 struct Step *step;
1941 struct Step *prev_step;
1942
1943 uint16_t round;
1944
1945 unsigned int k;
1946
1947 round = step_before->start_round + step_before->num_rounds;
1948
1949 /* gcast step 1: leader disseminates */
1950
1951 step = create_step (session, round, 1);
1952
1953#ifdef GNUNET_EXTRA_LOGGING
1954 GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
1955#endif
1956 step_depend_on (step, step_before);
1957
1958 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: Considering leader %d\n", session->local_peer_idx, lead);
1959
1960 if (lead == me)
1961 {
1962 for (k = 0; k < n; k++)
1963 {
1964 if (k == me)
1965 continue;
1966 p1 = me;
1967 p2 = k;
1968 arrange_peers (&p1, &p2, n);
1969 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: GC LEADER(1): %d %d %d %d\n", session->local_peer_idx, p1, p2, rep, lead);
1970 task = ((struct TaskEntry) {
1971 .step = step,
1972 .action = ACTION_RECONCILE,
1973 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
1974 .input_set = (struct SetKey) { SET_KIND_CURRENT, rep },
1975 .output_set = (struct SetKey) { SET_KIND_NONE },
1976 });
1977 put_task (session->taskmap, &task);
1978 }
1979 /* We run this task to make sure that the leader
1980 has the stored the SET_KIND_LEADER set of himself,
1981 so he can participate in the rest of the gradecast
1982 without the code having to handle any special cases. */
1983 task = ((struct TaskEntry) {
1984 .step = step,
1985 .action = ACTION_RECONCILE,
1986 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
1987 .input_set = (struct SetKey) { SET_KIND_CURRENT, rep },
1988 .output_set = (struct SetKey) { SET_KIND_LEADER, rep, me },
1989 .output_diff = (struct DiffKey) { DIFF_KIND_LEADER, rep, me },
1990 });
1991 put_task (session->taskmap, &task);
1992 }
1993 else
1994 {
1995 p1 = me;
1996 p2 = lead;
1997 arrange_peers (&p1, &p2, n);
1998 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: GC LEADER(2): %d %d %d %d\n", session->local_peer_idx, p1, p2, rep, lead);
1999 task = ((struct TaskEntry) {
2000 .step = step,
2001 .action = ACTION_RECONCILE,
2002 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead},
2003 .input_set = (struct SetKey) { SET_KIND_CURRENT, rep },
2004 .output_set = (struct SetKey) { SET_KIND_LEADER, rep, lead },
2005 .output_diff = (struct DiffKey) { DIFF_KIND_LEADER, rep, lead },
2006 });
2007 put_task (session->taskmap, &task);
2008 }
2009
2010 /* gcast phase 2: echo */
2011 prev_step = step;
2012 step = create_step (session, round, 1);
2013#ifdef GNUNET_EXTRA_LOGGING
2014 GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2015#endif
2016 step_depend_on (step, prev_step);
2017
2018 for (k = 0; k < n; k++)
2019 {
2020 p1 = k;
2021 p2 = me;
2022 arrange_peers (&p1, &p2, n);
2023 task = ((struct TaskEntry) {
2024 .step = step,
2025 .action = ACTION_RECONCILE,
2026 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2027 .input_set = (struct SetKey) { SET_KIND_LEADER, rep, lead },
2028 .output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead },
2029 });
2030 put_task (session->taskmap, &task);
2031 }
2032
2033 prev_step = step;
2034 step = create_step (session, round, 1);
2035#ifdef GNUNET_EXTRA_LOGGING
2036 GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2037#endif
2038 step_depend_on (step, prev_step);
2039
2040 arrange_peers (&p1, &p2, n);
2041 task = ((struct TaskEntry) {
2042 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2043 .step = step,
2044 .action = ACTION_EVAL_RFN,
2045 .input_set = (struct SetKey) { SET_KIND_LEADER, rep, lead },
2046 .input_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead },
2047 .output_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead },
2048 .threshold = n - t,
2049 });
2050 put_task (session->taskmap, &task);
2051
2052 prev_step = step;
2053 step = create_step (session, round, 1);
2054#ifdef GNUNET_EXTRA_LOGGING
2055 GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2056#endif
2057 step_depend_on (step, prev_step);
2058
2059 /* gcast phase 3: confirmation and grading */
2060 for (k = 0; k < n; k++)
2061 {
2062 p1 = k;
2063 p2 = me;
2064 arrange_peers (&p1, &p2, n);
2065 task = ((struct TaskEntry) {
2066 .step = step,
2067 .action = ACTION_RECONCILE,
2068 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2069 .input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead },
2070 .output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead },
2071 });
2072 put_task (session->taskmap, &task);
2073 }
2074
2075 prev_step = step;
2076 step = create_step (session, round, 1);
2077#ifdef GNUNET_EXTRA_LOGGING
2078 GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2079#endif
2080 step_depend_on (step, prev_step);
2081
2082 // evaluate ConfirmationReferendum and
2083 // apply it to the LeaderReferendum
2084 task = ((struct TaskEntry) {
2085 .step = step,
2086 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2087 .action = ACTION_EVAL_RFN,
2088 .input_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead },
2089 .output_diff = (struct DiffKey) { DIFF_KIND_GRADECAST_RESULT, rep },
2090 });
2091 put_task (session->taskmap, &task);
2092
2093 step_depend_on (step_after, step);
2094}
2095
2096
2097static void
2098construct_task_graph (struct ConsensusSession *session)
2099{
2100 uint16_t n = session->num_peers;
2101 uint16_t t = n / 3;
2102
2103 uint16_t me = session->local_peer_idx;
2104
2105 uint16_t p1;
2106 uint16_t p2;
2107
2108 /* The task we're currently setting up. */
2109 struct TaskEntry task;
2110
2111 /* Current leader */
2112 unsigned int lead;
2113
2114 struct Step *step;
2115 struct Step *prev_step;
2116
2117 unsigned int round = 0;
2118
2119 unsigned int i;
2120
2121 // XXX: introduce first step,
2122 // where we wait for all insert acks
2123 // from the set service
2124
2125 /* faster but brittle all-to-all */
2126
2127 // XXX: Not implemented yet
2128
2129 /* all-to-all step */
2130
2131 step = create_step (session, round, 1);
2132
2133#ifdef GNUNET_EXTRA_LOGGING
2134 step->debug_name = GNUNET_strdup ("all to all");
2135#endif
2136
2137 for (i = 0; i < n; i++)
2138 {
2139 p1 = me;
2140 p2 = i;
2141 arrange_peers (&p1, &p2, n);
2142 task = ((struct TaskEntry) {
2143 .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2144 .step = step,
2145 .action = ACTION_RECONCILE,
2146 .input_set = (struct SetKey) { SET_KIND_CURRENT, 0 },
2147 .output_set = (struct SetKey) { SET_KIND_CURRENT, 0 },
2148 });
2149 put_task (session->taskmap, &task);
2150 }
2151
2152 round++;
2153
2154 prev_step = step;
2155 step = NULL;
2156
2157 /* Byzantine union */
2158
2159 /* sequential repetitions of the gradecasts */
2160 for (i = 0; i < t + 1; i++)
2161 {
2162 struct Step *step_rep_start;
2163 struct Step *step_rep_end;
2164
2165 step_rep_start = create_step (session, round, 1);
2166#ifdef GNUNET_EXTRA_LOGGING
2167 GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
2168#endif
2169
2170 step_depend_on (step_rep_start, prev_step);
2171
2172 step_rep_end = create_step (session, round, 1);
2173#ifdef GNUNET_EXTRA_LOGGING
2174 GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
2175#endif
2176
2177 /* parallel gradecasts */
2178 for (lead = 0; lead < n; lead++)
2179 construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
2180
2181 // TODO: add peers to ignore list,
2182 //
2183 // evaluate ConfirmationReferendum and
2184 // apply it to the LeaderReferendum
2185 task = ((struct TaskEntry) {
2186 .step = step_rep_end,
2187 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_APPLY_RESULT, -1, -1, i, -1},
2188 .action = ACTION_APPLY_DIFF,
2189 .input_set = (struct SetKey) { SET_KIND_CURRENT, i },
2190 .input_diff = (struct DiffKey) { DIFF_KIND_GRADECAST_RESULT, i },
2191 .output_set = (struct SetKey) { SET_KIND_CURRENT, i + 1 },
2192 });
2193 put_task (session->taskmap, &task);
2194
2195 prev_step = step_rep_end;
2196 }
2197
2198 /* There is no next gradecast round, thus the final
2199 start step is the overall end step of the gradecasts */
2200 step = create_step (session, round, 1);
2201#ifdef GNUNET_EXTRA_LOGGING
2202 GNUNET_asprintf (&step->debug_name, "finish");
2203#endif
2204 step_depend_on (step, prev_step);
2205
2206 task = ((struct TaskEntry) {
2207 .step = step,
2208 .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
2209 .input_set = (struct SetKey) { SET_KIND_CURRENT, t + 1 },
2210 .action = ACTION_FINISH,
2211 });
2212
2213 put_task (session->taskmap, &task);
2214}
2215
2216
2217/**
1112 * Initialize the session, continue receiving messages from the owning client 2218 * Initialize the session, continue receiving messages from the owning client
1113 * 2219 *
1114 * @param session the session to initialize 2220 * @param session the session to initialize
@@ -1124,21 +2230,21 @@ initialize_session (struct ConsensusSession *session,
1124 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers); 2230 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers);
1125 compute_global_id (session, &join_msg->session_id); 2231 compute_global_id (session, &join_msg->session_id);
1126 2232
1127 /* check if some local client already owns the session. 2233 /* Check if some local client already owns the session.
1128 * it is only legal to have a session with an existing global id 2234 It is only legal to have a session with an existing global id
1129 * if all other sessions with this global id are finished.*/ 2235 if all other sessions with this global id are finished.*/
1130 other_session = sessions_head; 2236 other_session = sessions_head;
1131 while (NULL != other_session) 2237 while (NULL != other_session)
1132 { 2238 {
1133 if ((other_session != session) && 2239 if ((other_session != session) &&
1134 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id))) 2240 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
1135 { 2241 {
1136 if (CONSENSUS_ROUND_FINISH != other_session->current_round) 2242 //if (CONSENSUS_ROUND_FINISH != other_session->current_round)
1137 { 2243 //{
1138 GNUNET_break (0); 2244 // GNUNET_break (0);
1139 destroy_session (session); 2245 // destroy_session (session);
1140 return; 2246 // return;
1141 } 2247 //}
1142 break; 2248 break;
1143 } 2249 }
1144 other_session = other_session->next; 2250 other_session = other_session->next;
@@ -1152,12 +2258,30 @@ initialize_session (struct ConsensusSession *session,
1152 2258
1153 session->local_peer_idx = get_peer_idx (&my_peer, session); 2259 session->local_peer_idx = get_peer_idx (&my_peer, session);
1154 GNUNET_assert (-1 != session->local_peer_idx); 2260 GNUNET_assert (-1 != session->local_peer_idx);
1155 session->element_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
1156 GNUNET_assert (NULL != session->element_set);
1157 session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION, 2261 session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
1158 &session->global_id, 2262 &session->global_id,
1159 set_listen_cb, session); 2263 set_listen_cb, session);
1160 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx); 2264 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx);
2265
2266 session->setmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2267 session->taskmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2268 session->diffmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2269 session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2270
2271 {
2272 struct SetEntry *client_set;
2273 client_set = GNUNET_new (struct SetEntry);
2274 client_set->h = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
2275 client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
2276 put_set (session, client_set);
2277 }
2278
2279 session->peers_ignored = GNUNET_new_array (session->num_peers, int);
2280
2281 /* Just construct the task graph,
2282 but don't run anything until the client calls conclude. */
2283 construct_task_graph (session);
2284
1161 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id)); 2285 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id));
1162} 2286}
1163 2287
@@ -1212,6 +2336,13 @@ client_join (void *cls,
1212} 2336}
1213 2337
1214 2338
2339static void
2340client_insert_done (void *cls)
2341{
2342 // FIXME: implement
2343}
2344
2345
1215/** 2346/**
1216 * Called when a client performs an insert operation. 2347 * Called when a client performs an insert operation.
1217 * 2348 *
@@ -1228,6 +2359,7 @@ client_insert (void *cls,
1228 struct GNUNET_CONSENSUS_ElementMessage *msg; 2359 struct GNUNET_CONSENSUS_ElementMessage *msg;
1229 struct GNUNET_SET_Element *element; 2360 struct GNUNET_SET_Element *element;
1230 ssize_t element_size; 2361 ssize_t element_size;
2362 struct GNUNET_SET_Handle *initial_set;
1231 2363
1232 session = get_session_by_client (client); 2364 session = get_session_by_client (client);
1233 2365
@@ -1238,7 +2370,7 @@ client_insert (void *cls,
1238 return; 2370 return;
1239 } 2371 }
1240 2372
1241 if (CONSENSUS_ROUND_BEGIN != session->current_round) 2373 if (GNUNET_YES == session->conclude_started)
1242 { 2374 {
1243 GNUNET_break (0); 2375 GNUNET_break (0);
1244 GNUNET_SERVER_client_disconnect (client); 2376 GNUNET_SERVER_client_disconnect (client);
@@ -1258,11 +2390,19 @@ client_insert (void *cls,
1258 element->size = element_size; 2390 element->size = element_size;
1259 memcpy (&element[1], &msg[1], element_size); 2391 memcpy (&element[1], &msg[1], element_size);
1260 element->data = &element[1]; 2392 element->data = &element[1];
1261 GNUNET_SET_add_element (session->element_set, element, NULL, NULL); 2393 {
2394 struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
2395 struct SetEntry *entry;
2396 entry = lookup_set (session, &key);
2397 GNUNET_assert (NULL != entry);
2398 initial_set = entry->h;
2399 }
2400 session->num_client_insert_pending++;
2401 GNUNET_SET_add_element (initial_set, element, client_insert_done, session);
1262 GNUNET_free (element); 2402 GNUNET_free (element);
1263 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2403 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1264 2404
1265 // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element added\n", session->local_peer_idx); 2405 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element added\n", session->local_peer_idx);
1266} 2406}
1267 2407
1268 2408
@@ -1280,7 +2420,6 @@ client_conclude (void *cls,
1280{ 2420{
1281 struct ConsensusSession *session; 2421 struct ConsensusSession *session;
1282 2422
1283 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n");
1284 session = get_session_by_client (client); 2423 session = get_session_by_client (client);
1285 if (NULL == session) 2424 if (NULL == session)
1286 { 2425 {
@@ -1289,24 +2428,24 @@ client_conclude (void *cls,
1289 GNUNET_SERVER_client_disconnect (client); 2428 GNUNET_SERVER_client_disconnect (client);
1290 return; 2429 return;
1291 } 2430 }
1292 if (CONSENSUS_ROUND_BEGIN != session->current_round) 2431
2432 if (GNUNET_YES == session->conclude_started)
1293 { 2433 {
1294 /* client requested conclude twice */ 2434 /* conclude started twice */
1295 GNUNET_break (0); 2435 GNUNET_break (0);
2436 GNUNET_SERVER_client_disconnect (client);
2437 destroy_session (session);
1296 return; 2438 return;
1297 } 2439 }
1298 if (session->num_peers <= 1)
1299 {
1300 session->current_round = CONSENSUS_ROUND_FINISH;
1301 GNUNET_SET_iterate (session->element_set, send_to_client_iter, session);
1302 }
1303 else
1304 {
1305 /* the 'begin' round is over, start with the next, actual round */
1306 round_over (session, NULL);
1307 }
1308 2440
1309 GNUNET_assert (CONSENSUS_ROUND_BEGIN != session->current_round); 2441 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n");
2442
2443 session->conclude_started = GNUNET_YES;
2444
2445 install_step_timeouts (session);
2446 run_ready_steps (session);
2447
2448
1310 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2449 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1311} 2450}
1312 2451
@@ -1343,17 +2482,11 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
1343 session = get_session_by_client (client); 2482 session = get_session_by_client (client);
1344 if (NULL == session) 2483 if (NULL == session)
1345 return; 2484 return;
1346 if ((CONSENSUS_ROUND_BEGIN == session->current_round) || 2485 // FIXME: destroy if we can
1347 (CONSENSUS_ROUND_FINISH == session->current_round))
1348 {
1349 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, destroying session\n");
1350 destroy_session (session);
1351 return;
1352 }
1353 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, but waiting for consensus to finish\n");
1354} 2486}
1355 2487
1356 2488
2489
1357/** 2490/**
1358 * Start processing consensus requests. 2491 * Start processing consensus requests.
1359 * 2492 *