aboutsummaryrefslogtreecommitdiff
path: root/src/consensus/gnunet-service-consensus.c
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2016-03-09 16:17:28 +0000
committerFlorian Dold <florian.dold@gmail.com>2016-03-09 16:17:28 +0000
commit99cfe33894a311ef45d5552248625615324ec628 (patch)
tree45166f7bcb50058144a745858c73e5260d716608 /src/consensus/gnunet-service-consensus.c
parent1d2cec96289a1bd1600aa39dbc8ab422298e0973 (diff)
downloadgnunet-99cfe33894a311ef45d5552248625615324ec628.tar.gz
gnunet-99cfe33894a311ef45d5552248625615324ec628.zip
Implement early stopping.
Diffstat (limited to 'src/consensus/gnunet-service-consensus.c')
-rw-r--r--src/consensus/gnunet-service-consensus.c256
1 files changed, 207 insertions, 49 deletions
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
index 9c71a98a7..e8385a6bb 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -55,6 +55,14 @@ enum ReferendumVote
55}; 55};
56 56
57 57
58enum EarlyStoppingPhase
59{
60 EARLY_STOPPING_NONE = 0,
61 EARLY_STOPPING_ONE_MORE = 1,
62 EARLY_STOPPING_DONE = 2,
63};
64
65
58GNUNET_NETWORK_STRUCT_BEGIN 66GNUNET_NETWORK_STRUCT_BEGIN
59 67
60 68
@@ -157,6 +165,10 @@ enum SetKind
157{ 165{
158 SET_KIND_NONE = 0, 166 SET_KIND_NONE = 0,
159 SET_KIND_CURRENT, 167 SET_KIND_CURRENT,
168 /**
169 * Last result set from a gradecast
170 */
171 SET_KIND_LAST_GRADECAST,
160 SET_KIND_LEADER_PROPOSAL, 172 SET_KIND_LEADER_PROPOSAL,
161 SET_KIND_ECHO_RESULT, 173 SET_KIND_ECHO_RESULT,
162}; 174};
@@ -249,6 +261,9 @@ struct Step
249 261
250 struct ConsensusSession *session; 262 struct ConsensusSession *session;
251 263
264 /**
265 * Tasks that this step is composed of.
266 */
252 struct TaskEntry **tasks; 267 struct TaskEntry **tasks;
253 unsigned int tasks_len; 268 unsigned int tasks_len;
254 unsigned int tasks_cap; 269 unsigned int tasks_cap;
@@ -293,6 +308,19 @@ struct Step
293 * the task, used for debugging. 308 * the task, used for debugging.
294 */ 309 */
295 char *debug_name; 310 char *debug_name;
311
312 /**
313 * When we're doing an early finish, how should this step be
314 * treated?
315 * If GNUNET_YES, the step will be marked as finished
316 * without actually running its tasks.
317 * Otherwise, the step will still be run even after
318 * an early finish.
319 *
320 * Note that a task may never be finished early if
321 * it is already running.
322 */
323 int early_finishable;
296}; 324};
297 325
298 326
@@ -459,6 +487,11 @@ struct ConsensusSession
459 * Uses the session's global id as app id. 487 * Uses the session's global id as app id.
460 */ 488 */
461 struct GNUNET_SET_ListenHandle *set_listener; 489 struct GNUNET_SET_ListenHandle *set_listener;
490
491 /**
492 * State of our early stopping scheme.
493 */
494 int early_stopping;
462}; 495};
463 496
464/** 497/**
@@ -1367,9 +1400,9 @@ put_set (struct ConsensusSession *session,
1367 debug_str_set_key (&set->key)); 1400 debug_str_set_key (&set->key));
1368 1401
1369 GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash); 1402 GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
1370 GNUNET_assert (GNUNET_OK == 1403 GNUNET_assert (GNUNET_SYSERR !=
1371 GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set, 1404 GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1372 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 1405 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE));
1373} 1406}
1374 1407
1375 1408
@@ -1487,6 +1520,11 @@ diff_destroy (struct DiffEntry *diff)
1487} 1520}
1488 1521
1489 1522
1523/**
1524 * For a given majority, count what the outcome
1525 * is (add/remove/keep), and give the number
1526 * of peers that voted for this outcome.
1527 */
1490static void 1528static void
1491rfn_majority (const struct ReferendumEntry *rfn, 1529rfn_majority (const struct ReferendumEntry *rfn,
1492 const struct RfnElementInfo *ri, 1530 const struct RfnElementInfo *ri,
@@ -1605,6 +1643,89 @@ set_mutation_done (void *cls)
1605 } 1643 }
1606} 1644}
1607 1645
1646
1647static void
1648try_finish_step_early (struct Step *step)
1649{
1650 unsigned int i;
1651
1652 if (GNUNET_YES == step->is_running)
1653 return;
1654 if (GNUNET_YES == step->is_finished)
1655 return;
1656 if (GNUNET_NO == step->early_finishable)
1657 return;
1658
1659 step->is_finished = GNUNET_YES;
1660
1661#ifdef GNUNET_EXTRA_LOGGING
1662 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1663 "Finishing step `%s' early.\n",
1664 step->debug_name);
1665#endif
1666
1667 for (i = 0; i < step->subordinates_len; i++)
1668 {
1669 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1670 step->subordinates[i]->pending_prereq--;
1671#ifdef GNUNET_EXTRA_LOGGING
1672 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1673 "Decreased pending_prereq to %u for step `%s'.\n",
1674 step->subordinates[i]->pending_prereq,
1675 step->subordinates[i]->debug_name);
1676
1677#endif
1678 try_finish_step_early (step->subordinates[i]);
1679 }
1680
1681 // XXX: maybe schedule as task to avoid recursion?
1682 run_ready_steps (step->session);
1683}
1684
1685
1686static void
1687finish_step (struct Step *step)
1688{
1689 unsigned int i;
1690
1691 GNUNET_assert (step->finished_tasks == step->tasks_len);
1692 GNUNET_assert (GNUNET_YES == step->is_running);
1693 GNUNET_assert (GNUNET_NO == step->is_finished);
1694
1695#ifdef GNUNET_EXTRA_LOGGING
1696 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1697 "All tasks of step `%s' with %u subordinates finished.\n",
1698 step->debug_name,
1699 step->subordinates_len);
1700#endif
1701
1702 for (i = 0; i < step->subordinates_len; i++)
1703 {
1704 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1705 step->subordinates[i]->pending_prereq--;
1706#ifdef GNUNET_EXTRA_LOGGING
1707 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1708 "Decreased pending_prereq to %u for step `%s'.\n",
1709 step->subordinates[i]->pending_prereq,
1710 step->subordinates[i]->debug_name);
1711
1712#endif
1713 }
1714
1715 step->is_finished = GNUNET_YES;
1716
1717 // XXX: maybe schedule as task to avoid recursion?
1718 run_ready_steps (step->session);
1719}
1720
1721
1722
1723/**
1724 * Apply the result from one round of gradecasts (i.e. every peer
1725 * should have gradecasted) to the peer's current set.
1726 *
1727 * @param task the task with context information
1728 */
1608static void 1729static void
1609task_start_apply_round (struct TaskEntry *task) 1730task_start_apply_round (struct TaskEntry *task)
1610{ 1731{
@@ -1617,6 +1738,7 @@ task_start_apply_round (struct TaskEntry *task)
1617 struct GNUNET_CONTAINER_MultiHashMapIterator *iter; 1738 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1618 struct RfnElementInfo *ri; 1739 struct RfnElementInfo *ri;
1619 struct SetMutationProgressCls *progress_cls; 1740 struct SetMutationProgressCls *progress_cls;
1741 uint16_t worst_majority = UINT16_MAX;
1620 1742
1621 sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition }; 1743 sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1622 rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition }; 1744 rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
@@ -1644,6 +1766,9 @@ task_start_apply_round (struct TaskEntry *task)
1644 1766
1645 rfn_majority (rfn_in, ri, &majority_num, &majority_vote); 1767 rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1646 1768
1769 if (worst_majority > majority_num)
1770 worst_majority = majority_num;
1771
1647 switch (majority_vote) 1772 switch (majority_vote)
1648 { 1773 {
1649 case VOTE_ADD: 1774 case VOTE_ADD:
@@ -1653,6 +1778,10 @@ task_start_apply_round (struct TaskEntry *task)
1653 ri->element, 1778 ri->element,
1654 set_mutation_done, 1779 set_mutation_done,
1655 progress_cls)); 1780 progress_cls));
1781 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1782 "P%u: apply round: adding element %s with %u-majority.\n",
1783 session->local_peer_idx,
1784 debug_str_element (ri->element), majority_num);
1656 break; 1785 break;
1657 case VOTE_REMOVE: 1786 case VOTE_REMOVE:
1658 progress_cls->num_pending++; 1787 progress_cls->num_pending++;
@@ -1661,8 +1790,16 @@ task_start_apply_round (struct TaskEntry *task)
1661 ri->element, 1790 ri->element,
1662 set_mutation_done, 1791 set_mutation_done,
1663 progress_cls)); 1792 progress_cls));
1793 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1794 "P%u: apply round: deleting element %s with %u-majority.\n",
1795 session->local_peer_idx,
1796 debug_str_element (ri->element), majority_num);
1664 break; 1797 break;
1665 case VOTE_STAY: 1798 case VOTE_STAY:
1799 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1800 "P%u: apply round: keeping element %s with %u-majority.\n",
1801 session->local_peer_idx,
1802 debug_str_element (ri->element), majority_num);
1666 // do nothing 1803 // do nothing
1667 break; 1804 break;
1668 default: 1805 default:
@@ -1677,10 +1814,53 @@ task_start_apply_round (struct TaskEntry *task)
1677 GNUNET_free (progress_cls); 1814 GNUNET_free (progress_cls);
1678 finish_task (task); 1815 finish_task (task);
1679 } 1816 }
1680}
1681 1817
1818 {
1819 uint16_t thresh = (session->num_peers / 3) * 2;
1682 1820
1683#define THRESH(s) (((s)->num_peers / 3)) 1821 if (worst_majority >= thresh)
1822 {
1823 switch (session->early_stopping)
1824 {
1825 case EARLY_STOPPING_NONE:
1826 session->early_stopping = EARLY_STOPPING_ONE_MORE;
1827 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1828 "P%u: Stopping early (after one more superround)\n",
1829 session->local_peer_idx);
1830 break;
1831 case EARLY_STOPPING_ONE_MORE:
1832 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: finishing steps due to early finish\n",
1833 session->local_peer_idx);
1834 session->early_stopping = EARLY_STOPPING_DONE;
1835 {
1836 struct Step *step;
1837 for (step = session->steps_head; NULL != step; step = step->next)
1838 try_finish_step_early (step);
1839 }
1840 break;
1841 case EARLY_STOPPING_DONE:
1842 /* We shouldn't be here anymore after early stopping */
1843 GNUNET_break (0);
1844 break;
1845 default:
1846 GNUNET_assert (0);
1847 break;
1848 }
1849 }
1850 else if (EARLY_STOPPING_NONE != session->early_stopping)
1851 {
1852 // Our assumption about the number of bad peers
1853 // has been broken.
1854 GNUNET_break_op (0);
1855 }
1856 else
1857 {
1858 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: NOT finishing early (majority not good enough)\n",
1859 session->local_peer_idx);
1860 }
1861 }
1862
1863}
1684 1864
1685 1865
1686static void 1866static void
@@ -1904,6 +2084,16 @@ task_start_eval_echo (struct TaskEntry *task)
1904 return; 2084 return;
1905 } 2085 }
1906 2086
2087
2088 {
2089 // FIXME: should be marked as a shallow copy, so
2090 // we can destroy everything correctly
2091 struct SetEntry *last_set = GNUNET_new (struct SetEntry);
2092 last_set->h = output_set->h;
2093 last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2094 put_set (session, last_set);
2095 }
2096
1907 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2097 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1908 "Evaluating referendum in Task {%s}\n", 2098 "Evaluating referendum in Task {%s}\n",
1909 debug_str_task_key (&task->key)); 2099 debug_str_task_key (&task->key));
@@ -2009,39 +2199,6 @@ start_task (struct ConsensusSession *session, struct TaskEntry *task)
2009} 2199}
2010 2200
2011 2201
2012static void finish_step (struct Step *step)
2013{
2014 unsigned int i;
2015
2016 GNUNET_assert (step->finished_tasks == step->tasks_len);
2017 GNUNET_assert (GNUNET_YES == step->is_running);
2018 GNUNET_assert (GNUNET_NO == step->is_finished);
2019
2020#ifdef GNUNET_EXTRA_LOGGING
2021 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2022 "All tasks of step `%s' with %u subordinates finished.\n",
2023 step->debug_name,
2024 step->subordinates_len);
2025#endif
2026
2027 for (i = 0; i < step->subordinates_len; i++)
2028 {
2029 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
2030 step->subordinates[i]->pending_prereq--;
2031#ifdef GNUNET_EXTRA_LOGGING
2032 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2033 "Decreased pending_prereq to %u for step `%s'.\n",
2034 step->subordinates[i]->pending_prereq,
2035 step->subordinates[i]->debug_name);
2036
2037#endif
2038 }
2039
2040 step->is_finished = GNUNET_YES;
2041
2042 // XXX: maybe schedule as task to avoid recursion?
2043 run_ready_steps (step->session);
2044}
2045 2202
2046 2203
2047/* 2204/*
@@ -2057,7 +2214,7 @@ run_ready_steps (struct ConsensusSession *session)
2057 2214
2058 while (NULL != step) 2215 while (NULL != step)
2059 { 2216 {
2060 if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) ) 2217 if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) && (GNUNET_NO == step->is_finished) )
2061 { 2218 {
2062 size_t i; 2219 size_t i;
2063 2220
@@ -2450,12 +2607,13 @@ step_depend_on (struct Step *step, struct Step *dep)
2450 2607
2451 2608
2452static struct Step * 2609static struct Step *
2453create_step (struct ConsensusSession *session, int round) 2610create_step (struct ConsensusSession *session, int round, int early_finishable)
2454{ 2611{
2455 struct Step *step; 2612 struct Step *step;
2456 step = GNUNET_new (struct Step); 2613 step = GNUNET_new (struct Step);
2457 step->session = session; 2614 step->session = session;
2458 step->round = round; 2615 step->round = round;
2616 step->early_finishable = early_finishable;
2459 GNUNET_CONTAINER_DLL_insert_tail (session->steps_head, 2617 GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2460 session->steps_tail, 2618 session->steps_tail,
2461 step); 2619 step);
@@ -2494,7 +2652,7 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
2494 2652
2495 /* gcast step 1: leader disseminates */ 2653 /* gcast step 1: leader disseminates */
2496 2654
2497 step = create_step (session, round); 2655 step = create_step (session, round, GNUNET_YES);
2498 2656
2499#ifdef GNUNET_EXTRA_LOGGING 2657#ifdef GNUNET_EXTRA_LOGGING
2500 GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep); 2658 GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
@@ -2554,7 +2712,7 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
2554 /* gcast phase 2: echo */ 2712 /* gcast phase 2: echo */
2555 prev_step = step; 2713 prev_step = step;
2556 round += 1; 2714 round += 1;
2557 step = create_step (session, round); 2715 step = create_step (session, round, GNUNET_YES);
2558#ifdef GNUNET_EXTRA_LOGGING 2716#ifdef GNUNET_EXTRA_LOGGING
2559 GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep); 2717 GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2560#endif 2718#endif
@@ -2578,7 +2736,7 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
2578 2736
2579 prev_step = step; 2737 prev_step = step;
2580 /* Same round, since step only has local tasks */ 2738 /* Same round, since step only has local tasks */
2581 step = create_step (session, round); 2739 step = create_step (session, round, GNUNET_YES);
2582#ifdef GNUNET_EXTRA_LOGGING 2740#ifdef GNUNET_EXTRA_LOGGING
2583 GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep); 2741 GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2584#endif 2742#endif
@@ -2594,7 +2752,7 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
2594 2752
2595 prev_step = step; 2753 prev_step = step;
2596 round += 1; 2754 round += 1;
2597 step = create_step (session, round); 2755 step = create_step (session, round, GNUNET_YES);
2598#ifdef GNUNET_EXTRA_LOGGING 2756#ifdef GNUNET_EXTRA_LOGGING
2599 GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep); 2757 GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2600#endif 2758#endif
@@ -2624,7 +2782,7 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
2624 2782
2625 prev_step = step; 2783 prev_step = step;
2626 /* Same round, since step only has local tasks */ 2784 /* Same round, since step only has local tasks */
2627 step = create_step (session, round); 2785 step = create_step (session, round, GNUNET_YES);
2628#ifdef GNUNET_EXTRA_LOGGING 2786#ifdef GNUNET_EXTRA_LOGGING
2629 GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep); 2787 GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2630#endif 2788#endif
@@ -2675,7 +2833,7 @@ construct_task_graph (struct ConsensusSession *session)
2675 2833
2676 /* all-to-all step */ 2834 /* all-to-all step */
2677 2835
2678 step = create_step (session, round); 2836 step = create_step (session, round, GNUNET_NO);
2679 2837
2680#ifdef GNUNET_EXTRA_LOGGING 2838#ifdef GNUNET_EXTRA_LOGGING
2681 step->debug_name = GNUNET_strdup ("all to all"); 2839 step->debug_name = GNUNET_strdup ("all to all");
@@ -2712,7 +2870,7 @@ construct_task_graph (struct ConsensusSession *session)
2712 struct Step *step_rep_end; 2870 struct Step *step_rep_end;
2713 2871
2714 /* Every repetition is in a separate round. */ 2872 /* Every repetition is in a separate round. */
2715 step_rep_start = create_step (session, round); 2873 step_rep_start = create_step (session, round, GNUNET_YES);
2716#ifdef GNUNET_EXTRA_LOGGING 2874#ifdef GNUNET_EXTRA_LOGGING
2717 GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i); 2875 GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
2718#endif 2876#endif
@@ -2721,7 +2879,7 @@ construct_task_graph (struct ConsensusSession *session)
2721 2879
2722 /* gradecast has three rounds */ 2880 /* gradecast has three rounds */
2723 round += 3; 2881 round += 3;
2724 step_rep_end = create_step (session, round); 2882 step_rep_end = create_step (session, round, GNUNET_YES);
2725#ifdef GNUNET_EXTRA_LOGGING 2883#ifdef GNUNET_EXTRA_LOGGING
2726 GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i); 2884 GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
2727#endif 2885#endif
@@ -2743,7 +2901,7 @@ construct_task_graph (struct ConsensusSession *session)
2743 /* There is no next gradecast round, thus the final 2901 /* There is no next gradecast round, thus the final
2744 start step is the overall end step of the gradecasts */ 2902 start step is the overall end step of the gradecasts */
2745 round += 1; 2903 round += 1;
2746 step = create_step (session, round); 2904 step = create_step (session, round, GNUNET_NO);
2747#ifdef GNUNET_EXTRA_LOGGING 2905#ifdef GNUNET_EXTRA_LOGGING
2748 GNUNET_asprintf (&step->debug_name, "finish"); 2906 GNUNET_asprintf (&step->debug_name, "finish");
2749#endif 2907#endif
@@ -2754,7 +2912,7 @@ construct_task_graph (struct ConsensusSession *session)
2754 .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 }, 2912 .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
2755 .start = task_start_finish, 2913 .start = task_start_finish,
2756 }); 2914 });
2757 task.cls.finish.input_set = (struct SetKey) { SET_KIND_CURRENT, t + 1 }; 2915 task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2758 2916
2759 put_task (session->taskmap, &task); 2917 put_task (session->taskmap, &task);
2760} 2918}