diff options
Diffstat (limited to 'src/consensus/gnunet-service-consensus.c')
-rw-r--r-- | src/consensus/gnunet-service-consensus.c | 2465 |
1 files changed, 1799 insertions, 666 deletions
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index acae74eaf..424be7a71 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c | |||
@@ -34,234 +34,410 @@ | |||
34 | #include "consensus.h" | 34 | #include "consensus.h" |
35 | 35 | ||
36 | 36 | ||
37 | /** | ||
38 | * Log macro that prefixes the local peer and the peer we are in contact with. | ||
39 | * | ||
40 | * @param kind log level | ||
41 | * @param cpi ConsensusPeerInformation of the partner peer | ||
42 | * @param m log message | ||
43 | */ | ||
44 | #define LOG_PP(kind, cpi, m,...) GNUNET_log (kind, "P%d for P%d: " m, \ | ||
45 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info),##__VA_ARGS__) | ||
46 | 37 | ||
38 | GNUNET_NETWORK_STRUCT_BEGIN | ||
47 | 39 | ||
48 | /** | 40 | /** |
49 | * Number of exponential rounds, used in the exp and completion round. | 41 | * Tuple of integers that together |
42 | * identify a task uniquely. | ||
50 | */ | 43 | */ |
51 | #define NUM_EXP_REPETITIONS 4 | 44 | struct TaskKey { |
52 | 45 | /** | |
53 | 46 | * A value from 'enum PhaseKind'. | |
54 | /* forward declarations */ | 47 | */ |
55 | 48 | uint16_t kind GNUNET_PACKED; | |
56 | /* mutual recursion with struct ConsensusSession */ | ||
57 | struct ConsensusPeerInformation; | ||
58 | |||
59 | /* mutual recursion with round_over */ | ||
60 | static void | ||
61 | subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); | ||
62 | 49 | ||
50 | /** | ||
51 | * Number of the first peer | ||
52 | * in canonical order. | ||
53 | */ | ||
54 | int16_t peer1 GNUNET_PACKED; | ||
63 | 55 | ||
64 | /** | ||
65 | * Describes the current round a consensus session is in. | ||
66 | */ | ||
67 | enum ConsensusRound | ||
68 | { | ||
69 | /** | 56 | /** |
70 | * Not started the protocol yet. | 57 | * Number of the second peer in canonical order. |
71 | */ | 58 | */ |
72 | CONSENSUS_ROUND_BEGIN=0, | 59 | int16_t peer2 GNUNET_PACKED; |
60 | |||
73 | /** | 61 | /** |
74 | * Distribution of elements with the exponential scheme. | 62 | * Repetition of the gradecast phase. |
75 | */ | 63 | */ |
76 | CONSENSUS_ROUND_EXCHANGE, | 64 | int16_t repetition GNUNET_PACKED; |
65 | |||
77 | /** | 66 | /** |
78 | * Collect and distribute missing values. | 67 | * Leader in the gradecast phase. |
68 | * | ||
69 | * Can be different from both peer1 and peer2. | ||
79 | */ | 70 | */ |
80 | CONSENSUS_ROUND_COMPLETION, | 71 | int16_t leader GNUNET_PACKED; |
72 | }; | ||
73 | |||
74 | |||
75 | enum ReferendumVote | ||
76 | { | ||
77 | VOTE_NONE = 0, | ||
78 | VOTE_ADD = 1, | ||
79 | VOTE_REMOVE = 2, | ||
80 | VOTE_CONTESTED = 3 | ||
81 | }; | ||
82 | |||
83 | |||
84 | struct SetKey | ||
85 | { | ||
86 | int set_kind GNUNET_PACKED; | ||
87 | int k1 GNUNET_PACKED; | ||
88 | int k2 GNUNET_PACKED; | ||
89 | }; | ||
90 | |||
91 | |||
92 | struct SetEntry | ||
93 | { | ||
94 | struct SetKey key; | ||
95 | struct GNUNET_SET_Handle *h; | ||
81 | /** | 96 | /** |
82 | * Consensus concluded. After timeout and finished communication with client, | 97 | * GNUNET_YES if the set resulted |
83 | * consensus session will be destroyed. | 98 | * from applying a referendum with contested |
99 | * elements. | ||
84 | */ | 100 | */ |
85 | CONSENSUS_ROUND_FINISH | 101 | int is_contested; |
86 | }; | 102 | }; |
87 | 103 | ||
88 | 104 | ||
89 | /** | 105 | struct DiffKey |
90 | * Information about the current round. | 106 | { |
91 | */ | 107 | int diff_kind GNUNET_PACKED; |
92 | struct RoundInfo | 108 | int k1 GNUNET_PACKED; |
109 | int k2 GNUNET_PACKED; | ||
110 | }; | ||
111 | |||
112 | struct RfnKey | ||
113 | { | ||
114 | int rfn_kind GNUNET_PACKED; | ||
115 | int k1 GNUNET_PACKED; | ||
116 | int k2 GNUNET_PACKED; | ||
117 | }; | ||
118 | |||
119 | |||
120 | GNUNET_NETWORK_STRUCT_END | ||
121 | |||
122 | enum PhaseKind | ||
123 | { | ||
124 | PHASE_KIND_ALL_TO_ALL, | ||
125 | PHASE_KIND_GRADECAST_LEADER, | ||
126 | PHASE_KIND_GRADECAST_ECHO, | ||
127 | PHASE_KIND_GRADECAST_ECHO_GRADE, | ||
128 | PHASE_KIND_GRADECAST_CONFIRM, | ||
129 | PHASE_KIND_GRADECAST_CONFIRM_GRADE, | ||
130 | PHASE_KIND_GRADECAST_APPLY_RESULT, | ||
131 | PHASE_KIND_FINISH, | ||
132 | }; | ||
133 | |||
134 | |||
135 | enum ActionType | ||
93 | { | 136 | { |
94 | /** | 137 | /** |
95 | * The current main round. | 138 | * Do a set reconciliation with another peer (or via looback). |
96 | */ | 139 | */ |
97 | enum ConsensusRound round; | 140 | ACTION_RECONCILE, |
98 | /** | 141 | /** |
99 | * The current exp round repetition, valid if | 142 | * Apply a referendum with a threshold |
100 | * the main round is an exp round. | 143 | * to a set and/or a diff. |
101 | */ | 144 | */ |
102 | uint32_t exp_repetition; | 145 | ACTION_EVAL_RFN, |
103 | /** | 146 | /** |
104 | * The current exp subround, valid if | 147 | * Apply a diff to a set. |
105 | * the main round is an exp round. | ||
106 | */ | 148 | */ |
107 | uint32_t exp_subround; | 149 | ACTION_APPLY_DIFF, |
150 | ACTION_FINISH, | ||
151 | }; | ||
152 | |||
153 | enum SetKind | ||
154 | { | ||
155 | SET_KIND_NONE = 0, | ||
156 | SET_KIND_CURRENT, | ||
157 | SET_KIND_LEADER, | ||
158 | SET_KIND_ECHO_RESULT, | ||
108 | }; | 159 | }; |
109 | 160 | ||
161 | enum DiffKind | ||
162 | { | ||
163 | DIFF_KIND_NONE = 0, | ||
164 | DIFF_KIND_LEADER, | ||
165 | DIFF_KIND_GRADECAST_RESULT, | ||
166 | }; | ||
110 | 167 | ||
111 | /** | 168 | enum RfnKind |
112 | * A consensus session consists of one local client and the remote authorities. | 169 | { |
170 | RFN_KIND_NONE = 0, | ||
171 | RFN_KIND_ECHO, | ||
172 | RFN_KIND_CONFIRM, | ||
173 | }; | ||
174 | |||
175 | |||
176 | /* | ||
177 | * Node in the consensus task graph. | ||
113 | */ | 178 | */ |
114 | struct ConsensusSession | 179 | struct TaskEntry |
115 | { | 180 | { |
181 | struct TaskKey key; | ||
182 | |||
183 | struct Step *step; | ||
184 | |||
185 | int is_running; | ||
186 | |||
187 | int is_finished; | ||
188 | |||
189 | enum ActionType action; | ||
190 | |||
191 | struct SetKey input_set; | ||
192 | struct DiffKey input_diff; | ||
193 | struct RfnKey input_rfn; | ||
194 | struct SetKey output_set; | ||
195 | struct DiffKey output_diff; | ||
196 | struct RfnKey output_rfn; | ||
197 | |||
116 | /** | 198 | /** |
117 | * Consensus sessions are kept in a DLL. | 199 | * Threshold when evaluating referendums. |
118 | */ | 200 | */ |
119 | struct ConsensusSession *next; | 201 | uint16_t threshold; |
120 | 202 | ||
121 | /** | 203 | /** |
122 | * Consensus sessions are kept in a DLL. | 204 | * Operation that is running for this task. |
123 | */ | 205 | */ |
124 | struct ConsensusSession *prev; | 206 | struct GNUNET_SET_OperationHandle *op; |
125 | 207 | ||
126 | /** | 208 | struct GNUNET_SET_Handle *commited_set; |
127 | * Global consensus identification, computed | 209 | }; |
128 | * from the session id and participating authorities. | ||
129 | */ | ||
130 | struct GNUNET_HashCode global_id; | ||
131 | 210 | ||
211 | |||
212 | struct Step | ||
213 | { | ||
132 | /** | 214 | /** |
133 | * Client that inhabits the session | 215 | * All steps of one session are in a |
216 | * linked list for easier deallocation. | ||
134 | */ | 217 | */ |
135 | struct GNUNET_SERVER_Client *client; | 218 | struct Step *prev; |
136 | 219 | ||
137 | /** | 220 | /** |
138 | * Queued messages to the client. | 221 | * All steps of one session are in a |
222 | * linked list for easier deallocation. | ||
139 | */ | 223 | */ |
140 | struct GNUNET_MQ_Handle *client_mq; | 224 | struct Step *next; |
141 | 225 | ||
142 | /** | 226 | struct ConsensusSession *session; |
143 | * Time when the conclusion of the consensus should begin. | 227 | |
228 | struct TaskEntry **tasks; | ||
229 | unsigned int tasks_len; | ||
230 | unsigned int tasks_cap; | ||
231 | |||
232 | unsigned int finished_tasks; | ||
233 | |||
234 | /* | ||
235 | * Tasks that have this task as dependency. | ||
236 | * | ||
237 | * We store pointers to subordinates rather | ||
238 | * than to prerequisites since it makes | ||
239 | * tracking the readiness of a task easier. | ||
144 | */ | 240 | */ |
145 | struct GNUNET_TIME_Absolute conclude_start; | 241 | struct Step **subordinates; |
242 | unsigned int subordinates_len; | ||
243 | unsigned int subordinates_cap; | ||
146 | 244 | ||
147 | /** | 245 | /** |
148 | * Timeout for all rounds together, single rounds will schedule a timeout task | 246 | * Counter for the prerequisites of |
149 | * with a fraction of the conclude timeout. | 247 | * this step. |
150 | * Only valid once the current round is not CONSENSUS_ROUND_BEGIN. | ||
151 | */ | 248 | */ |
152 | struct GNUNET_TIME_Absolute conclude_deadline; | 249 | size_t pending_prereq; |
153 | 250 | ||
154 | /** | 251 | /* |
155 | * Timeout task identifier for the current round or subround. | 252 | * Task that will run this step despite |
253 | * any pending prerequisites. | ||
156 | */ | 254 | */ |
157 | struct GNUNET_SCHEDULER_Task * round_timeout_tid; | 255 | struct GNUNET_SCHEDULER_Task *timeout_task; |
158 | 256 | ||
159 | /** | 257 | unsigned int is_running; |
160 | * Number of other peers in the consensus. | 258 | |
259 | unsigned int is_finished; | ||
260 | |||
261 | /* | ||
262 | * Round that this step should start. | ||
263 | * If not all prerequisites have run, | ||
264 | * the task will run anyway. | ||
161 | */ | 265 | */ |
162 | unsigned int num_peers; | 266 | unsigned int start_round; |
163 | 267 | ||
164 | /** | 268 | /* |
165 | * Information about the other peers, | 269 | * Number of rounds this step occupies. |
166 | * their state, etc. | 270 | * |
271 | * Some steps are more expensive, and thus | ||
272 | * are allocated more rounds. | ||
167 | */ | 273 | */ |
168 | struct ConsensusPeerInformation *info; | 274 | unsigned int num_rounds; |
169 | 275 | ||
170 | /** | 276 | /** |
171 | * Index of the local peer in the peers array | 277 | * Human-readable name for |
278 | * the task, used for debugging. | ||
172 | */ | 279 | */ |
173 | unsigned int local_peer_idx; | 280 | char *debug_name; |
281 | }; | ||
174 | 282 | ||
175 | /** | 283 | struct RfnPeerInfo |
176 | * Current round | 284 | { |
285 | /* Peers can propose changes, | ||
286 | * but they are only accepted once | ||
287 | * the whole set operation is done. */ | ||
288 | int is_commited; | ||
289 | }; | ||
290 | |||
291 | struct RfnElementInfo | ||
292 | { | ||
293 | struct GNUNET_SET_Element *element; | ||
294 | |||
295 | /* | ||
296 | * Vote (or VOTE_NONE) from every peer | ||
297 | * in the session about the element. | ||
177 | */ | 298 | */ |
178 | enum ConsensusRound current_round; | 299 | int *votes; |
300 | }; | ||
179 | 301 | ||
180 | /** | 302 | |
181 | * Permutation of peers for the current round, | 303 | struct ReferendumEntry |
304 | { | ||
305 | struct RfnKey key; | ||
306 | |||
307 | /* | ||
308 | * Elements where there is at least one proposed change. | ||
309 | * | ||
310 | * Maps the hash of the GNUNET_SET_Element | ||
311 | * to 'struct RfnElementInfo'. | ||
182 | */ | 312 | */ |
183 | uint32_t *shuffle; | 313 | struct GNUNET_CONTAINER_MultiHashMap *rfn_elements; |
184 | 314 | ||
185 | /** | 315 | /** |
186 | * Inverse permutation of peers for the current round, | 316 | * Stores, for every peer in the session, |
317 | * whether the peer finished the whole referendum. | ||
318 | * | ||
319 | * Votes from peers are only counted if they're | ||
320 | * marked as commited (#GNUNET_YES) in the referendum. | ||
321 | * | ||
322 | * Otherwise (#GNUNET_NO), the requested changes are | ||
323 | * not counted for majority votes or thresholds. | ||
187 | */ | 324 | */ |
188 | uint32_t *shuffle_inv; | 325 | int *peer_commited; |
326 | }; | ||
327 | |||
328 | |||
329 | struct DiffElementInfo | ||
330 | { | ||
331 | struct GNUNET_SET_Element *element; | ||
189 | 332 | ||
190 | /** | 333 | /** |
191 | * Current round of the exponential scheme. | 334 | * Positive weight for 'add', negative |
335 | * weights for 'remove'. | ||
192 | */ | 336 | */ |
193 | uint32_t exp_repetition; | 337 | int weight; |
338 | }; | ||
339 | |||
340 | |||
341 | /** | ||
342 | * Weighted diff. | ||
343 | */ | ||
344 | struct DiffEntry | ||
345 | { | ||
346 | struct DiffKey key; | ||
347 | struct GNUNET_CONTAINER_MultiHashMap *changes; | ||
348 | }; | ||
349 | |||
194 | 350 | ||
351 | |||
352 | /** | ||
353 | * A consensus session consists of one local client and the remote authorities. | ||
354 | */ | ||
355 | struct ConsensusSession | ||
356 | { | ||
195 | /** | 357 | /** |
196 | * Current sub-round of the exponential scheme. | 358 | * Consensus sessions are kept in a DLL. |
197 | */ | 359 | */ |
198 | uint32_t exp_subround; | 360 | struct ConsensusSession *next; |
199 | 361 | ||
200 | /** | 362 | /** |
201 | * The partner for the current exp-round. | 363 | * Consensus sessions are kept in a DLL. |
202 | * The local peer will initiate the set reconciliation with the | ||
203 | * outgoing peer. | ||
204 | */ | 364 | */ |
205 | struct ConsensusPeerInformation *partner_outgoing; | 365 | struct ConsensusSession *prev; |
366 | |||
367 | unsigned int num_client_insert_pending; | ||
368 | |||
369 | struct GNUNET_CONTAINER_MultiHashMap *setmap; | ||
370 | struct GNUNET_CONTAINER_MultiHashMap *rfnmap; | ||
371 | struct GNUNET_CONTAINER_MultiHashMap *diffmap; | ||
206 | 372 | ||
207 | /** | 373 | /** |
208 | * The partner for the current exp-round | 374 | * Array of peers with length 'num_peers'. |
209 | * The incoming peer will initiate the set reconciliation with | 375 | */ |
210 | * the incoming peer. | 376 | int *peers_ignored; |
377 | |||
378 | /* | ||
379 | * Mapping from (hashed) TaskKey to TaskEntry. | ||
380 | * | ||
381 | * We map the application_id for a round to the task that should be | ||
382 | * executed, so we don't have to go through all task whenever we get | ||
383 | * an incoming set op request. | ||
211 | */ | 384 | */ |
212 | struct ConsensusPeerInformation *partner_incoming; | 385 | struct GNUNET_CONTAINER_MultiHashMap *taskmap; |
386 | |||
387 | struct Step *steps_head; | ||
388 | struct Step *steps_tail; | ||
389 | |||
390 | int conclude_started; | ||
391 | |||
392 | int conclude_done; | ||
213 | 393 | ||
214 | /** | 394 | /** |
215 | * The consensus set of this session. | 395 | * Global consensus identification, computed |
216 | */ | 396 | * from the session id and participating authorities. |
217 | struct GNUNET_SET_Handle *element_set; | 397 | */ |
398 | struct GNUNET_HashCode global_id; | ||
218 | 399 | ||
219 | /** | 400 | /** |
220 | * Listener for requests from other peers. | 401 | * Client that inhabits the session |
221 | * Uses the session's global id as app id. | ||
222 | */ | 402 | */ |
223 | struct GNUNET_SET_ListenHandle *set_listener; | 403 | struct GNUNET_SERVER_Client *client; |
224 | }; | ||
225 | |||
226 | 404 | ||
227 | /** | ||
228 | * Information about a peer that is in a consensus session. | ||
229 | */ | ||
230 | struct ConsensusPeerInformation | ||
231 | { | ||
232 | /** | 405 | /** |
233 | * Peer identitty of the peer in the consensus session | 406 | * Queued messages to the client. |
234 | */ | 407 | */ |
235 | struct GNUNET_PeerIdentity peer_id; | 408 | struct GNUNET_MQ_Handle *client_mq; |
236 | 409 | ||
237 | /** | 410 | /** |
238 | * Back-reference to the consensus session, | 411 | * Time when the conclusion of the consensus should begin. |
239 | * to that ConsensusPeerInformation can be used as a closure | ||
240 | */ | 412 | */ |
241 | struct ConsensusSession *session; | 413 | struct GNUNET_TIME_Absolute conclude_start; |
242 | 414 | ||
243 | /** | 415 | /** |
244 | * Have we finished the set operation for this (sub-)round? | 416 | * Timeout for all rounds together, single rounds will schedule a timeout task |
417 | * with a fraction of the conclude timeout. | ||
418 | * Only valid once the current round is not CONSENSUS_ROUND_BEGIN. | ||
245 | */ | 419 | */ |
246 | int set_op_finished; | 420 | struct GNUNET_TIME_Absolute conclude_deadline; |
421 | |||
422 | struct GNUNET_PeerIdentity *peers; | ||
247 | 423 | ||
248 | /** | 424 | /** |
249 | * Set operation we are currently executing with this peer. | 425 | * Number of other peers in the consensus. |
250 | */ | 426 | */ |
251 | struct GNUNET_SET_OperationHandle *set_op; | 427 | unsigned int num_peers; |
252 | 428 | ||
253 | /** | 429 | /** |
254 | * Set operation we are planning on executing with this peer. | 430 | * Index of the local peer in the peers array |
255 | */ | 431 | */ |
256 | struct GNUNET_SET_OperationHandle *delayed_set_op; | 432 | unsigned int local_peer_idx; |
257 | 433 | ||
258 | /** | 434 | /** |
259 | * Info about the round of the delayed set operation. | 435 | * Listener for requests from other peers. |
436 | * Uses the session's global id as app id. | ||
260 | */ | 437 | */ |
261 | struct RoundInfo delayed_round_info; | 438 | struct GNUNET_SET_ListenHandle *set_listener; |
262 | }; | 439 | }; |
263 | 440 | ||
264 | |||
265 | /** | 441 | /** |
266 | * Linked list of sessions this peer participates in. | 442 | * Linked list of sessions this peer participates in. |
267 | */ | 443 | */ |
@@ -288,31 +464,123 @@ static struct GNUNET_SERVER_Handle *srv; | |||
288 | static struct GNUNET_PeerIdentity my_peer; | 464 | static struct GNUNET_PeerIdentity my_peer; |
289 | 465 | ||
290 | 466 | ||
291 | /** | 467 | static void |
292 | * Check if the current subround has finished. | 468 | finish_task (struct TaskEntry *task); |
293 | * Must only be called when an exp-round is the current round. | 469 | |
294 | * | 470 | static void |
295 | * @param session session to check for exp-round completion | 471 | run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task); |
296 | * @return GNUNET_YES if the subround has finished, | 472 | |
297 | * GNUNET_NO if not | 473 | static void |
298 | */ | 474 | run_task_eval_rfn (struct ConsensusSession *session, struct TaskEntry *task); |
299 | static int | 475 | |
300 | have_exp_subround_finished (const struct ConsensusSession *session) | 476 | static void |
301 | { | 477 | run_task_apply_diff (struct ConsensusSession *session, struct TaskEntry *task); |
302 | int not_finished; | 478 | |
303 | 479 | static void | |
304 | GNUNET_assert (CONSENSUS_ROUND_EXCHANGE == session->current_round); | 480 | run_ready_steps (struct ConsensusSession *session); |
305 | 481 | ||
306 | not_finished = 0; | 482 | static const char * |
307 | if ( (NULL != session->partner_outgoing) && | 483 | phasename (uint16_t phase) |
308 | (GNUNET_NO == session->partner_outgoing->set_op_finished) ) | 484 | { |
309 | not_finished++; | 485 | switch (phase) |
310 | if ( (NULL != session->partner_incoming) && | 486 | { |
311 | (GNUNET_NO == session->partner_incoming->set_op_finished) ) | 487 | case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL"; |
312 | not_finished++; | 488 | case PHASE_KIND_FINISH: return "FINISH"; |
313 | if (0 == not_finished) | 489 | case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER"; |
314 | return GNUNET_YES; | 490 | case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO"; |
315 | return GNUNET_NO; | 491 | case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE"; |
492 | case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM"; | ||
493 | case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE"; | ||
494 | case PHASE_KIND_GRADECAST_APPLY_RESULT: return "GRADECAST_APPLY_RESULT"; | ||
495 | default: return "(unknown)"; | ||
496 | } | ||
497 | } | ||
498 | |||
499 | |||
500 | static const char * | ||
501 | setname (uint16_t kind) | ||
502 | { | ||
503 | switch (kind) | ||
504 | { | ||
505 | case SET_KIND_CURRENT: return "CURRENT"; | ||
506 | case SET_KIND_LEADER: return "LEADER"; | ||
507 | case SET_KIND_NONE: return "NONE"; | ||
508 | default: return "(unknown)"; | ||
509 | } | ||
510 | } | ||
511 | |||
512 | static const char * | ||
513 | rfnname (uint16_t kind) | ||
514 | { | ||
515 | switch (kind) | ||
516 | { | ||
517 | case RFN_KIND_NONE: return "NONE"; | ||
518 | case RFN_KIND_ECHO: return "ECHO"; | ||
519 | case RFN_KIND_CONFIRM: return "CONFIRM"; | ||
520 | default: return "(unknown)"; | ||
521 | } | ||
522 | } | ||
523 | |||
524 | static const char * | ||
525 | diffname (uint16_t kind) | ||
526 | { | ||
527 | switch (kind) | ||
528 | { | ||
529 | case DIFF_KIND_NONE: return "NONE"; | ||
530 | case DIFF_KIND_LEADER: return "LEADER"; | ||
531 | case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT"; | ||
532 | default: return "(unknown)"; | ||
533 | } | ||
534 | } | ||
535 | |||
536 | static const char * | ||
537 | debug_str_task_key (struct TaskKey *tk) | ||
538 | { | ||
539 | static char buf[256]; | ||
540 | |||
541 | snprintf (buf, sizeof (buf), | ||
542 | "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d", | ||
543 | phasename (tk->kind), tk->peer1, tk->peer2, | ||
544 | tk->leader, tk->repetition); | ||
545 | |||
546 | return buf; | ||
547 | } | ||
548 | |||
549 | static const char * | ||
550 | debug_str_diff_key (struct DiffKey *dk) | ||
551 | { | ||
552 | static char buf[256]; | ||
553 | |||
554 | snprintf (buf, sizeof (buf), | ||
555 | "DiffKey kind=%s, k1=%d, k2=%d", | ||
556 | diffname (dk->diff_kind), dk->k1, dk->k2); | ||
557 | |||
558 | return buf; | ||
559 | } | ||
560 | |||
561 | static const char * | ||
562 | debug_str_set_key (struct SetKey *sk) | ||
563 | { | ||
564 | static char buf[256]; | ||
565 | |||
566 | snprintf (buf, sizeof (buf), | ||
567 | "SetKey kind=%s, k1=%d, k2=%d", | ||
568 | setname (sk->set_kind), sk->k1, sk->k2); | ||
569 | |||
570 | return buf; | ||
571 | } | ||
572 | |||
573 | |||
574 | static const char * | ||
575 | debug_str_rfn_key (struct RfnKey *rk) | ||
576 | { | ||
577 | static char buf[256]; | ||
578 | |||
579 | snprintf (buf, sizeof (buf), | ||
580 | "RfnKey kind=%s, k1=%d, k2=%d", | ||
581 | rfnname (rk->rfn_kind), rk->k1, rk->k2); | ||
582 | |||
583 | return buf; | ||
316 | } | 584 | } |
317 | 585 | ||
318 | 586 | ||
@@ -325,11 +593,6 @@ static void | |||
325 | destroy_session (struct ConsensusSession *session) | 593 | destroy_session (struct ConsensusSession *session) |
326 | { | 594 | { |
327 | GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session); | 595 | GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session); |
328 | if (NULL != session->element_set) | ||
329 | { | ||
330 | GNUNET_SET_destroy (session->element_set); | ||
331 | session->element_set = NULL; | ||
332 | } | ||
333 | if (NULL != session->set_listener) | 596 | if (NULL != session->set_listener) |
334 | { | 597 | { |
335 | GNUNET_SET_listen_cancel (session->set_listener); | 598 | GNUNET_SET_listen_cancel (session->set_listener); |
@@ -345,38 +608,13 @@ destroy_session (struct ConsensusSession *session) | |||
345 | GNUNET_SERVER_client_disconnect (session->client); | 608 | GNUNET_SERVER_client_disconnect (session->client); |
346 | session->client = NULL; | 609 | session->client = NULL; |
347 | } | 610 | } |
348 | if (NULL != session->shuffle) | ||
349 | { | ||
350 | GNUNET_free (session->shuffle); | ||
351 | session->shuffle = NULL; | ||
352 | } | ||
353 | if (NULL != session->shuffle_inv) | ||
354 | { | ||
355 | GNUNET_free (session->shuffle_inv); | ||
356 | session->shuffle_inv = NULL; | ||
357 | } | ||
358 | if (NULL != session->info) | ||
359 | { | ||
360 | int i; | ||
361 | for (i = 0; i < session->num_peers; i++) | ||
362 | { | ||
363 | struct ConsensusPeerInformation *cpi; | ||
364 | cpi = &session->info[i]; | ||
365 | if (NULL != cpi->set_op) | ||
366 | { | ||
367 | GNUNET_SET_operation_cancel (cpi->set_op); | ||
368 | cpi->set_op = NULL; | ||
369 | } | ||
370 | } | ||
371 | GNUNET_free (session->info); | ||
372 | session->info = NULL; | ||
373 | } | ||
374 | GNUNET_free (session); | 611 | GNUNET_free (session); |
375 | } | 612 | } |
376 | 613 | ||
377 | 614 | ||
378 | /** | 615 | /** |
379 | * Iterator for set elements. [FIXME: bad comment] | 616 | * Send the final result set of the consensus to the client, element by |
617 | * element. | ||
380 | * | 618 | * |
381 | * @param cls closure | 619 | * @param cls closure |
382 | * @param element the current element, NULL if all elements have been | 620 | * @param element the current element, NULL if all elements have been |
@@ -387,7 +625,8 @@ static int | |||
387 | send_to_client_iter (void *cls, | 625 | send_to_client_iter (void *cls, |
388 | const struct GNUNET_SET_Element *element) | 626 | const struct GNUNET_SET_Element *element) |
389 | { | 627 | { |
390 | struct ConsensusSession *session = cls; | 628 | struct TaskEntry *task = (struct TaskEntry *) cls; |
629 | struct ConsensusSession *session = task->step->session; | ||
391 | struct GNUNET_MQ_Envelope *ev; | 630 | struct GNUNET_MQ_Envelope *ev; |
392 | 631 | ||
393 | if (NULL != element) | 632 | if (NULL != element) |
@@ -417,185 +656,109 @@ send_to_client_iter (void *cls, | |||
417 | 656 | ||
418 | 657 | ||
419 | /** | 658 | /** |
420 | * Start the next round. | 659 | * Callback for set operation results. Called for each element |
421 | * This function can be invoked as a timeout task, or called manually (tc will be NULL then). | 660 | * in the result set. |
422 | * | 661 | * |
423 | * @param cls the session | 662 | * @param cls closure |
424 | * @param tc task context, for when this task is invoked by the scheduler, | 663 | * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK |
425 | * NULL if invoked for another reason | 664 | * @param status see enum GNUNET_SET_Status |
426 | */ | 665 | */ |
427 | static void | 666 | static void |
428 | round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | 667 | set_result_cb_loop (void *cls, |
668 | const struct GNUNET_SET_Element *element, | ||
669 | enum GNUNET_SET_Status status) | ||
429 | { | 670 | { |
430 | struct ConsensusSession *session; | 671 | /* Nothing to do here. |
431 | unsigned int i; | 672 | This is the callback for looped local set operations, everything is |
432 | int res; | 673 | handled by the first callback */ |
674 | |||
675 | struct TaskEntry *task = cls; | ||
676 | struct ConsensusSession *session = task->step->session; | ||
677 | |||
678 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
679 | "P%u: skipping looped set result for {%s}, status %u\n", | ||
680 | session->local_peer_idx, | ||
681 | debug_str_task_key (&task->key), | ||
682 | status); | ||
683 | } | ||
433 | 684 | ||
434 | /* don't kick off next round if we're shutting down */ | ||
435 | if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) | ||
436 | return; | ||
437 | 685 | ||
438 | session = cls; | 686 | static struct SetEntry * |
439 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: round over\n", session->local_peer_idx); | 687 | lookup_set (struct ConsensusSession *session, struct SetKey *key) |
688 | { | ||
689 | struct GNUNET_HashCode hash; | ||
440 | 690 | ||
441 | if (tc != NULL) | 691 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
442 | session->round_timeout_tid = NULL; | 692 | "P%u: looking up set {%s}\n", |
693 | session->local_peer_idx, | ||
694 | debug_str_set_key (key)); | ||
443 | 695 | ||
444 | if (session->round_timeout_tid != NULL) | 696 | GNUNET_assert (SET_KIND_NONE != key->set_kind); |
445 | { | 697 | GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash); |
446 | GNUNET_SCHEDULER_cancel (session->round_timeout_tid); | 698 | return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash); |
447 | session->round_timeout_tid = NULL; | 699 | } |
448 | } | ||
449 | 700 | ||
450 | for (i = 0; i < session->num_peers; i++) | ||
451 | { | ||
452 | if (NULL != session->info[i].set_op) | ||
453 | { | ||
454 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: canceling stray op with P%d\n", | ||
455 | session->local_peer_idx, i); | ||
456 | GNUNET_SET_operation_cancel (session->info[i].set_op); | ||
457 | session->info[i].set_op = NULL; | ||
458 | } | ||
459 | /* we're in the new round, nothing finished yet */ | ||
460 | session->info[i].set_op_finished = GNUNET_NO; | ||
461 | } | ||
462 | 701 | ||
463 | switch (session->current_round) | 702 | static struct DiffEntry * |
464 | { | 703 | lookup_diff (struct ConsensusSession *session, struct DiffKey *key) |
465 | case CONSENSUS_ROUND_BEGIN: | 704 | { |
466 | session->current_round = CONSENSUS_ROUND_EXCHANGE; | 705 | struct GNUNET_HashCode hash; |
467 | session->exp_repetition = 0; | 706 | |
468 | subround_over (session, NULL); | 707 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
469 | break; | 708 | "P%u: looking up diff {%s}\n", |
470 | case CONSENSUS_ROUND_EXCHANGE: | 709 | session->local_peer_idx, |
471 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: finished, sending elements to client\n", | 710 | debug_str_diff_key (key)); |
472 | session->local_peer_idx); | 711 | |
473 | session->current_round = CONSENSUS_ROUND_FINISH; | 712 | GNUNET_assert (DIFF_KIND_NONE != key->diff_kind); |
474 | res = GNUNET_SET_iterate (session->element_set, send_to_client_iter, session); | 713 | GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash); |
475 | if (GNUNET_SYSERR == res) | 714 | return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash); |
476 | { | ||
477 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "can't iterate set: set invalid\n"); | ||
478 | } | ||
479 | else if (GNUNET_NO == res) | ||
480 | { | ||
481 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "can't iterate set: iterator already active\n"); | ||
482 | } | ||
483 | break; | ||
484 | default: | ||
485 | GNUNET_assert (0); | ||
486 | } | ||
487 | } | 715 | } |
488 | 716 | ||
489 | 717 | ||
490 | /** | 718 | static struct ReferendumEntry * |
491 | * Create a new permutation for the session's peers in session->shuffle. | 719 | lookup_rfn (struct ConsensusSession *session, struct RfnKey *key) |
492 | * Uses a Fisher-Yates shuffle with pseudo-randomness coming from | ||
493 | * both the global session id and the current round index. | ||
494 | * | ||
495 | * @param session the session to create the new permutation for | ||
496 | */ | ||
497 | static void | ||
498 | shuffle (struct ConsensusSession *session) | ||
499 | { | 720 | { |
500 | uint32_t i; | 721 | struct GNUNET_HashCode hash; |
501 | uint32_t randomness[session->num_peers-1]; | ||
502 | |||
503 | if (NULL == session->shuffle) | ||
504 | session->shuffle = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle)); | ||
505 | if (NULL == session->shuffle_inv) | ||
506 | session->shuffle_inv = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle_inv)); | ||
507 | 722 | ||
508 | GNUNET_CRYPTO_kdf (randomness, sizeof (randomness), | 723 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
509 | &session->exp_repetition, sizeof (uint32_t), | 724 | "P%u: looking up rfn {%s}\n", |
510 | &session->global_id, sizeof (struct GNUNET_HashCode), | 725 | session->local_peer_idx, |
511 | NULL); | 726 | debug_str_rfn_key (key)); |
512 | 727 | ||
513 | for (i = 0; i < session->num_peers; i++) | 728 | GNUNET_assert (RFN_KIND_NONE != key->rfn_kind); |
514 | session->shuffle[i] = i; | 729 | GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash); |
730 | return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash); | ||
731 | } | ||
515 | 732 | ||
516 | for (i = session->num_peers - 1; i > 0; i--) | ||
517 | { | ||
518 | uint32_t x; | ||
519 | uint32_t tmp; | ||
520 | x = randomness[i-1] % session->num_peers; | ||
521 | tmp = session->shuffle[x]; | ||
522 | session->shuffle[x] = session->shuffle[i]; | ||
523 | session->shuffle[i] = tmp; | ||
524 | } | ||
525 | 733 | ||
526 | /* create the inverse */ | 734 | static void |
527 | for (i = 0; i < session->num_peers; i++) | 735 | diff_insert (struct DiffEntry *diff, |
528 | session->shuffle_inv[session->shuffle[i]] = i; | 736 | int weight, |
737 | const struct GNUNET_SET_Element *element) | ||
738 | { | ||
739 | GNUNET_assert (0); | ||
529 | } | 740 | } |
530 | 741 | ||
531 | 742 | ||
532 | /** | ||
533 | * Find and set the partner_incoming and partner_outgoing of our peer, | ||
534 | * one of them may not exist (and thus set to NULL) if the number of peers | ||
535 | * in the session is not a power of two. | ||
536 | * | ||
537 | * @param session the consensus session | ||
538 | */ | ||
539 | static void | 743 | static void |
540 | find_partners (struct ConsensusSession *session) | 744 | rfn_vote (struct ReferendumEntry *rfn, |
541 | { | 745 | uint16_t voting_peer, |
542 | unsigned int arc; | 746 | uint16_t num_peers, |
543 | unsigned int num_ghosts; | 747 | int vote, |
544 | unsigned int largest_arc; | 748 | const struct GNUNET_SET_Element *element) |
545 | int partner_idx; | 749 | { |
546 | 750 | GNUNET_assert (voting_peer < num_peers); | |
547 | /* shuffled local index */ | 751 | GNUNET_assert (0); |
548 | int my_idx = session->shuffle[session->local_peer_idx]; | ||
549 | |||
550 | /* distance to neighboring peer in current subround */ | ||
551 | arc = 1 << session->exp_subround; | ||
552 | largest_arc = 1; | ||
553 | while (largest_arc < session->num_peers) | ||
554 | largest_arc <<= 1; | ||
555 | num_ghosts = largest_arc - session->num_peers; | ||
556 | // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "largest arc: %u\n", largest_arc); | ||
557 | // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "arc: %u\n", arc); | ||
558 | // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "num ghosts: %u\n", num_ghosts); | ||
559 | |||
560 | if (0 == (my_idx & arc)) | ||
561 | { | ||
562 | /* we are outgoing */ | ||
563 | partner_idx = (my_idx + arc) % session->num_peers; | ||
564 | session->partner_outgoing = &session->info[session->shuffle_inv[partner_idx]]; | ||
565 | GNUNET_assert (GNUNET_NO == session->partner_outgoing->set_op_finished); | ||
566 | /* are we a 'ghost' of a peer that would exist if | ||
567 | * the number of peers was a power of two, and thus have to partner | ||
568 | * with an additional peer? | ||
569 | */ | ||
570 | if (my_idx < num_ghosts) | ||
571 | { | ||
572 | int ghost_partner_idx; | ||
573 | // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "my index %d, arc %d, peers %u\n", my_idx, arc, session->num_peers); | ||
574 | ghost_partner_idx = (my_idx - (int) arc) % (int) session->num_peers; | ||
575 | /* platform dependent; modulo sometimes returns negative values */ | ||
576 | if (ghost_partner_idx < 0) | ||
577 | ghost_partner_idx += session->num_peers; | ||
578 | /* we only need to have a ghost partner if the partner is outgoing */ | ||
579 | if (0 == (ghost_partner_idx & arc)) | ||
580 | { | ||
581 | // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ghost partner is %d\n", ghost_partner_idx); | ||
582 | session->partner_incoming = &session->info[session->shuffle_inv[ghost_partner_idx]]; | ||
583 | GNUNET_assert (GNUNET_NO == session->partner_incoming->set_op_finished); | ||
584 | return; | ||
585 | } | ||
586 | } | ||
587 | session->partner_incoming = NULL; | ||
588 | return; | ||
589 | } | ||
590 | /* we only have an incoming connection */ | ||
591 | partner_idx = (my_idx - (int) arc) % (int) session->num_peers; | ||
592 | if (partner_idx < 0) | ||
593 | partner_idx += session->num_peers; | ||
594 | session->partner_outgoing = NULL; | ||
595 | session->partner_incoming = &session->info[session->shuffle_inv[partner_idx]]; | ||
596 | GNUNET_assert (GNUNET_NO == session->partner_incoming->set_op_finished); | ||
597 | } | 752 | } |
598 | 753 | ||
754 | uint16_t | ||
755 | task_other_peer (struct TaskEntry *task) | ||
756 | { | ||
757 | uint16_t me = task->step->session->local_peer_idx; | ||
758 | if (task->key.peer1 == me) | ||
759 | return task->key.peer2; | ||
760 | return task->key.peer1; | ||
761 | } | ||
599 | 762 | ||
600 | /** | 763 | /** |
601 | * Callback for set operation results. Called for each element | 764 | * Callback for set operation results. Called for each element |
@@ -610,267 +773,775 @@ set_result_cb (void *cls, | |||
610 | const struct GNUNET_SET_Element *element, | 773 | const struct GNUNET_SET_Element *element, |
611 | enum GNUNET_SET_Status status) | 774 | enum GNUNET_SET_Status status) |
612 | { | 775 | { |
613 | struct ConsensusPeerInformation *cpi = cls; | 776 | struct TaskEntry *task = cls; |
614 | unsigned int remote_idx = cpi - cpi->session->info; | 777 | struct ConsensusSession *session = task->step->session; |
615 | unsigned int local_idx = cpi->session->local_peer_idx; | 778 | struct SetEntry *output_set = NULL; |
779 | struct DiffEntry *output_diff = NULL; | ||
780 | struct ReferendumEntry *output_rfn = NULL; | ||
781 | unsigned int other_idx; | ||
782 | |||
783 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
784 | "P%u: got set result for {%s}, status %u\n", | ||
785 | session->local_peer_idx, | ||
786 | debug_str_task_key (&task->key), | ||
787 | status); | ||
788 | |||
789 | if (GNUNET_NO == task->is_running) | ||
790 | { | ||
791 | GNUNET_break_op (0); | ||
792 | return; | ||
793 | } | ||
616 | 794 | ||
617 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u with status %u\n", | 795 | if (GNUNET_YES == task->is_finished) |
618 | local_idx, remote_idx, (unsigned int) status); | 796 | { |
797 | GNUNET_break_op (0); | ||
798 | return; | ||
799 | } | ||
800 | |||
801 | if (task->key.peer1 == session->local_peer_idx) | ||
802 | other_idx = task->key.peer2; | ||
803 | else if (task->key.peer2 == session->local_peer_idx) | ||
804 | other_idx = task->key.peer1; | ||
805 | else | ||
806 | { | ||
807 | /* error in task graph construction */ | ||
808 | GNUNET_assert (0); | ||
809 | } | ||
619 | 810 | ||
620 | GNUNET_assert ((cpi == cpi->session->partner_outgoing) || | 811 | if (SET_KIND_NONE != task->output_set.set_kind) |
621 | (cpi == cpi->session->partner_incoming)); | 812 | output_set = lookup_set (session, &task->output_set); |
813 | |||
814 | if (DIFF_KIND_NONE != task->output_diff.diff_kind) | ||
815 | output_diff = lookup_diff (session, &task->output_diff); | ||
816 | |||
817 | if (RFN_KIND_NONE != task->output_rfn.rfn_kind) | ||
818 | output_rfn = lookup_rfn (session, &task->output_rfn); | ||
819 | |||
820 | if (GNUNET_YES == session->peers_ignored[other_idx]) | ||
821 | { | ||
822 | /* We should have never started or commited to an operation | ||
823 | with an ignored peer. */ | ||
824 | GNUNET_break (0); | ||
825 | return; | ||
826 | } | ||
622 | 827 | ||
623 | switch (status) | 828 | switch (status) |
624 | { | 829 | { |
830 | // case GNUNET_SET_STATUS_MISSING_LOCAL: | ||
625 | case GNUNET_SET_STATUS_OK: | 831 | case GNUNET_SET_STATUS_OK: |
626 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: element\n", | 832 | if (NULL != output_set) |
627 | local_idx, remote_idx); | ||
628 | break; | ||
629 | case GNUNET_SET_STATUS_FAILURE: | ||
630 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: failure\n", | ||
631 | local_idx, remote_idx); | ||
632 | cpi->set_op = NULL; | ||
633 | return; | ||
634 | case GNUNET_SET_STATUS_HALF_DONE: | ||
635 | case GNUNET_SET_STATUS_DONE: | ||
636 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: done\n", | ||
637 | local_idx, remote_idx); | ||
638 | cpi->set_op_finished = GNUNET_YES; | ||
639 | cpi->set_op = NULL; | ||
640 | if (have_exp_subround_finished (cpi->session) == GNUNET_YES) | ||
641 | { | 833 | { |
642 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: all reconciliations of subround done\n", | 834 | // FIXME: record pending adds, use callback |
643 | local_idx); | 835 | GNUNET_SET_add_element (output_set->h, |
644 | subround_over (cpi->session, NULL); | 836 | element, |
837 | NULL, | ||
838 | NULL); | ||
839 | |||
645 | } | 840 | } |
646 | else | 841 | if (NULL != output_diff) |
647 | { | 842 | { |
648 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting for further set results\n", | 843 | diff_insert (output_diff, 1, element); |
649 | local_idx); | ||
650 | } | 844 | } |
651 | return; | 845 | if (NULL != output_rfn) |
652 | default: | 846 | { |
653 | GNUNET_break (0); | 847 | rfn_vote (output_rfn, task_other_peer (task), session->num_peers, VOTE_ADD, element); |
654 | return; | 848 | } |
655 | } | 849 | // XXX: add result to structures in task |
656 | |||
657 | switch (cpi->session->current_round) | ||
658 | { | ||
659 | case CONSENSUS_ROUND_COMPLETION: | ||
660 | case CONSENSUS_ROUND_EXCHANGE: | ||
661 | GNUNET_SET_add_element (cpi->session->element_set, element, NULL, NULL); | ||
662 | break; | 850 | break; |
663 | default: | 851 | //case GNUNET_SET_STATUS_MISSING_REMOTE: |
852 | // // XXX: add result to structures in task | ||
853 | // break; | ||
854 | case GNUNET_SET_STATUS_DONE: | ||
855 | // XXX: check first if any changes to the underlying | ||
856 | // set are still pending | ||
857 | // XXX: commit other peer in referendum | ||
858 | finish_task (task); | ||
859 | break; | ||
860 | case GNUNET_SET_STATUS_FAILURE: | ||
861 | // XXX: cleanup | ||
664 | GNUNET_break (0); | 862 | GNUNET_break (0); |
665 | return; | 863 | return; |
864 | default: | ||
865 | /* not reached */ | ||
866 | GNUNET_assert (0); | ||
666 | } | 867 | } |
667 | } | 868 | } |
668 | 869 | ||
669 | 870 | ||
871 | |||
670 | /** | 872 | /** |
671 | * Compare the round the session is in with the round of the given context message. | 873 | * Commit the appropriate set for a |
672 | * | 874 | * task. |
673 | * @param session a consensus session | ||
674 | * @param ri a round context message | ||
675 | * @return 0 if it's the same round, -1 if the session is in an earlier round, | ||
676 | * 1 if the session is in a later round | ||
677 | */ | 875 | */ |
678 | static int | 876 | static void |
679 | rounds_compare (struct ConsensusSession *session, | 877 | commit_set (struct ConsensusSession *session, |
680 | struct RoundInfo* ri) | 878 | struct TaskEntry *task) |
681 | { | 879 | { |
682 | if (session->current_round < ri->round) | 880 | struct SetEntry *set; |
683 | return -1; | 881 | |
684 | if (session->current_round > ri->round) | 882 | GNUNET_assert (NULL != task->op); |
685 | return 1; | 883 | set = lookup_set (session, &task->input_set); |
686 | if (session->current_round == CONSENSUS_ROUND_EXCHANGE) | 884 | GNUNET_assert (NULL != set); |
687 | { | 885 | GNUNET_SET_commit (task->op, set->h); |
688 | if (session->exp_repetition < ri->exp_repetition) | ||
689 | return -1; | ||
690 | if (session->exp_repetition > ri->exp_repetition) | ||
691 | return 1; | ||
692 | if (session->exp_subround < ri->exp_subround) | ||
693 | return -1; | ||
694 | if (session->exp_subround > ri->exp_subround) | ||
695 | return 1; | ||
696 | return 0; | ||
697 | } | ||
698 | /* other rounds have no subrounds / repetitions to compare */ | ||
699 | return 0; | ||
700 | } | 886 | } |
701 | 887 | ||
702 | 888 | ||
703 | /** | ||
704 | * Do the next subround in the exp-scheme. | ||
705 | * This function can be invoked as a timeout task, or called manually (tc will be NULL then). | ||
706 | * | ||
707 | * @param cls the session | ||
708 | * @param tc task context, for when this task is invoked by the scheduler, | ||
709 | * NULL if invoked for another reason | ||
710 | */ | ||
711 | static void | 889 | static void |
712 | subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | 890 | put_diff (struct ConsensusSession *session, |
891 | struct DiffEntry *diff) | ||
713 | { | 892 | { |
714 | struct ConsensusSession *session; | 893 | struct GNUNET_HashCode hash; |
715 | struct GNUNET_TIME_Relative subround_timeout; | ||
716 | int i; | ||
717 | 894 | ||
718 | /* don't kick off next subround if we're shutting down */ | 895 | GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash); |
719 | if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) | 896 | GNUNET_assert (GNUNET_OK == |
720 | return; | 897 | GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff, |
898 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
899 | } | ||
900 | |||
901 | static void | ||
902 | put_set (struct ConsensusSession *session, | ||
903 | struct SetEntry *set) | ||
904 | { | ||
905 | struct GNUNET_HashCode hash; | ||
721 | 906 | ||
722 | session = cls; | 907 | GNUNET_assert (NULL != set->h); |
908 | |||
909 | GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash); | ||
910 | GNUNET_assert (GNUNET_OK == | ||
911 | GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set, | ||
912 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
913 | } | ||
914 | |||
915 | |||
916 | static void | ||
917 | put_rfn (struct ConsensusSession *session, | ||
918 | struct ReferendumEntry *rfn) | ||
919 | { | ||
920 | struct GNUNET_HashCode hash; | ||
921 | |||
922 | GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash); | ||
923 | GNUNET_assert (GNUNET_OK == | ||
924 | GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn, | ||
925 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
926 | } | ||
927 | |||
928 | |||
929 | |||
930 | static void | ||
931 | output_cloned_cb (void *cls, struct GNUNET_SET_Handle *copy) | ||
932 | { | ||
933 | struct TaskEntry *task = (struct TaskEntry *) cls; | ||
934 | struct ConsensusSession *session = task->step->session; | ||
935 | struct SetEntry *set = GNUNET_new (struct SetEntry); | ||
936 | |||
937 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
938 | "P%u: Received lazy copy, storing output set %s\n", | ||
939 | session->local_peer_idx, debug_str_set_key (&task->output_set)); | ||
940 | |||
941 | set->key = task->output_set; | ||
942 | set->h = copy; | ||
943 | put_set (task->step->session, set); | ||
944 | run_task_remote_union (task->step->session, task); | ||
945 | } | ||
946 | |||
947 | |||
948 | static void | ||
949 | run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task) | ||
950 | { | ||
951 | struct SetEntry *input; | ||
723 | 952 | ||
724 | GNUNET_assert (CONSENSUS_ROUND_EXCHANGE == session->current_round); | 953 | input = lookup_set (session, &task->input_set); |
954 | GNUNET_assert (NULL != input); | ||
955 | GNUNET_assert (NULL != input->h); | ||
725 | 956 | ||
726 | if (tc != NULL) | 957 | /* We create the outputs for the operation here |
958 | (rather than in the set operation callback) | ||
959 | because we want something valid in there, even | ||
960 | if the other peer doesn't talk to us */ | ||
961 | |||
962 | if (SET_KIND_NONE != task->output_set.set_kind) | ||
727 | { | 963 | { |
728 | session->round_timeout_tid = NULL; | 964 | /* If we don't have an existing output set, |
729 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "P%u: consensus subround timed out\n", | 965 | we clone the input set. */ |
730 | session->local_peer_idx); | 966 | if (NULL == lookup_set (session, &task->output_set)) |
967 | { | ||
968 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
969 | "Output set missing, copying from input set\n"); | ||
970 | /* Since the cloning is asynchronous, | ||
971 | we'll retry the current function once the copy | ||
972 | has been provided by the SET service. */ | ||
973 | GNUNET_SET_copy_lazy (input->h, output_cloned_cb, task); | ||
974 | return; | ||
975 | } | ||
731 | } | 976 | } |
732 | 977 | ||
733 | /* cancel timeout */ | 978 | if (RFN_KIND_NONE != task->output_rfn.rfn_kind) |
734 | if (session->round_timeout_tid != NULL) | ||
735 | { | 979 | { |
736 | GNUNET_SCHEDULER_cancel (session->round_timeout_tid); | 980 | if (NULL == lookup_rfn (session, &task->output_rfn)) |
737 | session->round_timeout_tid = NULL; | 981 | { |
982 | struct ReferendumEntry *rfn; | ||
983 | |||
984 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
985 | "P%u: output rfn <%s> missing, creating.\n", | ||
986 | session->local_peer_idx, | ||
987 | debug_str_rfn_key (&task->output_rfn)); | ||
988 | |||
989 | rfn = GNUNET_new (struct ReferendumEntry); | ||
990 | rfn->key = task->output_rfn; | ||
991 | rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO); | ||
992 | rfn->peer_commited = GNUNET_new_array (session->num_peers, int); | ||
993 | put_rfn (session, rfn); | ||
994 | } | ||
738 | } | 995 | } |
739 | 996 | ||
740 | for (i = 0; i < session->num_peers; i++) | 997 | if (task->key.peer1 == session->local_peer_idx) |
741 | { | 998 | { |
742 | if (NULL != session->info[i].set_op) | 999 | struct GNUNET_CONSENSUS_RoundContextMessage rcm = { 0 }; |
1000 | |||
1001 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1002 | "P%u: Looking up set {%s} to run remote union\n", | ||
1003 | session->local_peer_idx, | ||
1004 | debug_str_set_key (&task->input_set)); | ||
1005 | |||
1006 | rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT); | ||
1007 | rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage)); | ||
1008 | |||
1009 | rcm.kind = htons (task->key.kind); | ||
1010 | rcm.peer1 = htons (task->key.peer1); | ||
1011 | rcm.peer2 = htons (task->key.peer2); | ||
1012 | rcm.leader = htons (task->key.leader); | ||
1013 | rcm.repetition = htons (task->key.repetition); | ||
1014 | |||
1015 | GNUNET_assert (NULL == task->op); | ||
1016 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n", | ||
1017 | session->local_peer_idx, task->key.peer2, debug_str_set_key (&task->input_set)); | ||
1018 | |||
1019 | // XXX: maybe this should be done while | ||
1020 | // setting up tasks alreays? | ||
1021 | task->op = GNUNET_SET_prepare (&session->peers[task->key.peer2], | ||
1022 | &session->global_id, | ||
1023 | &rcm.header, | ||
1024 | GNUNET_SET_RESULT_ADDED, /* XXX: will be obsolete soon */ | ||
1025 | set_result_cb, | ||
1026 | task); | ||
1027 | |||
1028 | /* Referendums must be materialized as a set before */ | ||
1029 | GNUNET_assert (RFN_KIND_NONE == task->input_rfn.rfn_kind); | ||
1030 | |||
1031 | if (GNUNET_OK != GNUNET_SET_commit (task->op, input->h)) | ||
743 | { | 1032 | { |
744 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: canceling stray op with P%d\n", | 1033 | GNUNET_break (0); |
745 | session->local_peer_idx, i); | 1034 | /* XXX: cleanup? */ |
746 | GNUNET_SET_operation_cancel (session->info[i].set_op); | 1035 | return; |
747 | session->info[i].set_op = NULL; | ||
748 | } | 1036 | } |
749 | /* we're in the new round, nothing finished yet */ | ||
750 | session->info[i].set_op_finished = GNUNET_NO; | ||
751 | } | 1037 | } |
1038 | else if (task->key.peer2 == session->local_peer_idx) | ||
1039 | { | ||
1040 | /* Wait for the other peer to contact us */ | ||
1041 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n", | ||
1042 | session->local_peer_idx, task->key.peer1); | ||
752 | 1043 | ||
753 | if (session->exp_repetition >= NUM_EXP_REPETITIONS) | 1044 | if (NULL != task->op) |
1045 | { | ||
1046 | GNUNET_assert (NULL == task->commited_set); | ||
1047 | commit_set (session, task); | ||
1048 | } | ||
1049 | } | ||
1050 | else | ||
754 | { | 1051 | { |
755 | round_over (session, NULL); | 1052 | /* We made an error while constructing the task graph. */ |
756 | return; | 1053 | GNUNET_assert (0); |
1054 | } | ||
1055 | } | ||
1056 | |||
1057 | |||
1058 | static int | ||
1059 | rfn_majority (uint16_t num_peers, | ||
1060 | struct ReferendumEntry *rfn, | ||
1061 | struct RfnElementInfo *ri, | ||
1062 | uint16_t threshold) | ||
1063 | { | ||
1064 | unsigned int votes_add = 0; | ||
1065 | unsigned int votes_remove = 0; | ||
1066 | unsigned int num_commited = 0; | ||
1067 | unsigned int maj_thresh; | ||
1068 | unsigned int nv; | ||
1069 | unsigned int tv; | ||
1070 | unsigned int i; | ||
1071 | |||
1072 | for (i = 0; i < num_peers; i++) | ||
1073 | { | ||
1074 | if (GNUNET_NO == rfn->peer_commited[i]) | ||
1075 | continue; | ||
1076 | num_commited++; | ||
1077 | if (ri->votes[i] == VOTE_ADD) | ||
1078 | votes_add++; | ||
1079 | if (ri->votes[i] == VOTE_REMOVE) | ||
1080 | votes_remove++; | ||
757 | } | 1081 | } |
758 | 1082 | ||
759 | if (session->exp_repetition == 0) | 1083 | /* Threshold to reach a majority among |
1084 | submitted votes, may not be enough for the | ||
1085 | global threshold. */ | ||
1086 | maj_thresh = (num_commited + 1) / 2; | ||
1087 | /* Vote are relative to our local set, so it can only be | ||
1088 | either all add or all remove */ | ||
1089 | GNUNET_assert ( (0 == votes_add) || (0 == votes_remove) ); | ||
1090 | |||
1091 | if (votes_add > 0) | ||
760 | { | 1092 | { |
761 | /* initialize everything for the log-rounds */ | 1093 | nv = votes_add; |
762 | session->exp_repetition = 1; | 1094 | tv = VOTE_ADD; |
763 | session->exp_subround = 0; | ||
764 | if (NULL == session->shuffle) | ||
765 | session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers); | ||
766 | if (NULL == session->shuffle_inv) | ||
767 | session->shuffle_inv = GNUNET_malloc ((sizeof (int)) * session->num_peers); | ||
768 | for (i = 0; i < session->num_peers; i++) | ||
769 | session->shuffle[i] = session->shuffle_inv[i] = i; | ||
770 | } | 1095 | } |
771 | else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers))) | 1096 | else if (votes_remove > 0) |
772 | { | 1097 | { |
773 | /* subrounds done, start new log-round */ | 1098 | nv = votes_remove; |
774 | session->exp_repetition++; | 1099 | tv = VOTE_REMOVE; |
775 | session->exp_subround = 0; | ||
776 | shuffle (session); | ||
777 | } | 1100 | } |
778 | else | 1101 | else |
779 | { | 1102 | { |
780 | session->exp_subround++; | 1103 | nv = 0; |
1104 | tv = VOTE_NONE; | ||
781 | } | 1105 | } |
782 | 1106 | ||
783 | subround_timeout = | 1107 | if ( (nv >= maj_thresh) && (nv >= threshold) ) |
784 | GNUNET_TIME_relative_divide (GNUNET_TIME_absolute_get_difference (session->conclude_start, session->conclude_deadline), | 1108 | return tv; |
785 | 2 * NUM_EXP_REPETITIONS * ((int) ceil (log2 (session->num_peers)))); | ||
786 | 1109 | ||
787 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "subround timeout: %u ms\n", subround_timeout.rel_value_us / 1000); | 1110 | if ( ((num_commited - nv) >= maj_thresh) && ((num_commited - nv) >= threshold) ) |
1111 | return VOTE_NONE; | ||
788 | 1112 | ||
789 | session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (subround_timeout, subround_over, session); | 1113 | return VOTE_CONTESTED; |
1114 | } | ||
790 | 1115 | ||
791 | /* determine the incoming and outgoing partner */ | ||
792 | find_partners (session); | ||
793 | 1116 | ||
794 | GNUNET_assert (session->partner_outgoing != &session->info[session->local_peer_idx]); | 1117 | struct SetChangeProgressCls |
795 | GNUNET_assert (session->partner_incoming != &session->info[session->local_peer_idx]); | 1118 | { |
1119 | int num_pending; | ||
1120 | struct TaskEntry *task; | ||
1121 | }; | ||
1122 | |||
1123 | |||
1124 | static void | ||
1125 | eval_rfn_done (struct TaskEntry *task) | ||
1126 | { | ||
1127 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1128 | "P%u: EVAL_REFERENDUM done for task {%s}\n", | ||
1129 | task->step->session->local_peer_idx, debug_str_task_key (&task->key)); | ||
1130 | |||
1131 | finish_task (task); | ||
1132 | } | ||
796 | 1133 | ||
797 | /* initiate set operation with the outgoing partner */ | 1134 | |
798 | if (NULL != session->partner_outgoing) | 1135 | static void |
1136 | eval_rfn_progress (void *cls) | ||
1137 | { | ||
1138 | struct SetChangeProgressCls *erc = cls; | ||
1139 | |||
1140 | GNUNET_assert (erc->num_pending > 0); | ||
1141 | |||
1142 | erc->num_pending--; | ||
1143 | |||
1144 | if (0 == erc->num_pending) | ||
799 | { | 1145 | { |
800 | struct GNUNET_CONSENSUS_RoundContextMessage *msg; | 1146 | struct TaskEntry *task = erc->task; |
801 | msg = GNUNET_new (struct GNUNET_CONSENSUS_RoundContextMessage); | 1147 | GNUNET_free (erc); |
802 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT); | 1148 | eval_rfn_done (task); |
803 | msg->header.size = htons (sizeof *msg); | 1149 | } |
804 | msg->round = htonl (session->current_round); | 1150 | } |
805 | msg->exp_repetition = htonl (session->exp_repetition); | 1151 | |
806 | msg->exp_subround = htonl (session->exp_subround); | 1152 | |
1153 | static void | ||
1154 | eval_rfn_copy_cb (void *cls, struct GNUNET_SET_Handle *copy) | ||
1155 | { | ||
1156 | struct TaskEntry *task = (struct TaskEntry *) cls; | ||
1157 | struct ConsensusSession *session = task->step->session; | ||
1158 | struct SetEntry *set; | ||
807 | 1159 | ||
808 | if (NULL != session->partner_outgoing->set_op) | 1160 | set = GNUNET_new (struct SetEntry); |
1161 | set->h = copy; | ||
1162 | set->key = task->output_set; | ||
1163 | |||
1164 | put_set (session, set); | ||
1165 | |||
1166 | run_task_eval_rfn (session, task); | ||
1167 | } | ||
1168 | |||
1169 | |||
1170 | /** | ||
1171 | * Take an input set and an input referendum, | ||
1172 | * apply the referendum with a threshold to the input | ||
1173 | * set and store the result in the output set and/or output diff. | ||
1174 | */ | ||
1175 | static void | ||
1176 | run_task_eval_rfn (struct ConsensusSession *session, struct TaskEntry *task) | ||
1177 | { | ||
1178 | struct GNUNET_CONTAINER_MultiHashMapIterator *iter; | ||
1179 | struct ReferendumEntry *input_rfn; | ||
1180 | struct RfnElementInfo *ri; | ||
1181 | struct SetEntry *output_set = NULL; | ||
1182 | struct DiffEntry *output_diff = NULL; | ||
1183 | struct SetChangeProgressCls *progress_cls; | ||
1184 | |||
1185 | /* Have at least one output */ | ||
1186 | GNUNET_assert ( (task->output_set.set_kind != SET_KIND_NONE) || | ||
1187 | (task->output_diff.diff_kind != DIFF_KIND_NONE)); | ||
1188 | |||
1189 | /* Not allowed as output */ | ||
1190 | GNUNET_assert ( (task->output_rfn.rfn_kind == RFN_KIND_NONE)); | ||
1191 | |||
1192 | if (SET_KIND_NONE != task->output_set.set_kind) | ||
1193 | { | ||
1194 | /* We have a set output, thus the output set must | ||
1195 | exist or copy it from the input set */ | ||
1196 | output_set = lookup_set (session, &task->output_set); | ||
1197 | if (NULL == output_set) | ||
809 | { | 1198 | { |
810 | GNUNET_break (0); | 1199 | struct SetEntry *input_set; |
811 | GNUNET_SET_operation_cancel (session->partner_outgoing->set_op); | 1200 | |
1201 | input_set = lookup_set (session, &task->input_set); | ||
1202 | GNUNET_assert (NULL != input_set); | ||
1203 | GNUNET_SET_copy_lazy (input_set->h, | ||
1204 | eval_rfn_copy_cb, | ||
1205 | task); | ||
1206 | /* We'll be called again, this time with the | ||
1207 | set ready. */ | ||
1208 | return; | ||
812 | } | 1209 | } |
813 | session->partner_outgoing->set_op = | 1210 | } |
814 | GNUNET_SET_prepare (&session->partner_outgoing->peer_id, | 1211 | |
815 | &session->global_id, | 1212 | if (DIFF_KIND_NONE != task->output_diff.diff_kind) |
816 | (struct GNUNET_MessageHeader *) msg, | 1213 | { |
817 | GNUNET_SET_RESULT_ADDED, | 1214 | output_diff = lookup_diff (session, &task->output_diff); |
818 | set_result_cb, session->partner_outgoing); | 1215 | if (NULL == output_diff) |
819 | GNUNET_free (msg); | ||
820 | if (GNUNET_OK != GNUNET_SET_commit (session->partner_outgoing->set_op, session->element_set)) | ||
821 | { | 1216 | { |
822 | GNUNET_break (0); | 1217 | output_diff = GNUNET_new (struct DiffEntry); |
823 | session->partner_outgoing->set_op = NULL; | 1218 | output_diff->key = task->output_diff; |
824 | session->partner_outgoing->set_op_finished = GNUNET_YES; | 1219 | output_diff->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO); |
1220 | put_diff (session, output_diff); | ||
825 | } | 1221 | } |
826 | } | 1222 | } |
827 | 1223 | ||
828 | /* commit to the delayed set operation */ | 1224 | progress_cls = GNUNET_new (struct SetChangeProgressCls); |
829 | if ((NULL != session->partner_incoming) && (NULL != session->partner_incoming->delayed_set_op)) | 1225 | |
830 | { | 1226 | input_rfn = lookup_rfn (session, &task->input_rfn); |
831 | int cmp = rounds_compare (session, &session->partner_incoming->delayed_round_info); | 1227 | |
1228 | GNUNET_assert (NULL != input_rfn); | ||
832 | 1229 | ||
833 | if (NULL != session->partner_incoming->set_op) | 1230 | iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements); |
1231 | GNUNET_assert (NULL != iter); | ||
1232 | |||
1233 | while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri)) | ||
1234 | { | ||
1235 | int majority_vote = rfn_majority (session->num_peers, input_rfn, ri, task->threshold); | ||
1236 | switch (majority_vote) | ||
834 | { | 1237 | { |
835 | GNUNET_break (0); | 1238 | case VOTE_ADD: |
836 | GNUNET_SET_operation_cancel (session->partner_incoming->set_op); | 1239 | if (NULL != output_set) |
837 | session->partner_incoming->set_op = NULL; | 1240 | { |
1241 | progress_cls->num_pending++; | ||
1242 | GNUNET_assert (GNUNET_OK == | ||
1243 | GNUNET_SET_add_element (output_set->h, | ||
1244 | ri->element, | ||
1245 | eval_rfn_progress, | ||
1246 | progress_cls)); | ||
1247 | } | ||
1248 | if (NULL != output_diff) | ||
1249 | { | ||
1250 | diff_insert (output_diff, 1, ri->element); | ||
1251 | } | ||
1252 | break; | ||
1253 | case VOTE_CONTESTED: | ||
1254 | if (NULL != output_set) | ||
1255 | output_set->is_contested = GNUNET_YES; | ||
1256 | /* fallthrough */ | ||
1257 | case VOTE_REMOVE: | ||
1258 | if (NULL != output_set) | ||
1259 | { | ||
1260 | progress_cls->num_pending++; | ||
1261 | GNUNET_assert (GNUNET_OK == | ||
1262 | GNUNET_SET_remove_element (output_set->h, | ||
1263 | ri->element, | ||
1264 | eval_rfn_progress, | ||
1265 | progress_cls)); | ||
1266 | } | ||
1267 | if (NULL != output_diff) | ||
1268 | { | ||
1269 | diff_insert (output_diff, -1, ri->element); | ||
1270 | } | ||
1271 | break; | ||
1272 | case VOTE_NONE: | ||
1273 | /* Nothing to do. */ | ||
1274 | break; | ||
1275 | default: | ||
1276 | /* not reached */ | ||
1277 | GNUNET_assert (0); | ||
838 | } | 1278 | } |
839 | if (cmp == 0) | 1279 | } |
1280 | GNUNET_CONTAINER_multihashmap_iterator_destroy (iter); | ||
1281 | |||
1282 | if (progress_cls->num_pending == 0) | ||
1283 | { | ||
1284 | // call closure right now, no pending ops | ||
1285 | GNUNET_free (progress_cls); | ||
1286 | eval_rfn_done (task); | ||
1287 | } | ||
1288 | } | ||
1289 | |||
1290 | |||
1291 | static void | ||
1292 | apply_diff_copy_cb (void *cls, struct GNUNET_SET_Handle *copy) | ||
1293 | { | ||
1294 | struct TaskEntry *task = (struct TaskEntry *) cls; | ||
1295 | struct ConsensusSession *session = task->step->session; | ||
1296 | struct SetEntry *set; | ||
1297 | |||
1298 | set = GNUNET_new (struct SetEntry); | ||
1299 | set->h = copy; | ||
1300 | set->key = task->output_set; | ||
1301 | |||
1302 | put_set (session, set); | ||
1303 | |||
1304 | run_task_apply_diff (session, task); | ||
1305 | } | ||
1306 | |||
1307 | |||
1308 | static void | ||
1309 | apply_diff_done (struct TaskEntry *task) | ||
1310 | { | ||
1311 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1312 | "P%u: APPLY_DIFF done for task {%s}\n", | ||
1313 | task->step->session->local_peer_idx, debug_str_task_key (&task->key)); | ||
1314 | finish_task (task); | ||
1315 | } | ||
1316 | |||
1317 | |||
1318 | static void | ||
1319 | apply_diff_progress (void *cls) | ||
1320 | { | ||
1321 | struct SetChangeProgressCls *erc = cls; | ||
1322 | |||
1323 | GNUNET_assert (erc->num_pending > 0); | ||
1324 | |||
1325 | erc->num_pending--; | ||
1326 | |||
1327 | if (0 == erc->num_pending) | ||
1328 | { | ||
1329 | struct TaskEntry *task = erc->task; | ||
1330 | GNUNET_free (erc); | ||
1331 | apply_diff_done (task); | ||
1332 | } | ||
1333 | } | ||
1334 | |||
1335 | |||
1336 | static void | ||
1337 | run_task_apply_diff (struct ConsensusSession *session, struct TaskEntry *task) | ||
1338 | { | ||
1339 | struct SetEntry *output_set; | ||
1340 | struct DiffEntry *input_diff; | ||
1341 | struct GNUNET_CONTAINER_MultiHashMapIterator *iter; | ||
1342 | struct DiffElementInfo *di; | ||
1343 | struct SetChangeProgressCls *progress_cls; | ||
1344 | |||
1345 | GNUNET_assert (task->output_set.set_kind != SET_KIND_NONE); | ||
1346 | GNUNET_assert (task->input_diff.diff_kind != DIFF_KIND_NONE); | ||
1347 | |||
1348 | input_diff = lookup_diff (session, &task->input_diff); | ||
1349 | |||
1350 | GNUNET_assert (NULL != input_diff); | ||
1351 | |||
1352 | output_set = lookup_set (session, &task->output_set); | ||
1353 | |||
1354 | if (NULL == output_set) | ||
1355 | { | ||
1356 | struct SetEntry *input_set; | ||
1357 | |||
1358 | input_set = lookup_set (session, &task->input_set); | ||
1359 | GNUNET_assert (NULL != input_set); | ||
1360 | GNUNET_SET_copy_lazy (input_set->h, | ||
1361 | apply_diff_copy_cb, | ||
1362 | task); | ||
1363 | /* We'll be called again, this time with the | ||
1364 | set ready. */ | ||
1365 | return; | ||
1366 | } | ||
1367 | |||
1368 | progress_cls = GNUNET_new (struct SetChangeProgressCls); | ||
1369 | |||
1370 | iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_diff->changes); | ||
1371 | |||
1372 | while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di)) | ||
1373 | { | ||
1374 | if (di->weight > 0) | ||
840 | { | 1375 | { |
841 | if (GNUNET_OK != GNUNET_SET_commit (session->partner_incoming->delayed_set_op, session->element_set)) | 1376 | progress_cls->num_pending++; |
842 | { | 1377 | GNUNET_assert (GNUNET_OK == |
843 | GNUNET_break (0); | 1378 | GNUNET_SET_remove_element (output_set->h, |
844 | } | 1379 | di->element, |
845 | session->partner_incoming->set_op = session->partner_incoming->delayed_set_op; | 1380 | apply_diff_progress, |
846 | session->partner_incoming->delayed_set_op = NULL; | 1381 | progress_cls)); |
847 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d resumed delayed round with P%d\n", | ||
848 | session->local_peer_idx, (int) (session->partner_incoming - session->info)); | ||
849 | } | 1382 | } |
850 | else | 1383 | else if (di->weight < 0) |
851 | { | 1384 | { |
852 | /* this should not happen -- a round has been skipped! */ | 1385 | progress_cls->num_pending++; |
853 | GNUNET_break_op (0); | 1386 | GNUNET_assert (GNUNET_OK == |
1387 | GNUNET_SET_add_element (output_set->h, | ||
1388 | di->element, | ||
1389 | apply_diff_progress, | ||
1390 | progress_cls)); | ||
854 | } | 1391 | } |
855 | } | 1392 | } |
856 | 1393 | ||
1394 | GNUNET_CONTAINER_multihashmap_iterator_destroy (iter); | ||
1395 | |||
1396 | if (progress_cls->num_pending == 0) | ||
1397 | { | ||
1398 | // call closure right now, no pending ops | ||
1399 | GNUNET_free (progress_cls); | ||
1400 | apply_diff_done (task); | ||
1401 | } | ||
1402 | } | ||
1403 | |||
1404 | |||
1405 | static void | ||
1406 | run_task_finish (struct ConsensusSession *session, struct TaskEntry *task) | ||
1407 | { | ||
1408 | struct SetEntry *final_set; | ||
1409 | |||
1410 | final_set = lookup_set (session, &task->input_set); | ||
1411 | |||
1412 | GNUNET_assert (NULL != final_set); | ||
1413 | |||
1414 | |||
1415 | GNUNET_SET_iterate (final_set->h, | ||
1416 | send_to_client_iter, | ||
1417 | task); | ||
1418 | } | ||
1419 | |||
1420 | static void | ||
1421 | run_task (struct ConsensusSession *session, struct TaskEntry *task) | ||
1422 | { | ||
1423 | GNUNET_assert (GNUNET_NO == task->is_running); | ||
1424 | GNUNET_assert (GNUNET_NO == task->is_finished); | ||
1425 | |||
1426 | |||
1427 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key)); | ||
1428 | |||
1429 | switch (task->action) | ||
1430 | { | ||
1431 | case ACTION_RECONCILE: | ||
1432 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_RECONCILE task\n", session->local_peer_idx); | ||
1433 | run_task_remote_union (session, task); | ||
1434 | break; | ||
1435 | case ACTION_EVAL_RFN: | ||
1436 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_EVAL_RFN task\n", session->local_peer_idx); | ||
1437 | run_task_eval_rfn (session, task); | ||
1438 | break; | ||
1439 | case ACTION_APPLY_DIFF: | ||
1440 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_APPLY_DIFF task\n", session->local_peer_idx); | ||
1441 | run_task_apply_diff (session, task); | ||
1442 | break; | ||
1443 | case ACTION_FINISH: | ||
1444 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_FINISH task\n", session->local_peer_idx); | ||
1445 | run_task_finish (session, task); | ||
1446 | break; | ||
1447 | default: | ||
1448 | /* not reached */ | ||
1449 | GNUNET_assert (0); | ||
1450 | } | ||
1451 | task->is_running = GNUNET_YES; | ||
1452 | } | ||
1453 | |||
1454 | |||
1455 | static void finish_step (struct Step *step) | ||
1456 | { | ||
1457 | unsigned int i; | ||
1458 | |||
1459 | GNUNET_assert (step->finished_tasks == step->tasks_len); | ||
1460 | GNUNET_assert (GNUNET_YES == step->is_running); | ||
1461 | GNUNET_assert (GNUNET_NO == step->is_finished); | ||
1462 | |||
857 | #ifdef GNUNET_EXTRA_LOGGING | 1463 | #ifdef GNUNET_EXTRA_LOGGING |
1464 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1465 | "All tasks of step `%s' with %u subordinates finished.\n", | ||
1466 | step->debug_name, | ||
1467 | step->subordinates_len); | ||
1468 | #endif | ||
1469 | |||
1470 | for (i = 0; i < step->subordinates_len; i++) | ||
858 | { | 1471 | { |
859 | int in; | 1472 | GNUNET_assert (step->subordinates[i]->pending_prereq > 0); |
860 | int out; | 1473 | step->subordinates[i]->pending_prereq--; |
861 | if (session->partner_outgoing == NULL) | 1474 | #ifdef GNUNET_EXTRA_LOGGING |
862 | out = -1; | 1475 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
863 | else | 1476 | "Decreased pending_prereq to %u for step `%s'.\n", |
864 | out = (int) (session->partner_outgoing - session->info); | 1477 | step->subordinates[i]->pending_prereq, |
865 | if (session->partner_incoming == NULL) | 1478 | step->subordinates[i]->debug_name); |
866 | in = -1; | 1479 | |
867 | else | 1480 | #endif |
868 | in = (int) (session->partner_incoming - session->info); | 1481 | } |
869 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx, | 1482 | |
870 | session->exp_repetition, session->exp_subround, in, out); | 1483 | step->is_finished = GNUNET_YES; |
871 | } | ||
872 | #endif /* GNUNET_EXTRA_LOGGING */ | ||
873 | 1484 | ||
1485 | // XXX: maybe schedule as task to avoid recursion? | ||
1486 | run_ready_steps (step->session); | ||
1487 | } | ||
1488 | |||
1489 | |||
1490 | /* | ||
1491 | * Run all steps of the session that don't any | ||
1492 | * more dependencies. | ||
1493 | */ | ||
1494 | static void | ||
1495 | run_ready_steps (struct ConsensusSession *session) | ||
1496 | { | ||
1497 | struct Step *step; | ||
1498 | |||
1499 | step = session->steps_head; | ||
1500 | |||
1501 | while (NULL != step) | ||
1502 | { | ||
1503 | if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) ) | ||
1504 | { | ||
1505 | size_t i; | ||
1506 | |||
1507 | GNUNET_assert (0 == step->finished_tasks); | ||
1508 | |||
1509 | #ifdef GNUNET_EXTRA_LOGGING | ||
1510 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d:%d with %d tasks and %d subordinates\n", | ||
1511 | session->local_peer_idx, | ||
1512 | step->debug_name, | ||
1513 | step->start_round, step->num_rounds, step->tasks_len, step->subordinates_len); | ||
1514 | #endif | ||
1515 | |||
1516 | step->is_running = GNUNET_YES; | ||
1517 | for (i = 0; i < step->tasks_len; i++) | ||
1518 | run_task (session, step->tasks[i]); | ||
1519 | |||
1520 | /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */ | ||
1521 | if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished)) | ||
1522 | finish_step (step); | ||
1523 | |||
1524 | /* Running the next ready steps will be triggered by task completion */ | ||
1525 | return; | ||
1526 | } | ||
1527 | step = step->next; | ||
1528 | } | ||
1529 | |||
1530 | return; | ||
1531 | } | ||
1532 | |||
1533 | |||
1534 | |||
1535 | static void | ||
1536 | finish_task (struct TaskEntry *task) | ||
1537 | { | ||
1538 | GNUNET_assert (GNUNET_NO == task->is_finished); | ||
1539 | task->is_finished = GNUNET_YES; | ||
1540 | |||
1541 | task->step->finished_tasks++; | ||
1542 | |||
1543 | if (task->step->finished_tasks == task->step->tasks_len) | ||
1544 | finish_step (task->step); | ||
874 | } | 1545 | } |
875 | 1546 | ||
876 | 1547 | ||
@@ -886,7 +1557,7 @@ get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSess | |||
886 | { | 1557 | { |
887 | int i; | 1558 | int i; |
888 | for (i = 0; i < session->num_peers; i++) | 1559 | for (i = 0; i < session->num_peers; i++) |
889 | if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer)) | 1560 | if (0 == memcmp (peer, &session->peers[i], sizeof (struct GNUNET_PeerIdentity))) |
890 | return i; | 1561 | return i; |
891 | return -1; | 1562 | return -1; |
892 | } | 1563 | } |
@@ -899,27 +1570,24 @@ get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSess | |||
899 | * exactly the same peers, the global id will be different. | 1570 | * exactly the same peers, the global id will be different. |
900 | * | 1571 | * |
901 | * @param session session to generate the global id for | 1572 | * @param session session to generate the global id for |
902 | * @param session_id local id of the consensus session | 1573 | * @param local_session_id local id of the consensus session |
903 | */ | 1574 | */ |
904 | static void | 1575 | static void |
905 | compute_global_id (struct ConsensusSession *session, | 1576 | compute_global_id (struct ConsensusSession *session, |
906 | const struct GNUNET_HashCode *session_id) | 1577 | const struct GNUNET_HashCode *local_session_id) |
907 | { | 1578 | { |
908 | int i; | 1579 | const char *salt = "gnunet-service-consensus/session_id"; |
909 | struct GNUNET_HashCode tmp; | 1580 | |
910 | struct GNUNET_HashCode phash; | 1581 | GNUNET_assert (GNUNET_YES == |
911 | 1582 | GNUNET_CRYPTO_kdf (&session->global_id, | |
912 | /* FIXME: use kdf? */ | 1583 | sizeof (struct GNUNET_HashCode), |
913 | 1584 | salt, | |
914 | session->global_id = *session_id; | 1585 | strlen (salt), |
915 | for (i = 0; i < session->num_peers; ++i) | 1586 | session->peers, |
916 | { | 1587 | session->num_peers * sizeof (struct GNUNET_PeerIdentity), |
917 | GNUNET_CRYPTO_hash (&session->info[i].peer_id, sizeof (struct GNUNET_PeerIdentity), &phash); | 1588 | local_session_id, |
918 | GNUNET_CRYPTO_hash_xor (&session->global_id, &phash, &tmp); | 1589 | sizeof (struct GNUNET_HashCode), |
919 | session->global_id = tmp; | 1590 | NULL)); |
920 | GNUNET_CRYPTO_hash (&session->global_id, sizeof (struct GNUNET_PeerIdentity), &tmp); | ||
921 | session->global_id = tmp; | ||
922 | } | ||
923 | } | 1591 | } |
924 | 1592 | ||
925 | 1593 | ||
@@ -948,7 +1616,6 @@ initialize_session_peer_list (struct ConsensusSession *session, | |||
948 | unsigned int local_peer_in_list; | 1616 | unsigned int local_peer_in_list; |
949 | uint32_t listed_peers; | 1617 | uint32_t listed_peers; |
950 | const struct GNUNET_PeerIdentity *msg_peers; | 1618 | const struct GNUNET_PeerIdentity *msg_peers; |
951 | struct GNUNET_PeerIdentity *peers; | ||
952 | unsigned int i; | 1619 | unsigned int i; |
953 | 1620 | ||
954 | GNUNET_assert (NULL != join_msg); | 1621 | GNUNET_assert (NULL != join_msg); |
@@ -973,25 +1640,27 @@ initialize_session_peer_list (struct ConsensusSession *session, | |||
973 | if (GNUNET_NO == local_peer_in_list) | 1640 | if (GNUNET_NO == local_peer_in_list) |
974 | session->num_peers++; | 1641 | session->num_peers++; |
975 | 1642 | ||
976 | peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity)); | 1643 | session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity)); |
977 | 1644 | ||
978 | if (GNUNET_NO == local_peer_in_list) | 1645 | if (GNUNET_NO == local_peer_in_list) |
979 | peers[session->num_peers - 1] = my_peer; | 1646 | session->peers[session->num_peers - 1] = my_peer; |
980 | 1647 | ||
981 | memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity)); | 1648 | memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity)); |
982 | qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &peer_id_cmp); | 1649 | qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &peer_id_cmp); |
1650 | } | ||
983 | 1651 | ||
984 | session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation)); | ||
985 | 1652 | ||
986 | for (i = 0; i < session->num_peers; ++i) | 1653 | static struct TaskEntry * |
987 | { | 1654 | lookup_task (struct ConsensusSession *session, |
988 | /* initialize back-references, so consensus peer information can | 1655 | struct TaskKey *key) |
989 | * be used as closure */ | 1656 | { |
990 | session->info[i].session = session; | 1657 | struct GNUNET_HashCode hash; |
991 | session->info[i].peer_id = peers[i]; | 1658 | |
992 | } | ||
993 | 1659 | ||
994 | GNUNET_free (peers); | 1660 | GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash); |
1661 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n", | ||
1662 | GNUNET_h2s (&hash)); | ||
1663 | return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash); | ||
995 | } | 1664 | } |
996 | 1665 | ||
997 | 1666 | ||
@@ -1017,12 +1686,10 @@ set_listen_cb (void *cls, | |||
1017 | struct GNUNET_SET_Request *request) | 1686 | struct GNUNET_SET_Request *request) |
1018 | { | 1687 | { |
1019 | struct ConsensusSession *session = cls; | 1688 | struct ConsensusSession *session = cls; |
1020 | struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg; | 1689 | struct TaskKey tk; |
1021 | struct ConsensusPeerInformation *cpi; | 1690 | struct TaskEntry *task; |
1022 | struct GNUNET_SET_OperationHandle *set_op; | 1691 | struct GNUNET_CONSENSUS_RoundContextMessage *cm; |
1023 | struct RoundInfo round_info; | 1692 | GNUNET_SET_ResultIterator my_result_cb; |
1024 | int index; | ||
1025 | int cmp; | ||
1026 | 1693 | ||
1027 | if (NULL == context_msg) | 1694 | if (NULL == context_msg) |
1028 | { | 1695 | { |
@@ -1030,85 +1697,524 @@ set_listen_cb (void *cls, | |||
1030 | return; | 1697 | return; |
1031 | } | 1698 | } |
1032 | 1699 | ||
1033 | index = get_peer_idx (other_peer, session); | 1700 | if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type)) |
1701 | { | ||
1702 | GNUNET_break_op (0); | ||
1703 | return; | ||
1704 | } | ||
1034 | 1705 | ||
1035 | if (index < 0) | 1706 | if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size)) |
1036 | { | 1707 | { |
1037 | GNUNET_break_op (0); | 1708 | GNUNET_break_op (0); |
1038 | return; | 1709 | return; |
1039 | } | 1710 | } |
1040 | 1711 | ||
1041 | round_info.round = ntohl (msg->round); | 1712 | cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg; |
1042 | round_info.exp_repetition = ntohl (msg->exp_repetition); | 1713 | |
1043 | round_info.exp_subround = ntohl (msg->exp_subround); | 1714 | tk = ((struct TaskKey) { |
1715 | .kind = ntohs (cm->kind), | ||
1716 | .peer1 = ntohs (cm->peer1), | ||
1717 | .peer2 = ntohs (cm->peer2), | ||
1718 | .repetition = ntohs (cm->repetition), | ||
1719 | .leader = ntohs (cm->leader), | ||
1720 | }); | ||
1044 | 1721 | ||
1045 | cpi = &session->info[index]; | 1722 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n", |
1723 | session->local_peer_idx, debug_str_task_key (&tk)); | ||
1046 | 1724 | ||
1047 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d got set request from P%d\n", session->local_peer_idx, index); | 1725 | task = lookup_task (session, &tk); |
1048 | 1726 | ||
1049 | switch (session->current_round) | 1727 | if (NULL == task) |
1050 | { | 1728 | { |
1051 | case CONSENSUS_ROUND_BEGIN: | 1729 | GNUNET_break_op (0); |
1052 | /* we're in the begin round, so requests for the exchange round may | 1730 | return; |
1053 | * come in, they will be delayed for now! */ | 1731 | } |
1054 | case CONSENSUS_ROUND_EXCHANGE: | 1732 | |
1055 | cmp = rounds_compare (session, &round_info); | 1733 | if (ACTION_RECONCILE != task->action) |
1056 | if (cmp > 0) | 1734 | { |
1057 | { | 1735 | GNUNET_break_op (0); |
1058 | /* the other peer is too late */ | 1736 | return; |
1059 | LOG_PP (GNUNET_ERROR_TYPE_DEBUG, cpi, "too late for the current round\n"); | 1737 | } |
1060 | return; | 1738 | |
1061 | } | 1739 | if (GNUNET_YES == task->is_finished) |
1062 | /* kill old request, if any. this is legal, | 1740 | { |
1063 | * as the other peer would not make a new request if it would want to | 1741 | GNUNET_break_op (0); |
1064 | * complete the old one! */ | 1742 | return; |
1065 | if (NULL != cpi->set_op) | 1743 | } |
1066 | { | 1744 | |
1067 | LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got new request from same peer, canceling old one\n"); | 1745 | if (task->key.peer2 != session->local_peer_idx) |
1068 | GNUNET_SET_operation_cancel (cpi->set_op); | 1746 | { |
1069 | cpi->set_op = NULL; | 1747 | /* We're being asked, so we must be thne 2nd peer. */ |
1070 | } | 1748 | GNUNET_break_op (0); |
1071 | set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, | 1749 | return; |
1072 | set_result_cb, &session->info[index]); | 1750 | } |
1073 | if (cmp == 0) | 1751 | |
1074 | { | 1752 | if (task->key.peer1 == task->key.peer2) |
1075 | /* we're in exactly the right round for the incoming request */ | 1753 | my_result_cb = set_result_cb_loop; |
1076 | if (cpi != cpi->session->partner_incoming) | 1754 | else |
1077 | { | 1755 | my_result_cb = set_result_cb; |
1078 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "P%u: got request from %u (with matching round), " | 1756 | |
1079 | "but incoming partner is %d\n", cpi->session->local_peer_idx, cpi - cpi->session->info, | 1757 | task->op = GNUNET_SET_accept (request, |
1080 | ((NULL == cpi->session->partner_incoming) ? -1 : (cpi->session->partner_incoming - cpi->session->info))); | 1758 | GNUNET_SET_RESULT_ADDED, /* XXX: obsolete soon */ |
1081 | GNUNET_SET_operation_cancel (set_op); | 1759 | my_result_cb, |
1082 | return; | 1760 | task); |
1083 | } | 1761 | |
1084 | cpi->set_op = set_op; | 1762 | /* If the task hasn't been started yet, |
1085 | if (GNUNET_OK != GNUNET_SET_commit (set_op, session->element_set)) | 1763 | we wait for that until we commit. */ |
1086 | { | 1764 | |
1087 | GNUNET_break (0); | 1765 | if (GNUNET_YES == task->is_running) |
1088 | } | 1766 | { |
1089 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d commited to set request from P%d\n", session->local_peer_idx, index); | 1767 | commit_set (session, task); |
1090 | } | 1768 | } |
1091 | else | 1769 | } |
1092 | { | 1770 | |
1093 | /* we still have wait until we have finished the current round, | 1771 | |
1094 | * as the other peer's round is larger */ | 1772 | |
1095 | cpi->delayed_set_op = set_op; | 1773 | static void |
1096 | cpi->delayed_round_info = round_info; | 1774 | put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap, |
1097 | /* The current setop is finished, as we canceled the current setop above. */ | 1775 | struct TaskEntry *t) |
1098 | cpi->set_op_finished = GNUNET_YES; | 1776 | { |
1099 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d delaying set request from P%d\n", session->local_peer_idx, index); | 1777 | struct GNUNET_HashCode round_hash; |
1100 | } | 1778 | struct Step *s; |
1101 | break; | 1779 | |
1102 | default: | 1780 | GNUNET_assert (NULL != t->step); |
1103 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "P%d got unexpected set request in round %d from P%d\n", | 1781 | |
1104 | session->local_peer_idx, session->current_round, index); | 1782 | t = GNUNET_memdup (t, sizeof (struct TaskEntry)); |
1105 | GNUNET_break_op (0); | 1783 | |
1106 | return; | 1784 | s = t->step; |
1785 | |||
1786 | if (s->tasks_len == s->tasks_cap) | ||
1787 | { | ||
1788 | unsigned int target_size = 3 * (s->tasks_cap + 1) / 2; | ||
1789 | GNUNET_array_grow (s->tasks, | ||
1790 | s->tasks_cap, | ||
1791 | target_size); | ||
1792 | } | ||
1793 | |||
1794 | #ifdef GNUNET_EXTRA_LOGGING | ||
1795 | GNUNET_assert (NULL != s->debug_name); | ||
1796 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n", | ||
1797 | debug_str_task_key (&t->key), | ||
1798 | s->debug_name); | ||
1799 | #endif | ||
1800 | |||
1801 | s->tasks[s->tasks_len] = t; | ||
1802 | s->tasks_len++; | ||
1803 | |||
1804 | GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash); | ||
1805 | GNUNET_assert (GNUNET_OK == | ||
1806 | GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t, | ||
1807 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
1808 | } | ||
1809 | |||
1810 | |||
1811 | static void | ||
1812 | install_step_timeouts (struct ConsensusSession *session) | ||
1813 | { | ||
1814 | /* Given the fully constructed task graph | ||
1815 | with rounds for tasks, we can give the tasks timeouts. */ | ||
1816 | |||
1817 | /* XXX: implement! */ | ||
1818 | } | ||
1819 | |||
1820 | |||
1821 | |||
1822 | /* | ||
1823 | * Arrange two peers in some canonical order. | ||
1824 | */ | ||
1825 | static void | ||
1826 | arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n) | ||
1827 | { | ||
1828 | uint16_t a; | ||
1829 | uint16_t b; | ||
1830 | |||
1831 | GNUNET_assert (*p1 < n); | ||
1832 | GNUNET_assert (*p2 < n); | ||
1833 | |||
1834 | if (*p1 < *p2) | ||
1835 | { | ||
1836 | a = *p1; | ||
1837 | b = *p2; | ||
1838 | } | ||
1839 | else | ||
1840 | { | ||
1841 | a = *p2; | ||
1842 | b = *p1; | ||
1843 | } | ||
1844 | |||
1845 | /* For uniformly random *p1, *p2, | ||
1846 | this condition is true with 50% chance */ | ||
1847 | if (((b - a) + n) % n <= n / 2) | ||
1848 | { | ||
1849 | *p1 = a; | ||
1850 | *p2 = b; | ||
1851 | } | ||
1852 | else | ||
1853 | { | ||
1854 | *p1 = b; | ||
1855 | *p2 = a; | ||
1107 | } | 1856 | } |
1108 | } | 1857 | } |
1109 | 1858 | ||
1110 | 1859 | ||
1111 | /** | 1860 | /** |
1861 | * Record @a dep as a dependency of @step. | ||
1862 | */ | ||
1863 | static void | ||
1864 | step_depend_on (struct Step *step, struct Step *dep) | ||
1865 | { | ||
1866 | /* We're not checking for cyclic dependencies, | ||
1867 | but this is a cheap sanity check. */ | ||
1868 | GNUNET_assert (step != dep); | ||
1869 | GNUNET_assert (NULL != step); | ||
1870 | GNUNET_assert (NULL != dep); | ||
1871 | // XXX: make rounds work | ||
1872 | //GNUNET_assert (dep->start_round <= step->start_round); | ||
1873 | |||
1874 | #ifdef GNUNET_EXTRA_LOGGING | ||
1875 | /* Make sure we have complete debugging information. | ||
1876 | Also checks that we don't screw up too badly | ||
1877 | constructing the task graph. */ | ||
1878 | GNUNET_assert (NULL != step->debug_name); | ||
1879 | GNUNET_assert (NULL != dep->debug_name); | ||
1880 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1881 | "Making step `%s' depend on `%s'\n", | ||
1882 | step->debug_name, | ||
1883 | dep->debug_name); | ||
1884 | #endif | ||
1885 | |||
1886 | if (dep->subordinates_cap == dep->subordinates_len) | ||
1887 | { | ||
1888 | unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2; | ||
1889 | GNUNET_array_grow (dep->subordinates, | ||
1890 | dep->subordinates_cap, | ||
1891 | target_size); | ||
1892 | } | ||
1893 | |||
1894 | GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap); | ||
1895 | |||
1896 | dep->subordinates[dep->subordinates_len] = step; | ||
1897 | dep->subordinates_len++; | ||
1898 | |||
1899 | step->pending_prereq++; | ||
1900 | } | ||
1901 | |||
1902 | |||
1903 | static struct Step * | ||
1904 | create_step (struct ConsensusSession *session, int start_round, int num_rounds) | ||
1905 | { | ||
1906 | struct Step *step; | ||
1907 | step = GNUNET_new (struct Step); | ||
1908 | step->session = session; | ||
1909 | step->start_round = start_round; | ||
1910 | step->num_rounds = num_rounds; | ||
1911 | GNUNET_CONTAINER_DLL_insert_tail (session->steps_head, | ||
1912 | session->steps_tail, | ||
1913 | step); | ||
1914 | return step; | ||
1915 | } | ||
1916 | |||
1917 | |||
1918 | /** | ||
1919 | * Construct the task graph for a single | ||
1920 | * gradecast. | ||
1921 | */ | ||
1922 | static void | ||
1923 | construct_task_graph_gradecast (struct ConsensusSession *session, | ||
1924 | uint16_t rep, | ||
1925 | uint16_t lead, | ||
1926 | struct Step *step_before, | ||
1927 | struct Step *step_after) | ||
1928 | { | ||
1929 | uint16_t n = session->num_peers; | ||
1930 | uint16_t t = n / 3; | ||
1931 | |||
1932 | uint16_t me = session->local_peer_idx; | ||
1933 | |||
1934 | uint16_t p1; | ||
1935 | uint16_t p2; | ||
1936 | |||
1937 | /* The task we're currently setting up. */ | ||
1938 | struct TaskEntry task; | ||
1939 | |||
1940 | struct Step *step; | ||
1941 | struct Step *prev_step; | ||
1942 | |||
1943 | uint16_t round; | ||
1944 | |||
1945 | unsigned int k; | ||
1946 | |||
1947 | round = step_before->start_round + step_before->num_rounds; | ||
1948 | |||
1949 | /* gcast step 1: leader disseminates */ | ||
1950 | |||
1951 | step = create_step (session, round, 1); | ||
1952 | |||
1953 | #ifdef GNUNET_EXTRA_LOGGING | ||
1954 | GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep); | ||
1955 | #endif | ||
1956 | step_depend_on (step, step_before); | ||
1957 | |||
1958 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: Considering leader %d\n", session->local_peer_idx, lead); | ||
1959 | |||
1960 | if (lead == me) | ||
1961 | { | ||
1962 | for (k = 0; k < n; k++) | ||
1963 | { | ||
1964 | if (k == me) | ||
1965 | continue; | ||
1966 | p1 = me; | ||
1967 | p2 = k; | ||
1968 | arrange_peers (&p1, &p2, n); | ||
1969 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: GC LEADER(1): %d %d %d %d\n", session->local_peer_idx, p1, p2, rep, lead); | ||
1970 | task = ((struct TaskEntry) { | ||
1971 | .step = step, | ||
1972 | .action = ACTION_RECONCILE, | ||
1973 | .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me }, | ||
1974 | .input_set = (struct SetKey) { SET_KIND_CURRENT, rep }, | ||
1975 | .output_set = (struct SetKey) { SET_KIND_NONE }, | ||
1976 | }); | ||
1977 | put_task (session->taskmap, &task); | ||
1978 | } | ||
1979 | /* We run this task to make sure that the leader | ||
1980 | has the stored the SET_KIND_LEADER set of himself, | ||
1981 | so he can participate in the rest of the gradecast | ||
1982 | without the code having to handle any special cases. */ | ||
1983 | task = ((struct TaskEntry) { | ||
1984 | .step = step, | ||
1985 | .action = ACTION_RECONCILE, | ||
1986 | .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me }, | ||
1987 | .input_set = (struct SetKey) { SET_KIND_CURRENT, rep }, | ||
1988 | .output_set = (struct SetKey) { SET_KIND_LEADER, rep, me }, | ||
1989 | .output_diff = (struct DiffKey) { DIFF_KIND_LEADER, rep, me }, | ||
1990 | }); | ||
1991 | put_task (session->taskmap, &task); | ||
1992 | } | ||
1993 | else | ||
1994 | { | ||
1995 | p1 = me; | ||
1996 | p2 = lead; | ||
1997 | arrange_peers (&p1, &p2, n); | ||
1998 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: GC LEADER(2): %d %d %d %d\n", session->local_peer_idx, p1, p2, rep, lead); | ||
1999 | task = ((struct TaskEntry) { | ||
2000 | .step = step, | ||
2001 | .action = ACTION_RECONCILE, | ||
2002 | .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead}, | ||
2003 | .input_set = (struct SetKey) { SET_KIND_CURRENT, rep }, | ||
2004 | .output_set = (struct SetKey) { SET_KIND_LEADER, rep, lead }, | ||
2005 | .output_diff = (struct DiffKey) { DIFF_KIND_LEADER, rep, lead }, | ||
2006 | }); | ||
2007 | put_task (session->taskmap, &task); | ||
2008 | } | ||
2009 | |||
2010 | /* gcast phase 2: echo */ | ||
2011 | prev_step = step; | ||
2012 | step = create_step (session, round, 1); | ||
2013 | #ifdef GNUNET_EXTRA_LOGGING | ||
2014 | GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep); | ||
2015 | #endif | ||
2016 | step_depend_on (step, prev_step); | ||
2017 | |||
2018 | for (k = 0; k < n; k++) | ||
2019 | { | ||
2020 | p1 = k; | ||
2021 | p2 = me; | ||
2022 | arrange_peers (&p1, &p2, n); | ||
2023 | task = ((struct TaskEntry) { | ||
2024 | .step = step, | ||
2025 | .action = ACTION_RECONCILE, | ||
2026 | .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead }, | ||
2027 | .input_set = (struct SetKey) { SET_KIND_LEADER, rep, lead }, | ||
2028 | .output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead }, | ||
2029 | }); | ||
2030 | put_task (session->taskmap, &task); | ||
2031 | } | ||
2032 | |||
2033 | prev_step = step; | ||
2034 | step = create_step (session, round, 1); | ||
2035 | #ifdef GNUNET_EXTRA_LOGGING | ||
2036 | GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep); | ||
2037 | #endif | ||
2038 | step_depend_on (step, prev_step); | ||
2039 | |||
2040 | arrange_peers (&p1, &p2, n); | ||
2041 | task = ((struct TaskEntry) { | ||
2042 | .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead }, | ||
2043 | .step = step, | ||
2044 | .action = ACTION_EVAL_RFN, | ||
2045 | .input_set = (struct SetKey) { SET_KIND_LEADER, rep, lead }, | ||
2046 | .input_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead }, | ||
2047 | .output_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead }, | ||
2048 | .threshold = n - t, | ||
2049 | }); | ||
2050 | put_task (session->taskmap, &task); | ||
2051 | |||
2052 | prev_step = step; | ||
2053 | step = create_step (session, round, 1); | ||
2054 | #ifdef GNUNET_EXTRA_LOGGING | ||
2055 | GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep); | ||
2056 | #endif | ||
2057 | step_depend_on (step, prev_step); | ||
2058 | |||
2059 | /* gcast phase 3: confirmation and grading */ | ||
2060 | for (k = 0; k < n; k++) | ||
2061 | { | ||
2062 | p1 = k; | ||
2063 | p2 = me; | ||
2064 | arrange_peers (&p1, &p2, n); | ||
2065 | task = ((struct TaskEntry) { | ||
2066 | .step = step, | ||
2067 | .action = ACTION_RECONCILE, | ||
2068 | .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead}, | ||
2069 | .input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead }, | ||
2070 | .output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead }, | ||
2071 | }); | ||
2072 | put_task (session->taskmap, &task); | ||
2073 | } | ||
2074 | |||
2075 | prev_step = step; | ||
2076 | step = create_step (session, round, 1); | ||
2077 | #ifdef GNUNET_EXTRA_LOGGING | ||
2078 | GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep); | ||
2079 | #endif | ||
2080 | step_depend_on (step, prev_step); | ||
2081 | |||
2082 | // evaluate ConfirmationReferendum and | ||
2083 | // apply it to the LeaderReferendum | ||
2084 | task = ((struct TaskEntry) { | ||
2085 | .step = step, | ||
2086 | .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead }, | ||
2087 | .action = ACTION_EVAL_RFN, | ||
2088 | .input_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead }, | ||
2089 | .output_diff = (struct DiffKey) { DIFF_KIND_GRADECAST_RESULT, rep }, | ||
2090 | }); | ||
2091 | put_task (session->taskmap, &task); | ||
2092 | |||
2093 | step_depend_on (step_after, step); | ||
2094 | } | ||
2095 | |||
2096 | |||
2097 | static void | ||
2098 | construct_task_graph (struct ConsensusSession *session) | ||
2099 | { | ||
2100 | uint16_t n = session->num_peers; | ||
2101 | uint16_t t = n / 3; | ||
2102 | |||
2103 | uint16_t me = session->local_peer_idx; | ||
2104 | |||
2105 | uint16_t p1; | ||
2106 | uint16_t p2; | ||
2107 | |||
2108 | /* The task we're currently setting up. */ | ||
2109 | struct TaskEntry task; | ||
2110 | |||
2111 | /* Current leader */ | ||
2112 | unsigned int lead; | ||
2113 | |||
2114 | struct Step *step; | ||
2115 | struct Step *prev_step; | ||
2116 | |||
2117 | unsigned int round = 0; | ||
2118 | |||
2119 | unsigned int i; | ||
2120 | |||
2121 | // XXX: introduce first step, | ||
2122 | // where we wait for all insert acks | ||
2123 | // from the set service | ||
2124 | |||
2125 | /* faster but brittle all-to-all */ | ||
2126 | |||
2127 | // XXX: Not implemented yet | ||
2128 | |||
2129 | /* all-to-all step */ | ||
2130 | |||
2131 | step = create_step (session, round, 1); | ||
2132 | |||
2133 | #ifdef GNUNET_EXTRA_LOGGING | ||
2134 | step->debug_name = GNUNET_strdup ("all to all"); | ||
2135 | #endif | ||
2136 | |||
2137 | for (i = 0; i < n; i++) | ||
2138 | { | ||
2139 | p1 = me; | ||
2140 | p2 = i; | ||
2141 | arrange_peers (&p1, &p2, n); | ||
2142 | task = ((struct TaskEntry) { | ||
2143 | .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 }, | ||
2144 | .step = step, | ||
2145 | .action = ACTION_RECONCILE, | ||
2146 | .input_set = (struct SetKey) { SET_KIND_CURRENT, 0 }, | ||
2147 | .output_set = (struct SetKey) { SET_KIND_CURRENT, 0 }, | ||
2148 | }); | ||
2149 | put_task (session->taskmap, &task); | ||
2150 | } | ||
2151 | |||
2152 | round++; | ||
2153 | |||
2154 | prev_step = step; | ||
2155 | step = NULL; | ||
2156 | |||
2157 | /* Byzantine union */ | ||
2158 | |||
2159 | /* sequential repetitions of the gradecasts */ | ||
2160 | for (i = 0; i < t + 1; i++) | ||
2161 | { | ||
2162 | struct Step *step_rep_start; | ||
2163 | struct Step *step_rep_end; | ||
2164 | |||
2165 | step_rep_start = create_step (session, round, 1); | ||
2166 | #ifdef GNUNET_EXTRA_LOGGING | ||
2167 | GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i); | ||
2168 | #endif | ||
2169 | |||
2170 | step_depend_on (step_rep_start, prev_step); | ||
2171 | |||
2172 | step_rep_end = create_step (session, round, 1); | ||
2173 | #ifdef GNUNET_EXTRA_LOGGING | ||
2174 | GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i); | ||
2175 | #endif | ||
2176 | |||
2177 | /* parallel gradecasts */ | ||
2178 | for (lead = 0; lead < n; lead++) | ||
2179 | construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end); | ||
2180 | |||
2181 | // TODO: add peers to ignore list, | ||
2182 | // | ||
2183 | // evaluate ConfirmationReferendum and | ||
2184 | // apply it to the LeaderReferendum | ||
2185 | task = ((struct TaskEntry) { | ||
2186 | .step = step_rep_end, | ||
2187 | .key = (struct TaskKey) { PHASE_KIND_GRADECAST_APPLY_RESULT, -1, -1, i, -1}, | ||
2188 | .action = ACTION_APPLY_DIFF, | ||
2189 | .input_set = (struct SetKey) { SET_KIND_CURRENT, i }, | ||
2190 | .input_diff = (struct DiffKey) { DIFF_KIND_GRADECAST_RESULT, i }, | ||
2191 | .output_set = (struct SetKey) { SET_KIND_CURRENT, i + 1 }, | ||
2192 | }); | ||
2193 | put_task (session->taskmap, &task); | ||
2194 | |||
2195 | prev_step = step_rep_end; | ||
2196 | } | ||
2197 | |||
2198 | /* There is no next gradecast round, thus the final | ||
2199 | start step is the overall end step of the gradecasts */ | ||
2200 | step = create_step (session, round, 1); | ||
2201 | #ifdef GNUNET_EXTRA_LOGGING | ||
2202 | GNUNET_asprintf (&step->debug_name, "finish"); | ||
2203 | #endif | ||
2204 | step_depend_on (step, prev_step); | ||
2205 | |||
2206 | task = ((struct TaskEntry) { | ||
2207 | .step = step, | ||
2208 | .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 }, | ||
2209 | .input_set = (struct SetKey) { SET_KIND_CURRENT, t + 1 }, | ||
2210 | .action = ACTION_FINISH, | ||
2211 | }); | ||
2212 | |||
2213 | put_task (session->taskmap, &task); | ||
2214 | } | ||
2215 | |||
2216 | |||
2217 | /** | ||
1112 | * Initialize the session, continue receiving messages from the owning client | 2218 | * Initialize the session, continue receiving messages from the owning client |
1113 | * | 2219 | * |
1114 | * @param session the session to initialize | 2220 | * @param session the session to initialize |
@@ -1124,21 +2230,21 @@ initialize_session (struct ConsensusSession *session, | |||
1124 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers); | 2230 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers); |
1125 | compute_global_id (session, &join_msg->session_id); | 2231 | compute_global_id (session, &join_msg->session_id); |
1126 | 2232 | ||
1127 | /* check if some local client already owns the session. | 2233 | /* Check if some local client already owns the session. |
1128 | * it is only legal to have a session with an existing global id | 2234 | It is only legal to have a session with an existing global id |
1129 | * if all other sessions with this global id are finished.*/ | 2235 | if all other sessions with this global id are finished.*/ |
1130 | other_session = sessions_head; | 2236 | other_session = sessions_head; |
1131 | while (NULL != other_session) | 2237 | while (NULL != other_session) |
1132 | { | 2238 | { |
1133 | if ((other_session != session) && | 2239 | if ((other_session != session) && |
1134 | (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id))) | 2240 | (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id))) |
1135 | { | 2241 | { |
1136 | if (CONSENSUS_ROUND_FINISH != other_session->current_round) | 2242 | //if (CONSENSUS_ROUND_FINISH != other_session->current_round) |
1137 | { | 2243 | //{ |
1138 | GNUNET_break (0); | 2244 | // GNUNET_break (0); |
1139 | destroy_session (session); | 2245 | // destroy_session (session); |
1140 | return; | 2246 | // return; |
1141 | } | 2247 | //} |
1142 | break; | 2248 | break; |
1143 | } | 2249 | } |
1144 | other_session = other_session->next; | 2250 | other_session = other_session->next; |
@@ -1152,12 +2258,30 @@ initialize_session (struct ConsensusSession *session, | |||
1152 | 2258 | ||
1153 | session->local_peer_idx = get_peer_idx (&my_peer, session); | 2259 | session->local_peer_idx = get_peer_idx (&my_peer, session); |
1154 | GNUNET_assert (-1 != session->local_peer_idx); | 2260 | GNUNET_assert (-1 != session->local_peer_idx); |
1155 | session->element_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); | ||
1156 | GNUNET_assert (NULL != session->element_set); | ||
1157 | session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION, | 2261 | session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION, |
1158 | &session->global_id, | 2262 | &session->global_id, |
1159 | set_listen_cb, session); | 2263 | set_listen_cb, session); |
1160 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx); | 2264 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx); |
2265 | |||
2266 | session->setmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); | ||
2267 | session->taskmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); | ||
2268 | session->diffmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); | ||
2269 | session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); | ||
2270 | |||
2271 | { | ||
2272 | struct SetEntry *client_set; | ||
2273 | client_set = GNUNET_new (struct SetEntry); | ||
2274 | client_set->h = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); | ||
2275 | client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 }); | ||
2276 | put_set (session, client_set); | ||
2277 | } | ||
2278 | |||
2279 | session->peers_ignored = GNUNET_new_array (session->num_peers, int); | ||
2280 | |||
2281 | /* Just construct the task graph, | ||
2282 | but don't run anything until the client calls conclude. */ | ||
2283 | construct_task_graph (session); | ||
2284 | |||
1161 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id)); | 2285 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id)); |
1162 | } | 2286 | } |
1163 | 2287 | ||
@@ -1212,6 +2336,13 @@ client_join (void *cls, | |||
1212 | } | 2336 | } |
1213 | 2337 | ||
1214 | 2338 | ||
2339 | static void | ||
2340 | client_insert_done (void *cls) | ||
2341 | { | ||
2342 | // FIXME: implement | ||
2343 | } | ||
2344 | |||
2345 | |||
1215 | /** | 2346 | /** |
1216 | * Called when a client performs an insert operation. | 2347 | * Called when a client performs an insert operation. |
1217 | * | 2348 | * |
@@ -1228,6 +2359,7 @@ client_insert (void *cls, | |||
1228 | struct GNUNET_CONSENSUS_ElementMessage *msg; | 2359 | struct GNUNET_CONSENSUS_ElementMessage *msg; |
1229 | struct GNUNET_SET_Element *element; | 2360 | struct GNUNET_SET_Element *element; |
1230 | ssize_t element_size; | 2361 | ssize_t element_size; |
2362 | struct GNUNET_SET_Handle *initial_set; | ||
1231 | 2363 | ||
1232 | session = get_session_by_client (client); | 2364 | session = get_session_by_client (client); |
1233 | 2365 | ||
@@ -1238,7 +2370,7 @@ client_insert (void *cls, | |||
1238 | return; | 2370 | return; |
1239 | } | 2371 | } |
1240 | 2372 | ||
1241 | if (CONSENSUS_ROUND_BEGIN != session->current_round) | 2373 | if (GNUNET_YES == session->conclude_started) |
1242 | { | 2374 | { |
1243 | GNUNET_break (0); | 2375 | GNUNET_break (0); |
1244 | GNUNET_SERVER_client_disconnect (client); | 2376 | GNUNET_SERVER_client_disconnect (client); |
@@ -1258,11 +2390,19 @@ client_insert (void *cls, | |||
1258 | element->size = element_size; | 2390 | element->size = element_size; |
1259 | memcpy (&element[1], &msg[1], element_size); | 2391 | memcpy (&element[1], &msg[1], element_size); |
1260 | element->data = &element[1]; | 2392 | element->data = &element[1]; |
1261 | GNUNET_SET_add_element (session->element_set, element, NULL, NULL); | 2393 | { |
2394 | struct SetKey key = { SET_KIND_CURRENT, 0, 0 }; | ||
2395 | struct SetEntry *entry; | ||
2396 | entry = lookup_set (session, &key); | ||
2397 | GNUNET_assert (NULL != entry); | ||
2398 | initial_set = entry->h; | ||
2399 | } | ||
2400 | session->num_client_insert_pending++; | ||
2401 | GNUNET_SET_add_element (initial_set, element, client_insert_done, session); | ||
1262 | GNUNET_free (element); | 2402 | GNUNET_free (element); |
1263 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 2403 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
1264 | 2404 | ||
1265 | // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element added\n", session->local_peer_idx); | 2405 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element added\n", session->local_peer_idx); |
1266 | } | 2406 | } |
1267 | 2407 | ||
1268 | 2408 | ||
@@ -1280,7 +2420,6 @@ client_conclude (void *cls, | |||
1280 | { | 2420 | { |
1281 | struct ConsensusSession *session; | 2421 | struct ConsensusSession *session; |
1282 | 2422 | ||
1283 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n"); | ||
1284 | session = get_session_by_client (client); | 2423 | session = get_session_by_client (client); |
1285 | if (NULL == session) | 2424 | if (NULL == session) |
1286 | { | 2425 | { |
@@ -1289,24 +2428,24 @@ client_conclude (void *cls, | |||
1289 | GNUNET_SERVER_client_disconnect (client); | 2428 | GNUNET_SERVER_client_disconnect (client); |
1290 | return; | 2429 | return; |
1291 | } | 2430 | } |
1292 | if (CONSENSUS_ROUND_BEGIN != session->current_round) | 2431 | |
2432 | if (GNUNET_YES == session->conclude_started) | ||
1293 | { | 2433 | { |
1294 | /* client requested conclude twice */ | 2434 | /* conclude started twice */ |
1295 | GNUNET_break (0); | 2435 | GNUNET_break (0); |
2436 | GNUNET_SERVER_client_disconnect (client); | ||
2437 | destroy_session (session); | ||
1296 | return; | 2438 | return; |
1297 | } | 2439 | } |
1298 | if (session->num_peers <= 1) | ||
1299 | { | ||
1300 | session->current_round = CONSENSUS_ROUND_FINISH; | ||
1301 | GNUNET_SET_iterate (session->element_set, send_to_client_iter, session); | ||
1302 | } | ||
1303 | else | ||
1304 | { | ||
1305 | /* the 'begin' round is over, start with the next, actual round */ | ||
1306 | round_over (session, NULL); | ||
1307 | } | ||
1308 | 2440 | ||
1309 | GNUNET_assert (CONSENSUS_ROUND_BEGIN != session->current_round); | 2441 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n"); |
2442 | |||
2443 | session->conclude_started = GNUNET_YES; | ||
2444 | |||
2445 | install_step_timeouts (session); | ||
2446 | run_ready_steps (session); | ||
2447 | |||
2448 | |||
1310 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 2449 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
1311 | } | 2450 | } |
1312 | 2451 | ||
@@ -1343,17 +2482,11 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
1343 | session = get_session_by_client (client); | 2482 | session = get_session_by_client (client); |
1344 | if (NULL == session) | 2483 | if (NULL == session) |
1345 | return; | 2484 | return; |
1346 | if ((CONSENSUS_ROUND_BEGIN == session->current_round) || | 2485 | // FIXME: destroy if we can |
1347 | (CONSENSUS_ROUND_FINISH == session->current_round)) | ||
1348 | { | ||
1349 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, destroying session\n"); | ||
1350 | destroy_session (session); | ||
1351 | return; | ||
1352 | } | ||
1353 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, but waiting for consensus to finish\n"); | ||
1354 | } | 2486 | } |
1355 | 2487 | ||
1356 | 2488 | ||
2489 | |||
1357 | /** | 2490 | /** |
1358 | * Start processing consensus requests. | 2491 | * Start processing consensus requests. |
1359 | * | 2492 | * |