From 99cfe33894a311ef45d5552248625615324ec628 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Wed, 9 Mar 2016 16:17:28 +0000 Subject: Implement early stopping. --- src/consensus/gnunet-service-consensus.c | 256 +++++++++++++++++++++++++------ 1 file changed, 207 insertions(+), 49 deletions(-) (limited to 'src/consensus/gnunet-service-consensus.c') diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index 9c71a98a7..e8385a6bb 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c @@ -55,6 +55,14 @@ enum ReferendumVote }; +enum EarlyStoppingPhase +{ + EARLY_STOPPING_NONE = 0, + EARLY_STOPPING_ONE_MORE = 1, + EARLY_STOPPING_DONE = 2, +}; + + GNUNET_NETWORK_STRUCT_BEGIN @@ -157,6 +165,10 @@ enum SetKind { SET_KIND_NONE = 0, SET_KIND_CURRENT, + /** + * Last result set from a gradecast + */ + SET_KIND_LAST_GRADECAST, SET_KIND_LEADER_PROPOSAL, SET_KIND_ECHO_RESULT, }; @@ -249,6 +261,9 @@ struct Step struct ConsensusSession *session; + /** + * Tasks that this step is composed of. + */ struct TaskEntry **tasks; unsigned int tasks_len; unsigned int tasks_cap; @@ -293,6 +308,19 @@ struct Step * the task, used for debugging. */ char *debug_name; + + /** + * When we're doing an early finish, how should this step be + * treated? + * If GNUNET_YES, the step will be marked as finished + * without actually running its tasks. + * Otherwise, the step will still be run even after + * an early finish. + * + * Note that a task may never be finished early if + * it is already running. + */ + int early_finishable; }; @@ -459,6 +487,11 @@ struct ConsensusSession * Uses the session's global id as app id. */ struct GNUNET_SET_ListenHandle *set_listener; + + /** + * State of our early stopping scheme. + */ + int early_stopping; }; /** @@ -1367,9 +1400,9 @@ put_set (struct ConsensusSession *session, debug_str_set_key (&set->key)); GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash); - GNUNET_assert (GNUNET_OK == + GNUNET_assert (GNUNET_SYSERR != GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE)); } @@ -1487,6 +1520,11 @@ diff_destroy (struct DiffEntry *diff) } +/** + * For a given majority, count what the outcome + * is (add/remove/keep), and give the number + * of peers that voted for this outcome. + */ static void rfn_majority (const struct ReferendumEntry *rfn, const struct RfnElementInfo *ri, @@ -1605,6 +1643,89 @@ set_mutation_done (void *cls) } } + +static void +try_finish_step_early (struct Step *step) +{ + unsigned int i; + + if (GNUNET_YES == step->is_running) + return; + if (GNUNET_YES == step->is_finished) + return; + if (GNUNET_NO == step->early_finishable) + return; + + step->is_finished = GNUNET_YES; + +#ifdef GNUNET_EXTRA_LOGGING + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Finishing step `%s' early.\n", + step->debug_name); +#endif + + for (i = 0; i < step->subordinates_len; i++) + { + GNUNET_assert (step->subordinates[i]->pending_prereq > 0); + step->subordinates[i]->pending_prereq--; +#ifdef GNUNET_EXTRA_LOGGING + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Decreased pending_prereq to %u for step `%s'.\n", + step->subordinates[i]->pending_prereq, + step->subordinates[i]->debug_name); + +#endif + try_finish_step_early (step->subordinates[i]); + } + + // XXX: maybe schedule as task to avoid recursion? + run_ready_steps (step->session); +} + + +static void +finish_step (struct Step *step) +{ + unsigned int i; + + GNUNET_assert (step->finished_tasks == step->tasks_len); + GNUNET_assert (GNUNET_YES == step->is_running); + GNUNET_assert (GNUNET_NO == step->is_finished); + +#ifdef GNUNET_EXTRA_LOGGING + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "All tasks of step `%s' with %u subordinates finished.\n", + step->debug_name, + step->subordinates_len); +#endif + + for (i = 0; i < step->subordinates_len; i++) + { + GNUNET_assert (step->subordinates[i]->pending_prereq > 0); + step->subordinates[i]->pending_prereq--; +#ifdef GNUNET_EXTRA_LOGGING + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Decreased pending_prereq to %u for step `%s'.\n", + step->subordinates[i]->pending_prereq, + step->subordinates[i]->debug_name); + +#endif + } + + step->is_finished = GNUNET_YES; + + // XXX: maybe schedule as task to avoid recursion? + run_ready_steps (step->session); +} + + + +/** + * Apply the result from one round of gradecasts (i.e. every peer + * should have gradecasted) to the peer's current set. + * + * @param task the task with context information + */ static void task_start_apply_round (struct TaskEntry *task) { @@ -1617,6 +1738,7 @@ task_start_apply_round (struct TaskEntry *task) struct GNUNET_CONTAINER_MultiHashMapIterator *iter; struct RfnElementInfo *ri; struct SetMutationProgressCls *progress_cls; + uint16_t worst_majority = UINT16_MAX; sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition }; rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition }; @@ -1644,6 +1766,9 @@ task_start_apply_round (struct TaskEntry *task) rfn_majority (rfn_in, ri, &majority_num, &majority_vote); + if (worst_majority > majority_num) + worst_majority = majority_num; + switch (majority_vote) { case VOTE_ADD: @@ -1653,6 +1778,10 @@ task_start_apply_round (struct TaskEntry *task) ri->element, set_mutation_done, progress_cls)); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "P%u: apply round: adding element %s with %u-majority.\n", + session->local_peer_idx, + debug_str_element (ri->element), majority_num); break; case VOTE_REMOVE: progress_cls->num_pending++; @@ -1661,8 +1790,16 @@ task_start_apply_round (struct TaskEntry *task) ri->element, set_mutation_done, progress_cls)); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "P%u: apply round: deleting element %s with %u-majority.\n", + session->local_peer_idx, + debug_str_element (ri->element), majority_num); break; case VOTE_STAY: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "P%u: apply round: keeping element %s with %u-majority.\n", + session->local_peer_idx, + debug_str_element (ri->element), majority_num); // do nothing break; default: @@ -1677,10 +1814,53 @@ task_start_apply_round (struct TaskEntry *task) GNUNET_free (progress_cls); finish_task (task); } -} + { + uint16_t thresh = (session->num_peers / 3) * 2; -#define THRESH(s) (((s)->num_peers / 3)) + if (worst_majority >= thresh) + { + switch (session->early_stopping) + { + case EARLY_STOPPING_NONE: + session->early_stopping = EARLY_STOPPING_ONE_MORE; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "P%u: Stopping early (after one more superround)\n", + session->local_peer_idx); + break; + case EARLY_STOPPING_ONE_MORE: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: finishing steps due to early finish\n", + session->local_peer_idx); + session->early_stopping = EARLY_STOPPING_DONE; + { + struct Step *step; + for (step = session->steps_head; NULL != step; step = step->next) + try_finish_step_early (step); + } + break; + case EARLY_STOPPING_DONE: + /* We shouldn't be here anymore after early stopping */ + GNUNET_break (0); + break; + default: + GNUNET_assert (0); + break; + } + } + else if (EARLY_STOPPING_NONE != session->early_stopping) + { + // Our assumption about the number of bad peers + // has been broken. + GNUNET_break_op (0); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: NOT finishing early (majority not good enough)\n", + session->local_peer_idx); + } + } + +} static void @@ -1904,6 +2084,16 @@ task_start_eval_echo (struct TaskEntry *task) return; } + + { + // FIXME: should be marked as a shallow copy, so + // we can destroy everything correctly + struct SetEntry *last_set = GNUNET_new (struct SetEntry); + last_set->h = output_set->h; + last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST }; + put_set (session, last_set); + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Evaluating referendum in Task {%s}\n", debug_str_task_key (&task->key)); @@ -2009,39 +2199,6 @@ start_task (struct ConsensusSession *session, struct TaskEntry *task) } -static void finish_step (struct Step *step) -{ - unsigned int i; - - GNUNET_assert (step->finished_tasks == step->tasks_len); - GNUNET_assert (GNUNET_YES == step->is_running); - GNUNET_assert (GNUNET_NO == step->is_finished); - -#ifdef GNUNET_EXTRA_LOGGING - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "All tasks of step `%s' with %u subordinates finished.\n", - step->debug_name, - step->subordinates_len); -#endif - - for (i = 0; i < step->subordinates_len; i++) - { - GNUNET_assert (step->subordinates[i]->pending_prereq > 0); - step->subordinates[i]->pending_prereq--; -#ifdef GNUNET_EXTRA_LOGGING - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Decreased pending_prereq to %u for step `%s'.\n", - step->subordinates[i]->pending_prereq, - step->subordinates[i]->debug_name); - -#endif - } - - step->is_finished = GNUNET_YES; - - // XXX: maybe schedule as task to avoid recursion? - run_ready_steps (step->session); -} /* @@ -2057,7 +2214,7 @@ run_ready_steps (struct ConsensusSession *session) while (NULL != step) { - if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) ) + if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) && (GNUNET_NO == step->is_finished) ) { size_t i; @@ -2450,12 +2607,13 @@ step_depend_on (struct Step *step, struct Step *dep) static struct Step * -create_step (struct ConsensusSession *session, int round) +create_step (struct ConsensusSession *session, int round, int early_finishable) { struct Step *step; step = GNUNET_new (struct Step); step->session = session; step->round = round; + step->early_finishable = early_finishable; GNUNET_CONTAINER_DLL_insert_tail (session->steps_head, session->steps_tail, step); @@ -2494,7 +2652,7 @@ construct_task_graph_gradecast (struct ConsensusSession *session, /* gcast step 1: leader disseminates */ - step = create_step (session, round); + step = create_step (session, round, GNUNET_YES); #ifdef GNUNET_EXTRA_LOGGING GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep); @@ -2554,7 +2712,7 @@ construct_task_graph_gradecast (struct ConsensusSession *session, /* gcast phase 2: echo */ prev_step = step; round += 1; - step = create_step (session, round); + step = create_step (session, round, GNUNET_YES); #ifdef GNUNET_EXTRA_LOGGING GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep); #endif @@ -2578,7 +2736,7 @@ construct_task_graph_gradecast (struct ConsensusSession *session, prev_step = step; /* Same round, since step only has local tasks */ - step = create_step (session, round); + step = create_step (session, round, GNUNET_YES); #ifdef GNUNET_EXTRA_LOGGING GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep); #endif @@ -2594,7 +2752,7 @@ construct_task_graph_gradecast (struct ConsensusSession *session, prev_step = step; round += 1; - step = create_step (session, round); + step = create_step (session, round, GNUNET_YES); #ifdef GNUNET_EXTRA_LOGGING GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep); #endif @@ -2624,7 +2782,7 @@ construct_task_graph_gradecast (struct ConsensusSession *session, prev_step = step; /* Same round, since step only has local tasks */ - step = create_step (session, round); + step = create_step (session, round, GNUNET_YES); #ifdef GNUNET_EXTRA_LOGGING GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep); #endif @@ -2675,7 +2833,7 @@ construct_task_graph (struct ConsensusSession *session) /* all-to-all step */ - step = create_step (session, round); + step = create_step (session, round, GNUNET_NO); #ifdef GNUNET_EXTRA_LOGGING step->debug_name = GNUNET_strdup ("all to all"); @@ -2712,7 +2870,7 @@ construct_task_graph (struct ConsensusSession *session) struct Step *step_rep_end; /* Every repetition is in a separate round. */ - step_rep_start = create_step (session, round); + step_rep_start = create_step (session, round, GNUNET_YES); #ifdef GNUNET_EXTRA_LOGGING GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i); #endif @@ -2721,7 +2879,7 @@ construct_task_graph (struct ConsensusSession *session) /* gradecast has three rounds */ round += 3; - step_rep_end = create_step (session, round); + step_rep_end = create_step (session, round, GNUNET_YES); #ifdef GNUNET_EXTRA_LOGGING GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i); #endif @@ -2743,7 +2901,7 @@ construct_task_graph (struct ConsensusSession *session) /* There is no next gradecast round, thus the final start step is the overall end step of the gradecasts */ round += 1; - step = create_step (session, round); + step = create_step (session, round, GNUNET_NO); #ifdef GNUNET_EXTRA_LOGGING GNUNET_asprintf (&step->debug_name, "finish"); #endif @@ -2754,7 +2912,7 @@ construct_task_graph (struct ConsensusSession *session) .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 }, .start = task_start_finish, }); - task.cls.finish.input_set = (struct SetKey) { SET_KIND_CURRENT, t + 1 }; + task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST }; put_task (session->taskmap, &task); } -- cgit v1.2.3