From a11e17158609766f29c40de6daecf5ce02b38e6c Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Mon, 5 Oct 2015 21:26:56 +0000 Subject: work on consensus and set - evil peers for consensus - various fixes for consensus and set --- src/consensus/gnunet-consensus-profiler.c | 2 +- src/consensus/gnunet-service-consensus.c | 986 +++++++++++++++++++++++------- src/consensus/test_consensus.conf | 12 +- src/include/gnunet_set_service.h | 20 + src/set/gnunet-service-set.c | 10 +- src/set/set_api.c | 36 ++ 6 files changed, 852 insertions(+), 214 deletions(-) diff --git a/src/consensus/gnunet-consensus-profiler.c b/src/consensus/gnunet-consensus-profiler.c index 11c54fa78..a075cc291 100644 --- a/src/consensus/gnunet-consensus-profiler.c +++ b/src/consensus/gnunet-consensus-profiler.c @@ -437,7 +437,7 @@ main (int argc, char **argv) gettext_noop ("number of peers in consensus"), GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_peers }, { 'k', "value-replication", NULL, - gettext_noop ("how many peers receive one value?"), + gettext_noop ("how many peers (random selection without replacement) receive one value?"), GNUNET_YES, &GNUNET_GETOPT_set_uint, &replication }, { 'x', "num-values", NULL, gettext_noop ("number of values"), diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index 424be7a71..5a2f62cd5 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c @@ -128,40 +128,50 @@ enum PhaseKind PHASE_KIND_GRADECAST_CONFIRM, PHASE_KIND_GRADECAST_CONFIRM_GRADE, PHASE_KIND_GRADECAST_APPLY_RESULT, + /** + * Apply a repetition of the all-to-all + * gradecast to the current set. + */ + PHASE_KIND_APPLY_REP, PHASE_KIND_FINISH, }; -enum ActionType +enum TaskKind { /** * Do a set reconciliation with another peer (or via looback). */ - ACTION_RECONCILE, + TASK_RECONCILE, + /** + * Same as reconciliation, but only care about added elements. + */ + TASK_UNION, /** * Apply a referendum with a threshold * to a set and/or a diff. */ - ACTION_EVAL_RFN, + TASK_EVAL_RFN, /** * Apply a diff to a set. */ - ACTION_APPLY_DIFF, - ACTION_FINISH, + TASK_APPLY_DIFF, + FASK_FINISH, }; enum SetKind { SET_KIND_NONE = 0, SET_KIND_CURRENT, - SET_KIND_LEADER, + SET_KIND_LEADER_PROPOSAL, SET_KIND_ECHO_RESULT, }; enum DiffKind { DIFF_KIND_NONE = 0, - DIFF_KIND_LEADER, + DIFF_KIND_LEADER_PROPOSAL, + DIFF_KIND_LEADER_CONSENSUS, DIFF_KIND_GRADECAST_RESULT, }; @@ -170,9 +180,74 @@ enum RfnKind RFN_KIND_NONE = 0, RFN_KIND_ECHO, RFN_KIND_CONFIRM, + RFN_KIND_GRADECAST_RESULT }; +struct SetOpCls +{ + struct SetKey input_set; + + struct SetKey output_set; + struct RfnKey output_rfn; + struct DiffKey output_diff; + + int do_not_remove; + + struct GNUNET_SET_OperationHandle *op; +}; + +struct EvalRfnCls +{ + struct SetKey input_set; + struct RfnKey input_rfn; + + uint16_t threshold; + + struct SetKey output_set; + struct DiffKey output_diff; +}; + + +struct ApplyDiffCls +{ + struct SetKey input_set; + struct DiffKey input_diff; + struct SetKey output_set; +}; + + +struct LeaderApplyCls +{ + struct DiffKey input_diff_1; + struct DiffKey input_diff_2; + + struct RfnKey output_rfn; +}; + + +struct FinishCls +{ + struct SetKey input_set; +}; + +/** + * Closure for both @a start_task + * and @a cancel_task. + */ +union TaskFuncCls +{ + struct SetOpCls setop; + struct EvalRfnCls eval_rfn; + struct ApplyDiffCls apply_diff; + struct LeaderApplyCls leader_apply; + struct FinishCls finish; +}; + +struct TaskEntry; + +typedef void (*TaskFunc) (struct TaskEntry *task); + /* * Node in the consensus task graph. */ @@ -182,30 +257,16 @@ struct TaskEntry struct Step *step; - int is_running; + int is_started; int is_finished; - enum ActionType action; - - struct SetKey input_set; - struct DiffKey input_diff; - struct RfnKey input_rfn; - struct SetKey output_set; - struct DiffKey output_diff; - struct RfnKey output_rfn; - - /** - * Threshold when evaluating referendums. - */ - uint16_t threshold; + enum TaskKind kind; - /** - * Operation that is running for this task. - */ - struct GNUNET_SET_OperationHandle *op; + TaskFunc start; + TaskFunc cancel; - struct GNUNET_SET_Handle *commited_set; + union TaskFuncCls cls; }; @@ -280,17 +341,10 @@ struct Step char *debug_name; }; -struct RfnPeerInfo -{ - /* Peers can propose changes, - * but they are only accepted once - * the whole set operation is done. */ - int is_commited; -}; struct RfnElementInfo { - struct GNUNET_SET_Element *element; + const struct GNUNET_SET_Element *element; /* * Vote (or VOTE_NONE) from every peer @@ -323,12 +377,20 @@ struct ReferendumEntry * not counted for majority votes or thresholds. */ int *peer_commited; + + + /** + * Contestation state of the peer. If a peer is contested, the values it + * contributed are still counted for applying changes, but the grading is + * affected. + */ + int *peer_contested; }; struct DiffElementInfo { - struct GNUNET_SET_Element *element; + const struct GNUNET_SET_Element *element; /** * Positive weight for 'add', negative @@ -468,13 +530,13 @@ static void finish_task (struct TaskEntry *task); static void -run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task); +task_start_reconcile (struct TaskEntry *task); static void -run_task_eval_rfn (struct ConsensusSession *session, struct TaskEntry *task); +task_start_eval_rfn (struct TaskEntry *task); static void -run_task_apply_diff (struct ConsensusSession *session, struct TaskEntry *task); +task_start_apply_diff (struct TaskEntry *task); static void run_ready_steps (struct ConsensusSession *session); @@ -492,6 +554,7 @@ phasename (uint16_t phase) case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM"; case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE"; case PHASE_KIND_GRADECAST_APPLY_RESULT: return "GRADECAST_APPLY_RESULT"; + case PHASE_KIND_APPLY_REP: return "APPLY_REP"; default: return "(unknown)"; } } @@ -503,7 +566,7 @@ setname (uint16_t kind) switch (kind) { case SET_KIND_CURRENT: return "CURRENT"; - case SET_KIND_LEADER: return "LEADER"; + case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL"; case SET_KIND_NONE: return "NONE"; default: return "(unknown)"; } @@ -527,12 +590,26 @@ diffname (uint16_t kind) switch (kind) { case DIFF_KIND_NONE: return "NONE"; - case DIFF_KIND_LEADER: return "LEADER"; + case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS"; case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT"; + case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL"; default: return "(unknown)"; } } +#ifdef GNUNET_EXTRA_LOGGING + + +static const char * +debug_str_element (const struct GNUNET_SET_Element *el) +{ + struct GNUNET_HashCode hash; + + GNUNET_SET_element_hash (el, &hash); + + return GNUNET_h2s (&hash); +} + static const char * debug_str_task_key (struct TaskKey *tk) { @@ -583,6 +660,8 @@ debug_str_rfn_key (struct RfnKey *rk) return buf; } +#endif /* GNUNET_EXTRA_LOGGING */ + /** * Destroy a session, free all resources associated with it. @@ -602,6 +681,8 @@ destroy_session (struct ConsensusSession *session) { GNUNET_MQ_destroy (session->client_mq); session->client_mq = NULL; + /* The MQ cleanup will also disconnect the underlying client. */ + session->client = NULL; } if (NULL != session->client) { @@ -634,8 +715,9 @@ send_to_client_iter (void *cls, struct GNUNET_CONSENSUS_ElementMessage *m; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "P%d: got element for client\n", - session->local_peer_idx); + "P%d: sending element %s to client\n", + session->local_peer_idx, + debug_str_element (element)); ev = GNUNET_MQ_msg_extra (m, element->size, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT); @@ -736,7 +818,36 @@ diff_insert (struct DiffEntry *diff, int weight, const struct GNUNET_SET_Element *element) { - GNUNET_assert (0); + struct DiffElementInfo *di; + struct GNUNET_HashCode hash; + + GNUNET_assert ( (1 == weight) || (-1 == weight)); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "diff_insert with element size %u\n", + element->size); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "hashing element\n"); + + GNUNET_SET_element_hash (element, &hash); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "hashed element\n"); + + di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash); + + if (NULL == di) + { + di = GNUNET_new (struct DiffElementInfo); + di->element = GNUNET_SET_element_dup (element); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put (diff->changes, + &hash, di, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); + } + + di->weight = weight; } @@ -747,10 +858,39 @@ rfn_vote (struct ReferendumEntry *rfn, int vote, const struct GNUNET_SET_Element *element) { + struct RfnElementInfo *ri; + struct GNUNET_HashCode hash; + GNUNET_assert (voting_peer < num_peers); - GNUNET_assert (0); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "voting for element of size %u\n", + element->size); + + rfn->peer_commited[voting_peer] = GNUNET_YES; + + GNUNET_SET_element_hash (element, &hash); + ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash); + + + if (NULL == ri) + { + ri = GNUNET_new (struct RfnElementInfo); + ri->element = GNUNET_SET_element_dup (element); + ri->votes = GNUNET_new_array (num_peers, int); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements, + &hash, ri, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "rfn vote element %p\n", + ri->element); + ri->votes[voting_peer] = vote; } + uint16_t task_other_peer (struct TaskEntry *task) { @@ -760,6 +900,7 @@ task_other_peer (struct TaskEntry *task) return task->key.peer1; } + /** * Callback for set operation results. Called for each element * in the result set. @@ -779,6 +920,10 @@ set_result_cb (void *cls, struct DiffEntry *output_diff = NULL; struct ReferendumEntry *output_rfn = NULL; unsigned int other_idx; + struct SetOpCls *setop; + + setop = &task->cls.setop; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got set result for {%s}, status %u\n", @@ -786,7 +931,7 @@ set_result_cb (void *cls, debug_str_task_key (&task->key), status); - if (GNUNET_NO == task->is_running) + if (GNUNET_NO == task->is_started) { GNUNET_break_op (0); return; @@ -808,14 +953,23 @@ set_result_cb (void *cls, GNUNET_assert (0); } - if (SET_KIND_NONE != task->output_set.set_kind) - output_set = lookup_set (session, &task->output_set); + if (SET_KIND_NONE != setop->output_set.set_kind) + { + output_set = lookup_set (session, &setop->output_set); + GNUNET_assert (NULL != output_set); + } - if (DIFF_KIND_NONE != task->output_diff.diff_kind) - output_diff = lookup_diff (session, &task->output_diff); + if (DIFF_KIND_NONE != setop->output_diff.diff_kind) + { + output_diff = lookup_diff (session, &setop->output_diff); + GNUNET_assert (NULL != output_diff); + } - if (RFN_KIND_NONE != task->output_rfn.rfn_kind) - output_rfn = lookup_rfn (session, &task->output_rfn); + if (RFN_KIND_NONE != setop->output_rfn.rfn_kind) + { + output_rfn = lookup_rfn (session, &setop->output_rfn); + GNUNET_assert (NULL != output_rfn); + } if (GNUNET_YES == session->peers_ignored[other_idx]) { @@ -827,8 +981,10 @@ set_result_cb (void *cls, switch (status) { - // case GNUNET_SET_STATUS_MISSING_LOCAL: - case GNUNET_SET_STATUS_OK: + case GNUNET_SET_STATUS_ADD_LOCAL: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Adding element in Task {%s}\n", + debug_str_task_key (&task->key)); if (NULL != output_set) { // FIXME: record pending adds, use callback @@ -836,25 +992,95 @@ set_result_cb (void *cls, element, NULL, NULL); - +#ifdef GNUNET_EXTRA_LOGGING + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "P%u: adding element %s into set {%s} of task {%s}\n", + session->local_peer_idx, + debug_str_element (element), + debug_str_set_key (&setop->output_set), + debug_str_task_key (&task->key)); +#endif } if (NULL != output_diff) { diff_insert (output_diff, 1, element); +#ifdef GNUNET_EXTRA_LOGGING + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "P%u: adding element %s into diff {%s} of task {%s}\n", + session->local_peer_idx, + debug_str_element (element), + debug_str_diff_key (&setop->output_diff), + debug_str_task_key (&task->key)); +#endif } if (NULL != output_rfn) { rfn_vote (output_rfn, task_other_peer (task), session->num_peers, VOTE_ADD, element); +#ifdef GNUNET_EXTRA_LOGGING + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "P%u: adding element %s into rfn {%s} of task {%s}\n", + session->local_peer_idx, + debug_str_element (element), + debug_str_rfn_key (&setop->output_rfn), + debug_str_task_key (&task->key)); +#endif } // XXX: add result to structures in task break; - //case GNUNET_SET_STATUS_MISSING_REMOTE: - // // XXX: add result to structures in task - // break; + case GNUNET_SET_STATUS_ADD_REMOTE: + if (GNUNET_YES == setop->do_not_remove) + break; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Removing element in Task {%s}\n", + debug_str_task_key (&task->key)); + if (NULL != output_set) + { + // FIXME: record pending adds, use callback + GNUNET_SET_remove_element (output_set->h, + element, + NULL, + NULL); +#ifdef GNUNET_EXTRA_LOGGING + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "P%u: removing element %s from set {%s} of task {%s}\n", + session->local_peer_idx, + debug_str_element (element), + debug_str_set_key (&setop->output_set), + debug_str_task_key (&task->key)); +#endif + } + if (NULL != output_diff) + { + diff_insert (output_diff, -1, element); +#ifdef GNUNET_EXTRA_LOGGING + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "P%u: removing element %s from diff {%s} of task {%s}\n", + session->local_peer_idx, + debug_str_element (element), + debug_str_diff_key (&setop->output_diff), + debug_str_task_key (&task->key)); +#endif + } + if (NULL != output_rfn) + { + rfn_vote (output_rfn, task_other_peer (task), session->num_peers, VOTE_REMOVE, element); +#ifdef GNUNET_EXTRA_LOGGING + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "P%u: removing element %s from rfn {%s} of task {%s}\n", + session->local_peer_idx, + debug_str_element (element), + debug_str_rfn_key (&setop->output_rfn), + debug_str_task_key (&task->key)); +#endif + } + break; case GNUNET_SET_STATUS_DONE: // XXX: check first if any changes to the underlying // set are still pending // XXX: commit other peer in referendum + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Finishing setop in Task {%s}\n", + debug_str_task_key (&task->key)); finish_task (task); break; case GNUNET_SET_STATUS_FAILURE: @@ -867,6 +1093,84 @@ set_result_cb (void *cls, } } +#ifdef EVIL + +enum Evilness +{ + EVILNESS_NONE, + EVILNESS_CRAM, + EVILNESS_SLACK, +}; + +static void +get_evilness (struct ConsensusSession *session, enum Evilness *ret_type, unsigned int *ret_num) +{ + char *evil_spec; + char *field; + char *evil_type_str = NULL; + + if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "P%u: no evilness\n", + session->local_peer_idx); + *ret_type = EVILNESS_NONE; + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "P%u: got evilness spec\n", + session->local_peer_idx); + + for (field = strtok (evil_spec, "/"); + NULL != field; + field = strtok (NULL, "/")) + { + unsigned int peer_num; + unsigned int evil_num; + int ret; + + evil_type_str = NULL; + + ret = sscanf (field, "%u;%m[a-z];%u", &peer_num, &evil_type_str, &evil_num); + + if (ret != 3) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Malformed field '%s' in EVIL_SPEC, behaving like a good peer.\n", + field); + goto not_evil; + } + + GNUNET_assert (NULL != evil_type_str); + + if (peer_num == session->local_peer_idx) + { + if (0 == strcmp ("slack", evil_type_str)) + *ret_type = EVILNESS_SLACK; + else if (0 == strcmp ("cram", evil_type_str)) + { + *ret_type = EVILNESS_CRAM; + *ret_num = evil_num; + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n"); + goto not_evil; + } + goto cleanup; + } + /* No GNUNET_free since memory was allocated by libc */ + free (evil_type_str); + evil_type_str = NULL; + } +not_evil: + *ret_type = EVILNESS_NONE; +cleanup: + GNUNET_free (evil_spec); + if (NULL != evil_type_str) + free (evil_type_str); +} + +#endif /** @@ -878,11 +1182,67 @@ commit_set (struct ConsensusSession *session, struct TaskEntry *task) { struct SetEntry *set; + struct SetOpCls *setop = &task->cls.setop; - GNUNET_assert (NULL != task->op); - set = lookup_set (session, &task->input_set); + GNUNET_assert (NULL != setop->op); + set = lookup_set (session, &setop->input_set); GNUNET_assert (NULL != set); - GNUNET_SET_commit (task->op, set->h); + +#ifdef EVIL + { + unsigned int i; + unsigned int evil_num; + enum Evilness evilness; + + get_evilness (session, &evilness, &evil_num); + switch (evilness) + { + case EVILNESS_CRAM: + /* We're not cramming elements in the + all-to-all round, since that would just + add more elements to the result set, but + wouldn't test robustness. */ + if (PHASE_KIND_ALL_TO_ALL == task->key.kind) + { + GNUNET_SET_commit (setop->op, set->h); + break; + } + for (i = 0; i < evil_num; i++) + { + struct GNUNET_HashCode hash; + struct GNUNET_SET_Element element; + element.data = &hash; + element.size = sizeof (struct GNUNET_HashCode); + element.element_type = 0; + + GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); + GNUNET_SET_add_element (set->h, &element, NULL, NULL); +#ifdef GNUNET_EXTRA_LOGGING + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n", + session->local_peer_idx, + debug_str_element (&element), + debug_str_set_key (&setop->input_set), + debug_str_task_key (&task->key)); +#endif + } + GNUNET_SET_commit (setop->op, set->h); + break; + case EVILNESS_SLACK: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "P%u: evil peer: slacking\n", + session->local_peer_idx, + evil_num); + /* Do nothing. */ + break; + case EVILNESS_NONE: + GNUNET_SET_commit (setop->op, set->h); + break; + } + } +#else + GNUNET_SET_commit (setop->op, set->h); +#endif } @@ -892,6 +1252,8 @@ put_diff (struct ConsensusSession *session, { struct GNUNET_HashCode hash; + GNUNET_assert (NULL != diff); + GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash); GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff, @@ -906,6 +1268,10 @@ put_set (struct ConsensusSession *session, GNUNET_assert (NULL != set->h); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Putting set %s\n", + debug_str_set_key (&set->key)); + GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash); GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set, @@ -931,26 +1297,183 @@ static void output_cloned_cb (void *cls, struct GNUNET_SET_Handle *copy) { struct TaskEntry *task = (struct TaskEntry *) cls; + struct SetOpCls *setop = &task->cls.setop; struct ConsensusSession *session = task->step->session; struct SetEntry *set = GNUNET_new (struct SetEntry); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Received lazy copy, storing output set %s\n", - session->local_peer_idx, debug_str_set_key (&task->output_set)); + session->local_peer_idx, debug_str_set_key (&setop->output_set)); - set->key = task->output_set; + set->key = setop->output_set; set->h = copy; put_set (task->step->session, set); - run_task_remote_union (task->step->session, task); + task_start_reconcile (task); } static void -run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task) +task_cancel_reconcile (struct TaskEntry *task) +{ + /* not implemented yet */ + GNUNET_assert (0); +} + + +static void +apply_diff_to_rfn (struct DiffEntry *diff, + struct ReferendumEntry *rfn, + uint16_t voting_peer, + uint16_t num_peers) +{ + struct GNUNET_CONTAINER_MultiHashMapIterator *iter; + struct DiffElementInfo *di; + + iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes); + + while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di)) + { + if (di->weight > 0) + { + rfn_vote (rfn, voting_peer, num_peers, VOTE_ADD, di->element); + } + if (di->weight < 0) + { + rfn_vote (rfn, voting_peer, num_peers, VOTE_REMOVE, di->element); + } + } + + GNUNET_CONTAINER_multihashmap_iterator_destroy (iter); +} + + +struct DiffEntry * +diff_create () +{ + struct DiffEntry *d = GNUNET_new (struct DiffEntry); + + d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO); + + return d; +} + + +struct DiffEntry * +diff_compose (struct DiffEntry *diff_1, + struct DiffEntry *diff_2) +{ + struct DiffEntry *diff_new; + struct GNUNET_CONTAINER_MultiHashMapIterator *iter; + struct DiffElementInfo *di; + + diff_new = diff_create (); + + iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes); + while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "iterating first diff\n"); + diff_insert (diff_new, di->weight, di->element); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "insert done\n"); + } + GNUNET_CONTAINER_multihashmap_iterator_destroy (iter); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "iterating first diff done\n"); + + iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes); + while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "iterating second diff\n"); + diff_insert (diff_new, di->weight, di->element); + } + GNUNET_CONTAINER_multihashmap_iterator_destroy (iter); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "iterating second diff done\n"); + + return diff_new; +} + + +struct ReferendumEntry * +rfn_create (uint16_t size) +{ + struct ReferendumEntry *rfn; + + rfn = GNUNET_new (struct ReferendumEntry); + rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO); + rfn->peer_commited = GNUNET_new_array (size, int); + + return rfn; +} + + +static void +diff_destroy (struct DiffEntry *diff) +{ + GNUNET_CONTAINER_multihashmap_destroy (diff->changes); + GNUNET_free (diff); +} + + +static void +task_start_leader_apply (struct TaskEntry *task) +{ + struct LeaderApplyCls *lacls = &task->cls.leader_apply; + struct ConsensusSession *session = task->step->session; + struct DiffEntry *diff_1; + struct DiffEntry *diff_2; + struct DiffEntry *diff_combined; + struct ReferendumEntry *rfn; + + diff_1 = lookup_diff (session, &lacls->input_diff_1); + GNUNET_assert (NULL != diff_1); + + diff_2 = lookup_diff (session, &lacls->input_diff_2); + GNUNET_assert (NULL != diff_2); + + rfn = lookup_rfn (session, &lacls->output_rfn); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "looked up everything\n"); + + if (NULL == rfn) + { + rfn = rfn_create (session->num_peers); + rfn->key = lacls->output_rfn; + put_rfn (session, rfn); + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "ensured rfn\n"); + + diff_combined = diff_compose (diff_1, diff_2); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "composed diffs\n"); + + apply_diff_to_rfn (diff_combined, rfn, task->key.leader, session->num_peers); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "applied diffs to rfns\n"); + + diff_destroy (diff_combined); + + finish_task (task); +} + + +static void +task_start_reconcile (struct TaskEntry *task) { struct SetEntry *input; + struct SetOpCls *setop = &task->cls.setop; + struct ConsensusSession *session = task->step->session; - input = lookup_set (session, &task->input_set); + input = lookup_set (session, &setop->input_set); GNUNET_assert (NULL != input); GNUNET_assert (NULL != input->h); @@ -959,11 +1482,11 @@ run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task) because we want something valid in there, even if the other peer doesn't talk to us */ - if (SET_KIND_NONE != task->output_set.set_kind) + if (SET_KIND_NONE != setop->output_set.set_kind) { /* If we don't have an existing output set, we clone the input set. */ - if (NULL == lookup_set (session, &task->output_set)) + if (NULL == lookup_set (session, &setop->output_set)) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Output set missing, copying from input set\n"); @@ -975,25 +1498,35 @@ run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task) } } - if (RFN_KIND_NONE != task->output_rfn.rfn_kind) + if (RFN_KIND_NONE != setop->output_rfn.rfn_kind) { - if (NULL == lookup_rfn (session, &task->output_rfn)) + if (NULL == lookup_rfn (session, &setop->output_rfn)) { struct ReferendumEntry *rfn; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: output rfn <%s> missing, creating.\n", session->local_peer_idx, - debug_str_rfn_key (&task->output_rfn)); + debug_str_rfn_key (&setop->output_rfn)); - rfn = GNUNET_new (struct ReferendumEntry); - rfn->key = task->output_rfn; - rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO); - rfn->peer_commited = GNUNET_new_array (session->num_peers, int); + rfn = rfn_create (session->num_peers); + rfn->key = setop->output_rfn; put_rfn (session, rfn); } } + if (DIFF_KIND_NONE != setop->output_diff.diff_kind) + { + if (NULL == lookup_diff (session, &setop->output_diff)) + { + struct DiffEntry *diff; + + diff = diff_create (); + diff->key = setop->output_diff; + put_diff (session, diff); + } + } + if (task->key.peer1 == session->local_peer_idx) { struct GNUNET_CONSENSUS_RoundContextMessage rcm = { 0 }; @@ -1001,7 +1534,7 @@ run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Looking up set {%s} to run remote union\n", session->local_peer_idx, - debug_str_set_key (&task->input_set)); + debug_str_set_key (&setop->input_set)); rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT); rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage)); @@ -1012,23 +1545,20 @@ run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task) rcm.leader = htons (task->key.leader); rcm.repetition = htons (task->key.repetition); - GNUNET_assert (NULL == task->op); + GNUNET_assert (NULL == setop->op); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n", - session->local_peer_idx, task->key.peer2, debug_str_set_key (&task->input_set)); + session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set)); // XXX: maybe this should be done while // setting up tasks alreays? - task->op = GNUNET_SET_prepare (&session->peers[task->key.peer2], - &session->global_id, - &rcm.header, - GNUNET_SET_RESULT_ADDED, /* XXX: will be obsolete soon */ - set_result_cb, - task); - - /* Referendums must be materialized as a set before */ - GNUNET_assert (RFN_KIND_NONE == task->input_rfn.rfn_kind); - - if (GNUNET_OK != GNUNET_SET_commit (task->op, input->h)) + setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2], + &session->global_id, + &rcm.header, + GNUNET_SET_RESULT_SYMMETRIC, + set_result_cb, + task); + + if (GNUNET_OK != GNUNET_SET_commit (setop->op, input->h)) { GNUNET_break (0); /* XXX: cleanup? */ @@ -1041,9 +1571,8 @@ run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n", session->local_peer_idx, task->key.peer1); - if (NULL != task->op) + if (NULL != setop->op) { - GNUNET_assert (NULL == task->commited_set); commit_set (session, task); } } @@ -1159,11 +1688,11 @@ eval_rfn_copy_cb (void *cls, struct GNUNET_SET_Handle *copy) set = GNUNET_new (struct SetEntry); set->h = copy; - set->key = task->output_set; + set->key = task->cls.eval_rfn.output_set; put_set (session, set); - run_task_eval_rfn (session, task); + task_start_eval_rfn (task); } @@ -1173,7 +1702,7 @@ eval_rfn_copy_cb (void *cls, struct GNUNET_SET_Handle *copy) * set and store the result in the output set and/or output diff. */ static void -run_task_eval_rfn (struct ConsensusSession *session, struct TaskEntry *task) +task_start_eval_rfn (struct TaskEntry *task) { struct GNUNET_CONTAINER_MultiHashMapIterator *iter; struct ReferendumEntry *input_rfn; @@ -1181,24 +1710,23 @@ run_task_eval_rfn (struct ConsensusSession *session, struct TaskEntry *task) struct SetEntry *output_set = NULL; struct DiffEntry *output_diff = NULL; struct SetChangeProgressCls *progress_cls; + struct EvalRfnCls *rcls = &task->cls.eval_rfn; + struct ConsensusSession *session = task->step->session; /* Have at least one output */ - GNUNET_assert ( (task->output_set.set_kind != SET_KIND_NONE) || - (task->output_diff.diff_kind != DIFF_KIND_NONE)); - - /* Not allowed as output */ - GNUNET_assert ( (task->output_rfn.rfn_kind == RFN_KIND_NONE)); + GNUNET_assert ( (rcls->output_set.set_kind != SET_KIND_NONE) || + (rcls->output_diff.diff_kind != DIFF_KIND_NONE)); - if (SET_KIND_NONE != task->output_set.set_kind) + if (SET_KIND_NONE != rcls->output_set.set_kind) { /* We have a set output, thus the output set must exist or copy it from the input set */ - output_set = lookup_set (session, &task->output_set); + output_set = lookup_set (session, &rcls->output_set); if (NULL == output_set) { struct SetEntry *input_set; - input_set = lookup_set (session, &task->input_set); + input_set = lookup_set (session, &rcls->input_set); GNUNET_assert (NULL != input_set); GNUNET_SET_copy_lazy (input_set->h, eval_rfn_copy_cb, @@ -1209,21 +1737,26 @@ run_task_eval_rfn (struct ConsensusSession *session, struct TaskEntry *task) } } - if (DIFF_KIND_NONE != task->output_diff.diff_kind) + if (DIFF_KIND_NONE != rcls->output_diff.diff_kind) { - output_diff = lookup_diff (session, &task->output_diff); + output_diff = lookup_diff (session, &rcls->output_diff); if (NULL == output_diff) { - output_diff = GNUNET_new (struct DiffEntry); - output_diff->key = task->output_diff; - output_diff->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO); + output_diff = diff_create (); + output_diff->key = rcls->output_diff; put_diff (session, output_diff); } } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Evaluating referendum in Task {%s}\n", + debug_str_task_key (&task->key)); + + progress_cls = GNUNET_new (struct SetChangeProgressCls); + progress_cls->task = task; - input_rfn = lookup_rfn (session, &task->input_rfn); + input_rfn = lookup_rfn (session, &rcls->input_rfn); GNUNET_assert (NULL != input_rfn); @@ -1232,18 +1765,28 @@ run_task_eval_rfn (struct ConsensusSession *session, struct TaskEntry *task) while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri)) { - int majority_vote = rfn_majority (session->num_peers, input_rfn, ri, task->threshold); + int majority_vote = rfn_majority (session->num_peers, input_rfn, ri, rcls->threshold); switch (majority_vote) { case VOTE_ADD: +#ifdef GNUNET_EXTRA_LOGGING + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "P%u: referendum vote result: VOTE_ADD for element %s in task {%s} with" + "output set {%s} and output diff {%s}\n", + session->local_peer_idx, + debug_str_element (ri->element), + debug_str_task_key (&task->key), + debug_str_set_key (&rcls->output_set), + debug_str_diff_key (&rcls->output_diff)); +#endif if (NULL != output_set) { progress_cls->num_pending++; GNUNET_assert (GNUNET_OK == GNUNET_SET_add_element (output_set->h, - ri->element, - eval_rfn_progress, - progress_cls)); + ri->element, + eval_rfn_progress, + progress_cls)); } if (NULL != output_diff) { @@ -1253,16 +1796,37 @@ run_task_eval_rfn (struct ConsensusSession *session, struct TaskEntry *task) case VOTE_CONTESTED: if (NULL != output_set) output_set->is_contested = GNUNET_YES; +#ifdef GNUNET_EXTRA_LOGGING + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "P%u: referendum vote result: VOTE_CONTESTED for element %s in task {%s} with" + "output set {%s} and output diff {%s}\n", + session->local_peer_idx, + debug_str_element (ri->element), + debug_str_task_key (&task->key), + debug_str_set_key (&rcls->output_set), + debug_str_diff_key (&rcls->output_diff)); +#endif /* fallthrough */ case VOTE_REMOVE: +#ifdef GNUNET_EXTRA_LOGGING + if (VOTE_REMOVE == majority_vote) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "P%u: referendum vote result: VOTE_REMOVE for element %s in task {%s} with" + "output set {%s} and output diff {%s}\n", + session->local_peer_idx, + debug_str_element (ri->element), + debug_str_task_key (&task->key), + debug_str_set_key (&rcls->output_set), + debug_str_diff_key (&rcls->output_diff)); +#endif if (NULL != output_set) { progress_cls->num_pending++; GNUNET_assert (GNUNET_OK == GNUNET_SET_remove_element (output_set->h, - ri->element, - eval_rfn_progress, - progress_cls)); + ri->element, + eval_rfn_progress, + progress_cls)); } if (NULL != output_diff) { @@ -1270,6 +1834,8 @@ run_task_eval_rfn (struct ConsensusSession *session, struct TaskEntry *task) } break; case VOTE_NONE: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "referendum vote result: VOTE_NONE\n"); /* Nothing to do. */ break; default: @@ -1294,14 +1860,15 @@ apply_diff_copy_cb (void *cls, struct GNUNET_SET_Handle *copy) struct TaskEntry *task = (struct TaskEntry *) cls; struct ConsensusSession *session = task->step->session; struct SetEntry *set; + struct ApplyDiffCls *diffop = &task->cls.apply_diff; set = GNUNET_new (struct SetEntry); set->h = copy; - set->key = task->output_set; + set->key = diffop->output_set; put_set (session, set); - run_task_apply_diff (session, task); + task_start_apply_diff (task); } @@ -1334,28 +1901,30 @@ apply_diff_progress (void *cls) static void -run_task_apply_diff (struct ConsensusSession *session, struct TaskEntry *task) +task_start_apply_diff (struct TaskEntry *task) { struct SetEntry *output_set; struct DiffEntry *input_diff; struct GNUNET_CONTAINER_MultiHashMapIterator *iter; struct DiffElementInfo *di; struct SetChangeProgressCls *progress_cls; + struct ApplyDiffCls *diffop = &task->cls.apply_diff; + struct ConsensusSession *session = task->step->session; - GNUNET_assert (task->output_set.set_kind != SET_KIND_NONE); - GNUNET_assert (task->input_diff.diff_kind != DIFF_KIND_NONE); + GNUNET_assert (diffop->output_set.set_kind != SET_KIND_NONE); + GNUNET_assert (diffop->input_diff.diff_kind != DIFF_KIND_NONE); - input_diff = lookup_diff (session, &task->input_diff); + input_diff = lookup_diff (session, &diffop->input_diff); GNUNET_assert (NULL != input_diff); - output_set = lookup_set (session, &task->output_set); + output_set = lookup_set (session, &diffop->output_set); if (NULL == output_set) { struct SetEntry *input_set; - input_set = lookup_set (session, &task->input_set); + input_set = lookup_set (session, &diffop->input_set); GNUNET_assert (NULL != input_set); GNUNET_SET_copy_lazy (input_set->h, apply_diff_copy_cb, @@ -1403,11 +1972,12 @@ run_task_apply_diff (struct ConsensusSession *session, struct TaskEntry *task) static void -run_task_finish (struct ConsensusSession *session, struct TaskEntry *task) +task_start_finish (struct TaskEntry *task) { struct SetEntry *final_set; + struct ConsensusSession *session = task->step->session; - final_set = lookup_set (session, &task->input_set); + final_set = lookup_set (session, &task->cls.finish.input_set); GNUNET_assert (NULL != final_set); @@ -1418,37 +1988,17 @@ run_task_finish (struct ConsensusSession *session, struct TaskEntry *task) } static void -run_task (struct ConsensusSession *session, struct TaskEntry *task) +start_task (struct ConsensusSession *session, struct TaskEntry *task) { - GNUNET_assert (GNUNET_NO == task->is_running); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key)); + + GNUNET_assert (GNUNET_NO == task->is_started); GNUNET_assert (GNUNET_NO == task->is_finished); + GNUNET_assert (NULL != task->start); - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key)); + task->start (task); - switch (task->action) - { - case ACTION_RECONCILE: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_RECONCILE task\n", session->local_peer_idx); - run_task_remote_union (session, task); - break; - case ACTION_EVAL_RFN: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_EVAL_RFN task\n", session->local_peer_idx); - run_task_eval_rfn (session, task); - break; - case ACTION_APPLY_DIFF: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_APPLY_DIFF task\n", session->local_peer_idx); - run_task_apply_diff (session, task); - break; - case ACTION_FINISH: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_FINISH task\n", session->local_peer_idx); - run_task_finish (session, task); - break; - default: - /* not reached */ - GNUNET_assert (0); - } - task->is_running = GNUNET_YES; + task->is_started = GNUNET_YES; } @@ -1515,7 +2065,7 @@ run_ready_steps (struct ConsensusSession *session) step->is_running = GNUNET_YES; for (i = 0; i < step->tasks_len; i++) - run_task (session, step->tasks[i]); + start_task (session, step->tasks[i]); /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */ if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished)) @@ -1730,12 +2280,6 @@ set_listen_cb (void *cls, return; } - if (ACTION_RECONCILE != task->action) - { - GNUNET_break_op (0); - return; - } - if (GNUNET_YES == task->is_finished) { GNUNET_break_op (0); @@ -1754,15 +2298,15 @@ set_listen_cb (void *cls, else my_result_cb = set_result_cb; - task->op = GNUNET_SET_accept (request, - GNUNET_SET_RESULT_ADDED, /* XXX: obsolete soon */ + task->cls.setop.op = GNUNET_SET_accept (request, + GNUNET_SET_RESULT_SYMMETRIC, my_result_cb, task); /* If the task hasn't been started yet, we wait for that until we commit. */ - if (GNUNET_YES == task->is_running) + if (GNUNET_YES == task->is_started) { commit_set (session, task); } @@ -1969,11 +2513,11 @@ construct_task_graph_gradecast (struct ConsensusSession *session, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: GC LEADER(1): %d %d %d %d\n", session->local_peer_idx, p1, p2, rep, lead); task = ((struct TaskEntry) { .step = step, - .action = ACTION_RECONCILE, + .start = task_start_reconcile, + .cancel = task_cancel_reconcile, .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me }, - .input_set = (struct SetKey) { SET_KIND_CURRENT, rep }, - .output_set = (struct SetKey) { SET_KIND_NONE }, }); + task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep }; put_task (session->taskmap, &task); } /* We run this task to make sure that the leader @@ -1982,12 +2526,13 @@ construct_task_graph_gradecast (struct ConsensusSession *session, without the code having to handle any special cases. */ task = ((struct TaskEntry) { .step = step, - .action = ACTION_RECONCILE, .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me }, - .input_set = (struct SetKey) { SET_KIND_CURRENT, rep }, - .output_set = (struct SetKey) { SET_KIND_LEADER, rep, me }, - .output_diff = (struct DiffKey) { DIFF_KIND_LEADER, rep, me }, + .start = task_start_reconcile, + .cancel = task_cancel_reconcile, }); + task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep }; + task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me }; + task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me }; put_task (session->taskmap, &task); } else @@ -1998,12 +2543,13 @@ construct_task_graph_gradecast (struct ConsensusSession *session, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: GC LEADER(2): %d %d %d %d\n", session->local_peer_idx, p1, p2, rep, lead); task = ((struct TaskEntry) { .step = step, - .action = ACTION_RECONCILE, .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead}, - .input_set = (struct SetKey) { SET_KIND_CURRENT, rep }, - .output_set = (struct SetKey) { SET_KIND_LEADER, rep, lead }, - .output_diff = (struct DiffKey) { DIFF_KIND_LEADER, rep, lead }, + .start = task_start_reconcile, + .cancel = task_cancel_reconcile, }); + task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep }; + task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead }; + task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead }; put_task (session->taskmap, &task); } @@ -2022,11 +2568,12 @@ construct_task_graph_gradecast (struct ConsensusSession *session, arrange_peers (&p1, &p2, n); task = ((struct TaskEntry) { .step = step, - .action = ACTION_RECONCILE, .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead }, - .input_set = (struct SetKey) { SET_KIND_LEADER, rep, lead }, - .output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead }, + .start = task_start_reconcile, + .cancel = task_cancel_reconcile, }); + task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead }; + task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead }; put_task (session->taskmap, &task); } @@ -2041,12 +2588,12 @@ construct_task_graph_gradecast (struct ConsensusSession *session, task = ((struct TaskEntry) { .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead }, .step = step, - .action = ACTION_EVAL_RFN, - .input_set = (struct SetKey) { SET_KIND_LEADER, rep, lead }, - .input_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead }, - .output_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead }, - .threshold = n - t, + .start = task_start_eval_rfn }); + task.cls.eval_rfn.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead }, + task.cls.eval_rfn.input_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead }, + task.cls.eval_rfn.output_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead }, + task.cls.eval_rfn.threshold = n - t, put_task (session->taskmap, &task); prev_step = step; @@ -2064,11 +2611,12 @@ construct_task_graph_gradecast (struct ConsensusSession *session, arrange_peers (&p1, &p2, n); task = ((struct TaskEntry) { .step = step, - .action = ACTION_RECONCILE, + .start = task_start_reconcile, + .cancel = task_cancel_reconcile, .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead}, - .input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead }, - .output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead }, }); + task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead }; + task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead }; put_task (session->taskmap, &task); } @@ -2081,13 +2629,32 @@ construct_task_graph_gradecast (struct ConsensusSession *session, // evaluate ConfirmationReferendum and // apply it to the LeaderReferendum + // XXX: the diff should contain grading information task = ((struct TaskEntry) { .step = step, .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead }, - .action = ACTION_EVAL_RFN, - .input_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead }, - .output_diff = (struct DiffKey) { DIFF_KIND_GRADECAST_RESULT, rep }, + .start = task_start_eval_rfn, }); + task.cls.eval_rfn.input_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead }; + task.cls.eval_rfn.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_CONSENSUS, rep, lead }; + put_task (session->taskmap, &task); + + + prev_step = step; + step = create_step (session, round, 1); +#ifdef GNUNET_EXTRA_LOGGING + GNUNET_asprintf (&step->debug_name, "gc apply, lead %u rep %u", lead, rep); +#endif + step_depend_on (step, prev_step); + + task = ((struct TaskEntry) { + .step = step, + .key = (struct TaskKey) { PHASE_KIND_GRADECAST_APPLY_RESULT, -1, -1, rep, lead }, + .start = task_start_leader_apply, + }); + task.cls.leader_apply.input_diff_1 = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead }; + task.cls.leader_apply.input_diff_2 = (struct DiffKey) { DIFF_KIND_LEADER_CONSENSUS, rep, lead }; + task.cls.leader_apply.output_rfn = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, rep }; put_task (session->taskmap, &task); step_depend_on (step_after, step); @@ -2142,10 +2709,12 @@ construct_task_graph (struct ConsensusSession *session) task = ((struct TaskEntry) { .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 }, .step = step, - .action = ACTION_RECONCILE, - .input_set = (struct SetKey) { SET_KIND_CURRENT, 0 }, - .output_set = (struct SetKey) { SET_KIND_CURRENT, 0 }, + .start = task_start_reconcile, + .cancel = task_cancel_reconcile, }); + task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 }; + task.cls.setop.output_set = task.cls.setop.input_set; + task.cls.setop.do_not_remove = GNUNET_YES; put_task (session->taskmap, &task); } @@ -2164,32 +2733,30 @@ construct_task_graph (struct ConsensusSession *session) step_rep_start = create_step (session, round, 1); #ifdef GNUNET_EXTRA_LOGGING - GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i); + GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i); #endif step_depend_on (step_rep_start, prev_step); step_rep_end = create_step (session, round, 1); #ifdef GNUNET_EXTRA_LOGGING - GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i); + GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i); #endif /* parallel gradecasts */ for (lead = 0; lead < n; lead++) construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end); - // TODO: add peers to ignore list, - // - // evaluate ConfirmationReferendum and - // apply it to the LeaderReferendum + // TODO: add peers to ignore list, either here or + // already in the gradecast. task = ((struct TaskEntry) { .step = step_rep_end, - .key = (struct TaskKey) { PHASE_KIND_GRADECAST_APPLY_RESULT, -1, -1, i, -1}, - .action = ACTION_APPLY_DIFF, - .input_set = (struct SetKey) { SET_KIND_CURRENT, i }, - .input_diff = (struct DiffKey) { DIFF_KIND_GRADECAST_RESULT, i }, - .output_set = (struct SetKey) { SET_KIND_CURRENT, i + 1 }, + .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1}, + .start = task_start_eval_rfn, }); + task.cls.eval_rfn.input_set = (struct SetKey) { SET_KIND_CURRENT, i }; + task.cls.eval_rfn.input_rfn = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, i }; + task.cls.eval_rfn.output_set = (struct SetKey) { SET_KIND_CURRENT, i + 1 }; put_task (session->taskmap, &task); prev_step = step_rep_end; @@ -2206,9 +2773,9 @@ construct_task_graph (struct ConsensusSession *session) task = ((struct TaskEntry) { .step = step, .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 }, - .input_set = (struct SetKey) { SET_KIND_CURRENT, t + 1 }, - .action = ACTION_FINISH, + .start = task_start_finish, }); + task.cls.finish.input_set = (struct SetKey) { SET_KIND_CURRENT, t + 1 }; put_task (session->taskmap, &task); } @@ -2399,10 +2966,21 @@ client_insert (void *cls, } session->num_client_insert_pending++; GNUNET_SET_add_element (initial_set, element, client_insert_done, session); + +#ifdef GNUNET_EXTRA_LOGGING + { + struct GNUNET_HashCode hash; + + GNUNET_SET_element_hash (element, &hash); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element %s added\n", + session->local_peer_idx, + GNUNET_h2s (&hash)); + } +#endif + GNUNET_free (element); GNUNET_SERVER_receive_done (client, GNUNET_OK); - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element added\n", session->local_peer_idx); } diff --git a/src/consensus/test_consensus.conf b/src/consensus/test_consensus.conf index 335bd2051..f8af4aa38 100644 --- a/src/consensus/test_consensus.conf +++ b/src/consensus/test_consensus.conf @@ -1,15 +1,14 @@ -@INLINE@ ../../contrib/no_forcestart.conf +#@INLINE@ ../../contrib/no_forcestart.conf [PATHS] GNUNET_TEST_HOME = /tmp/test-consensus/ [consensus] -# PREFIX = valgrind +PREFIX = valgrind OPTIONS = -L INFO BINARY = gnunet-service-evil-consensus -# Evil behavior: Peer 0 does not execute leader step -#EVIL_SPEC = 0;pass;leader +EVIL_SPEC = 0;cram;5 # Evil behavior: Peer 0 adds 5 random elements when he is the gradecast leader # (every peer gets the same element. @@ -19,6 +18,11 @@ BINARY = gnunet-service-evil-consensus # (every peer gets different elements). #EVIL_SPEC = 0;stuff-different;leader;5 + + +[core] +FORECESTART = YES + [cadet] #PREFIX = valgrind diff --git a/src/include/gnunet_set_service.h b/src/include/gnunet_set_service.h index f9ae122b4..7246f9b42 100644 --- a/src/include/gnunet_set_service.h +++ b/src/include/gnunet_set_service.h @@ -470,6 +470,26 @@ GNUNET_SET_iterate (struct GNUNET_SET_Handle *set, void GNUNET_SET_iterate_cancel (struct GNUNET_SET_Handle *set); +/** + * Create a copy of an element. The copy + * must be GNUNET_free-d by the caller. + * + * @param element the element to copy + * @return the copied element + */ +struct GNUNET_SET_Element * +GNUNET_SET_element_dup (const struct GNUNET_SET_Element *element); + +/** + * Hash a set element. + * + * @param element the element that should be hashed + * @param ret_hash a pointer to where the hash of @a element + * should be stored + */ +void +GNUNET_SET_element_hash (const struct GNUNET_SET_Element *element, struct GNUNET_HashCode *ret_hash); + #if 0 /* keep Emacsens' auto-indent happy */ { diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index 2f1578514..986f52982 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c @@ -844,6 +844,11 @@ execute_add (struct Set *set, ee->mutations = NULL; ee->mutations_size = 0; ee->element_hash = hash; + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_put (set->content->elements, + &ee->element_hash, + ee, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); } else if (GNUNET_YES == _GSS_is_element_of_set (ee, set)) { @@ -859,11 +864,6 @@ execute_add (struct Set *set, GNUNET_array_append (ee->mutations, ee->mutations_size, mut); } - GNUNET_break (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_put (set->content->elements, - &ee->element_hash, - ee, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); set->vt->add (set->state, ee); } diff --git a/src/set/set_api.c b/src/set/set_api.c index 16aa87cd0..6cbee9a57 100644 --- a/src/set/set_api.c +++ b/src/set/set_api.c @@ -1112,4 +1112,40 @@ GNUNET_SET_copy_lazy (struct GNUNET_SET_Handle *set, } +/** + * Create a copy of an element. The copy + * must be GNUNET_free-d by the caller. + * + * @param element the element to copy + * @return the copied element + */ +struct GNUNET_SET_Element * +GNUNET_SET_element_dup (const struct GNUNET_SET_Element *element) +{ + struct GNUNET_SET_Element *copy; + + copy = GNUNET_malloc (element->size + sizeof (struct GNUNET_SET_Element)); + copy->size = element->size; + copy->element_type = element->element_type; + copy->data = ©[1]; + memcpy ((void *) copy->data, element->data, copy->size); + + return copy; +} + + +/** + * Hash a set element. + * + * @param element the element that should be hashed + * @param ret_hash a pointer to where the hash of @a element + * should be stored + */ +void +GNUNET_SET_element_hash (const struct GNUNET_SET_Element *element, struct GNUNET_HashCode *ret_hash) +{ + /* FIXME: The element type should also be hashed. */ + GNUNET_CRYPTO_hash (element->data, element->size, ret_hash); +} + /* end of set_api.c */ -- cgit v1.2.3