diff options
author | Florian Dold <florian.dold@gmail.com> | 2015-10-06 15:22:02 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2015-10-06 15:22:02 +0000 |
commit | c81d7e1199cf2779973900677ffb3d12e12ca96f (patch) | |
tree | 2065078e38d3506f7fbf2d51f3b3125d2f744f36 /src/consensus/gnunet-service-consensus.c | |
parent | 4f47b28bace8908afbd9d2b5d4093005931d298d (diff) | |
download | gnunet-c81d7e1199cf2779973900677ffb3d12e12ca96f.tar.gz gnunet-c81d7e1199cf2779973900677ffb3d12e12ca96f.zip |
towards handling byz. faults correctly
- blacklisting of peers in consensus
- restructuring
Diffstat (limited to 'src/consensus/gnunet-service-consensus.c')
-rw-r--r-- | src/consensus/gnunet-service-consensus.c | 278 |
1 files changed, 179 insertions, 99 deletions
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index dd619c6e4..2d2725b64 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c | |||
@@ -34,9 +34,34 @@ | |||
34 | #include "consensus.h" | 34 | #include "consensus.h" |
35 | 35 | ||
36 | 36 | ||
37 | #define ELEMENT_TYPE_CONTESTED_MARKER (GNUNET_CONSENSUS_ELEMENT_TYPE_USER_MAX + 1) | ||
38 | |||
39 | |||
40 | enum ReferendumVote | ||
41 | { | ||
42 | /** | ||
43 | * Vote that nothing should change. | ||
44 | * This option is never voted explicitly. | ||
45 | */ | ||
46 | VOTE_STAY = 0, | ||
47 | /** | ||
48 | * Vote that an element should be added. | ||
49 | */ | ||
50 | VOTE_ADD = 1, | ||
51 | /** | ||
52 | * Vote that an element should be removed. | ||
53 | */ | ||
54 | VOTE_REMOVE = 2, | ||
55 | }; | ||
56 | |||
37 | 57 | ||
38 | GNUNET_NETWORK_STRUCT_BEGIN | 58 | GNUNET_NETWORK_STRUCT_BEGIN |
39 | 59 | ||
60 | |||
61 | struct ContestedPayload | ||
62 | { | ||
63 | }; | ||
64 | |||
40 | /** | 65 | /** |
41 | * Tuple of integers that together | 66 | * Tuple of integers that together |
42 | * identify a task uniquely. | 67 | * identify a task uniquely. |
@@ -72,23 +97,6 @@ struct TaskKey { | |||
72 | }; | 97 | }; |
73 | 98 | ||
74 | 99 | ||
75 | enum ReferendumVote | ||
76 | { | ||
77 | /** | ||
78 | * Vote that nothing should change. | ||
79 | * This option is never voted explicitly. | ||
80 | */ | ||
81 | VOTE_STAY = 0, | ||
82 | /** | ||
83 | * Vote that an element should be added. | ||
84 | */ | ||
85 | VOTE_ADD = 1, | ||
86 | /** | ||
87 | * Vote that an element should be removed. | ||
88 | */ | ||
89 | VOTE_REMOVE = 2, | ||
90 | }; | ||
91 | |||
92 | 100 | ||
93 | struct SetKey | 101 | struct SetKey |
94 | { | 102 | { |
@@ -136,7 +144,6 @@ enum PhaseKind | |||
136 | PHASE_KIND_GRADECAST_ECHO_GRADE, | 144 | PHASE_KIND_GRADECAST_ECHO_GRADE, |
137 | PHASE_KIND_GRADECAST_CONFIRM, | 145 | PHASE_KIND_GRADECAST_CONFIRM, |
138 | PHASE_KIND_GRADECAST_CONFIRM_GRADE, | 146 | PHASE_KIND_GRADECAST_CONFIRM_GRADE, |
139 | PHASE_KIND_GRADECAST_APPLY_RESULT, | ||
140 | /** | 147 | /** |
141 | * Apply a repetition of the all-to-all | 148 | * Apply a repetition of the all-to-all |
142 | * gradecast to the current set. | 149 | * gradecast to the current set. |
@@ -484,9 +491,6 @@ static void | |||
484 | finish_task (struct TaskEntry *task); | 491 | finish_task (struct TaskEntry *task); |
485 | 492 | ||
486 | static void | 493 | static void |
487 | task_start_reconcile (struct TaskEntry *task); | ||
488 | |||
489 | static void | ||
490 | run_ready_steps (struct ConsensusSession *session); | 494 | run_ready_steps (struct ConsensusSession *session); |
491 | 495 | ||
492 | static const char * | 496 | static const char * |
@@ -501,7 +505,6 @@ phasename (uint16_t phase) | |||
501 | case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE"; | 505 | case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE"; |
502 | case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM"; | 506 | case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM"; |
503 | case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE"; | 507 | case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE"; |
504 | case PHASE_KIND_GRADECAST_APPLY_RESULT: return "GRADECAST_APPLY_RESULT"; | ||
505 | case PHASE_KIND_APPLY_REP: return "APPLY_REP"; | 508 | case PHASE_KIND_APPLY_REP: return "APPLY_REP"; |
506 | default: return "(unknown)"; | 509 | default: return "(unknown)"; |
507 | } | 510 | } |
@@ -772,24 +775,39 @@ diff_insert (struct DiffEntry *diff, | |||
772 | 775 | ||
773 | 776 | ||
774 | static void | 777 | static void |
778 | rfn_commit (struct ReferendumEntry *rfn, | ||
779 | uint16_t commit_peer) | ||
780 | { | ||
781 | GNUNET_assert (commit_peer < rfn->num_peers); | ||
782 | |||
783 | rfn->peer_commited[commit_peer] = GNUNET_YES; | ||
784 | } | ||
785 | |||
786 | |||
787 | static void | ||
788 | rfn_contest (struct ReferendumEntry *rfn, | ||
789 | uint16_t contested_peer) | ||
790 | { | ||
791 | GNUNET_assert (contested_peer < rfn->num_peers); | ||
792 | |||
793 | rfn->peer_contested[contested_peer] = GNUNET_YES; | ||
794 | } | ||
795 | |||
796 | static void | ||
775 | rfn_vote (struct ReferendumEntry *rfn, | 797 | rfn_vote (struct ReferendumEntry *rfn, |
776 | uint16_t voting_peer, | 798 | uint16_t voting_peer, |
777 | uint16_t num_peers, | ||
778 | enum ReferendumVote vote, | 799 | enum ReferendumVote vote, |
779 | const struct GNUNET_SET_Element *element) | 800 | const struct GNUNET_SET_Element *element) |
780 | { | 801 | { |
781 | struct RfnElementInfo *ri; | 802 | struct RfnElementInfo *ri; |
782 | struct GNUNET_HashCode hash; | 803 | struct GNUNET_HashCode hash; |
783 | 804 | ||
784 | GNUNET_assert (voting_peer < num_peers); | 805 | GNUNET_assert (voting_peer < rfn->num_peers); |
785 | 806 | ||
786 | /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE, | 807 | /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE, |
787 | since VOTE_KEEP is implicit in not voting. */ | 808 | since VOTE_KEEP is implicit in not voting. */ |
788 | GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) ); | 809 | GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) ); |
789 | 810 | ||
790 | // XXX: should happen in another place! | ||
791 | rfn->peer_commited[voting_peer] = GNUNET_YES; | ||
792 | |||
793 | GNUNET_SET_element_hash (element, &hash); | 811 | GNUNET_SET_element_hash (element, &hash); |
794 | ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash); | 812 | ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash); |
795 | 813 | ||
@@ -797,7 +815,7 @@ rfn_vote (struct ReferendumEntry *rfn, | |||
797 | { | 815 | { |
798 | ri = GNUNET_new (struct RfnElementInfo); | 816 | ri = GNUNET_new (struct RfnElementInfo); |
799 | ri->element = GNUNET_SET_element_dup (element); | 817 | ri->element = GNUNET_SET_element_dup (element); |
800 | ri->votes = GNUNET_new_array (num_peers, int); | 818 | ri->votes = GNUNET_new_array (rfn->num_peers, int); |
801 | GNUNET_assert (GNUNET_OK == | 819 | GNUNET_assert (GNUNET_OK == |
802 | GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements, | 820 | GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements, |
803 | &hash, ri, | 821 | &hash, ri, |
@@ -897,6 +915,16 @@ set_result_cb (void *cls, | |||
897 | return; | 915 | return; |
898 | } | 916 | } |
899 | 917 | ||
918 | if ( (GNUNET_SET_STATUS_ADD_LOCAL == status) || (GNUNET_SET_STATUS_ADD_REMOTE == status) ) | ||
919 | { | ||
920 | if ( (GNUNET_YES == setop->transceive_contested) && (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type) ) | ||
921 | { | ||
922 | GNUNET_assert (NULL != output_rfn); | ||
923 | rfn_contest (output_rfn, task_other_peer (task)); | ||
924 | return; | ||
925 | } | ||
926 | } | ||
927 | |||
900 | switch (status) | 928 | switch (status) |
901 | { | 929 | { |
902 | case GNUNET_SET_STATUS_ADD_LOCAL: | 930 | case GNUNET_SET_STATUS_ADD_LOCAL: |
@@ -933,7 +961,7 @@ set_result_cb (void *cls, | |||
933 | } | 961 | } |
934 | if (NULL != output_rfn) | 962 | if (NULL != output_rfn) |
935 | { | 963 | { |
936 | rfn_vote (output_rfn, task_other_peer (task), session->num_peers, VOTE_ADD, element); | 964 | rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element); |
937 | #ifdef GNUNET_EXTRA_LOGGING | 965 | #ifdef GNUNET_EXTRA_LOGGING |
938 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | 966 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, |
939 | "P%u: adding element %s into rfn {%s} of task {%s}\n", | 967 | "P%u: adding element %s into rfn {%s} of task {%s}\n", |
@@ -948,6 +976,8 @@ set_result_cb (void *cls, | |||
948 | case GNUNET_SET_STATUS_ADD_REMOTE: | 976 | case GNUNET_SET_STATUS_ADD_REMOTE: |
949 | if (GNUNET_YES == setop->do_not_remove) | 977 | if (GNUNET_YES == setop->do_not_remove) |
950 | break; | 978 | break; |
979 | if (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type) | ||
980 | break; | ||
951 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 981 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
952 | "Removing element in Task {%s}\n", | 982 | "Removing element in Task {%s}\n", |
953 | debug_str_task_key (&task->key)); | 983 | debug_str_task_key (&task->key)); |
@@ -981,7 +1011,7 @@ set_result_cb (void *cls, | |||
981 | } | 1011 | } |
982 | if (NULL != output_rfn) | 1012 | if (NULL != output_rfn) |
983 | { | 1013 | { |
984 | rfn_vote (output_rfn, task_other_peer (task), session->num_peers, VOTE_REMOVE, element); | 1014 | rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element); |
985 | #ifdef GNUNET_EXTRA_LOGGING | 1015 | #ifdef GNUNET_EXTRA_LOGGING |
986 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | 1016 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, |
987 | "P%u: removing element %s from rfn {%s} of task {%s}\n", | 1017 | "P%u: removing element %s from rfn {%s} of task {%s}\n", |
@@ -995,10 +1025,13 @@ set_result_cb (void *cls, | |||
995 | case GNUNET_SET_STATUS_DONE: | 1025 | case GNUNET_SET_STATUS_DONE: |
996 | // XXX: check first if any changes to the underlying | 1026 | // XXX: check first if any changes to the underlying |
997 | // set are still pending | 1027 | // set are still pending |
998 | // XXX: commit other peer in referendum | ||
999 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1028 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1000 | "Finishing setop in Task {%s}\n", | 1029 | "Finishing setop in Task {%s}\n", |
1001 | debug_str_task_key (&task->key)); | 1030 | debug_str_task_key (&task->key)); |
1031 | if (NULL != output_rfn) | ||
1032 | { | ||
1033 | rfn_commit (output_rfn, task_other_peer (task)); | ||
1034 | } | ||
1002 | finish_task (task); | 1035 | finish_task (task); |
1003 | break; | 1036 | break; |
1004 | case GNUNET_SET_STATUS_FAILURE: | 1037 | case GNUNET_SET_STATUS_FAILURE: |
@@ -1159,6 +1192,15 @@ commit_set (struct ConsensusSession *session, | |||
1159 | } | 1192 | } |
1160 | } | 1193 | } |
1161 | #else | 1194 | #else |
1195 | if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) ) | ||
1196 | { | ||
1197 | struct GNUNET_SET_Element element; | ||
1198 | struct ContestedPayload payload; | ||
1199 | element.data = &payload; | ||
1200 | element.size = sizeof (struct ContestedPayload); | ||
1201 | element.element_type = ELEMENT_TYPE_CONTESTED_MARKER; | ||
1202 | GNUNET_SET_add_element (set->h, &element, NULL, NULL); | ||
1203 | } | ||
1162 | GNUNET_SET_commit (setop->op, set->h); | 1204 | GNUNET_SET_commit (setop->op, set->h); |
1163 | #endif | 1205 | #endif |
1164 | } | 1206 | } |
@@ -1212,25 +1254,6 @@ put_rfn (struct ConsensusSession *session, | |||
1212 | 1254 | ||
1213 | 1255 | ||
1214 | static void | 1256 | static void |
1215 | output_cloned_cb (void *cls, struct GNUNET_SET_Handle *copy) | ||
1216 | { | ||
1217 | struct TaskEntry *task = (struct TaskEntry *) cls; | ||
1218 | struct SetOpCls *setop = &task->cls.setop; | ||
1219 | struct ConsensusSession *session = task->step->session; | ||
1220 | struct SetEntry *set = GNUNET_new (struct SetEntry); | ||
1221 | |||
1222 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1223 | "P%u: Received lazy copy, storing output set %s\n", | ||
1224 | session->local_peer_idx, debug_str_set_key (&setop->output_set)); | ||
1225 | |||
1226 | set->key = setop->output_set; | ||
1227 | set->h = copy; | ||
1228 | put_set (task->step->session, set); | ||
1229 | task_start_reconcile (task); | ||
1230 | } | ||
1231 | |||
1232 | |||
1233 | static void | ||
1234 | task_cancel_reconcile (struct TaskEntry *task) | 1257 | task_cancel_reconcile (struct TaskEntry *task) |
1235 | { | 1258 | { |
1236 | /* not implemented yet */ | 1259 | /* not implemented yet */ |
@@ -1249,15 +1272,18 @@ apply_diff_to_rfn (struct DiffEntry *diff, | |||
1249 | 1272 | ||
1250 | iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes); | 1273 | iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes); |
1251 | 1274 | ||
1252 | while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di)) | 1275 | while (GNUNET_YES == |
1276 | GNUNET_CONTAINER_multihashmap_iterator_next (iter, | ||
1277 | NULL, | ||
1278 | (const void **) &di)) | ||
1253 | { | 1279 | { |
1254 | if (di->weight > 0) | 1280 | if (di->weight > 0) |
1255 | { | 1281 | { |
1256 | rfn_vote (rfn, voting_peer, num_peers, VOTE_ADD, di->element); | 1282 | rfn_vote (rfn, voting_peer, VOTE_ADD, di->element); |
1257 | } | 1283 | } |
1258 | if (di->weight < 0) | 1284 | if (di->weight < 0) |
1259 | { | 1285 | { |
1260 | rfn_vote (rfn, voting_peer, num_peers, VOTE_REMOVE, di->element); | 1286 | rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element); |
1261 | } | 1287 | } |
1262 | } | 1288 | } |
1263 | 1289 | ||
@@ -1401,6 +1427,12 @@ create_set_copy_for_task (struct TaskEntry *task, | |||
1401 | struct SetEntry *src_set; | 1427 | struct SetEntry *src_set; |
1402 | struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls); | 1428 | struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls); |
1403 | 1429 | ||
1430 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1431 | "Copying set {%s} to {%s} for task {%s}\n", | ||
1432 | debug_str_set_key (src_set_key), | ||
1433 | debug_str_set_key (dst_set_key), | ||
1434 | debug_str_task_key (&task->key)); | ||
1435 | |||
1404 | scc->task = task; | 1436 | scc->task = task; |
1405 | scc->dst_set_key = *dst_set_key; | 1437 | scc->dst_set_key = *dst_set_key; |
1406 | src_set = lookup_set (task->step->session, src_set_key); | 1438 | src_set = lookup_set (task->step->session, src_set_key); |
@@ -1410,6 +1442,34 @@ create_set_copy_for_task (struct TaskEntry *task, | |||
1410 | scc); | 1442 | scc); |
1411 | } | 1443 | } |
1412 | 1444 | ||
1445 | |||
1446 | struct SetMutationProgressCls | ||
1447 | { | ||
1448 | int num_pending; | ||
1449 | /** | ||
1450 | * Task to finish once all changes are through. | ||
1451 | */ | ||
1452 | struct TaskEntry *task; | ||
1453 | }; | ||
1454 | |||
1455 | |||
1456 | static void | ||
1457 | set_mutation_done (void *cls) | ||
1458 | { | ||
1459 | struct SetMutationProgressCls *pc = cls; | ||
1460 | |||
1461 | GNUNET_assert (pc->num_pending > 0); | ||
1462 | |||
1463 | pc->num_pending--; | ||
1464 | |||
1465 | if (0 == pc->num_pending) | ||
1466 | { | ||
1467 | struct TaskEntry *task = pc->task; | ||
1468 | GNUNET_free (pc); | ||
1469 | finish_task (task); | ||
1470 | } | ||
1471 | } | ||
1472 | |||
1413 | static void | 1473 | static void |
1414 | task_start_apply_round (struct TaskEntry *task) | 1474 | task_start_apply_round (struct TaskEntry *task) |
1415 | { | 1475 | { |
@@ -1421,6 +1481,7 @@ task_start_apply_round (struct TaskEntry *task) | |||
1421 | struct ReferendumEntry *rfn_in; | 1481 | struct ReferendumEntry *rfn_in; |
1422 | struct GNUNET_CONTAINER_MultiHashMapIterator *iter; | 1482 | struct GNUNET_CONTAINER_MultiHashMapIterator *iter; |
1423 | struct RfnElementInfo *ri; | 1483 | struct RfnElementInfo *ri; |
1484 | struct SetMutationProgressCls *progress_cls; | ||
1424 | 1485 | ||
1425 | sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition }; | 1486 | sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition }; |
1426 | rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition }; | 1487 | rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition }; |
@@ -1436,6 +1497,9 @@ task_start_apply_round (struct TaskEntry *task) | |||
1436 | rfn_in = lookup_rfn (session, &rk_in); | 1497 | rfn_in = lookup_rfn (session, &rk_in); |
1437 | GNUNET_assert (NULL != rfn_in); | 1498 | GNUNET_assert (NULL != rfn_in); |
1438 | 1499 | ||
1500 | progress_cls = GNUNET_new (struct SetMutationProgressCls); | ||
1501 | progress_cls->task = task; | ||
1502 | |||
1439 | iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements); | 1503 | iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements); |
1440 | 1504 | ||
1441 | while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri)) | 1505 | while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri)) |
@@ -1448,10 +1512,20 @@ task_start_apply_round (struct TaskEntry *task) | |||
1448 | switch (majority_vote) | 1512 | switch (majority_vote) |
1449 | { | 1513 | { |
1450 | case VOTE_ADD: | 1514 | case VOTE_ADD: |
1451 | // XXX: add to set | 1515 | progress_cls->num_pending++; |
1516 | GNUNET_assert (GNUNET_OK == | ||
1517 | GNUNET_SET_add_element (set_out->h, | ||
1518 | ri->element, | ||
1519 | set_mutation_done, | ||
1520 | progress_cls)); | ||
1452 | break; | 1521 | break; |
1453 | case VOTE_REMOVE: | 1522 | case VOTE_REMOVE: |
1454 | // XXX: remove from set | 1523 | progress_cls->num_pending++; |
1524 | GNUNET_assert (GNUNET_OK == | ||
1525 | GNUNET_SET_remove_element (set_out->h, | ||
1526 | ri->element, | ||
1527 | set_mutation_done, | ||
1528 | progress_cls)); | ||
1455 | break; | 1529 | break; |
1456 | case VOTE_STAY: | 1530 | case VOTE_STAY: |
1457 | // do nothing | 1531 | // do nothing |
@@ -1461,9 +1535,19 @@ task_start_apply_round (struct TaskEntry *task) | |||
1461 | break; | 1535 | break; |
1462 | } | 1536 | } |
1463 | } | 1537 | } |
1538 | |||
1539 | if (progress_cls->num_pending == 0) | ||
1540 | { | ||
1541 | // call closure right now, no pending ops | ||
1542 | GNUNET_free (progress_cls); | ||
1543 | finish_task (task); | ||
1544 | } | ||
1464 | } | 1545 | } |
1465 | 1546 | ||
1466 | 1547 | ||
1548 | #define THRESH(s) (((s)->num_peers / 3)) | ||
1549 | |||
1550 | |||
1467 | static void | 1551 | static void |
1468 | task_start_grade (struct TaskEntry *task) | 1552 | task_start_grade (struct TaskEntry *task) |
1469 | { | 1553 | { |
@@ -1475,12 +1559,14 @@ task_start_grade (struct TaskEntry *task) | |||
1475 | struct DiffKey diff_key; | 1559 | struct DiffKey diff_key; |
1476 | struct GNUNET_CONTAINER_MultiHashMapIterator *iter; | 1560 | struct GNUNET_CONTAINER_MultiHashMapIterator *iter; |
1477 | struct RfnElementInfo *ri; | 1561 | struct RfnElementInfo *ri; |
1562 | unsigned int gradecast_confidence = 2; | ||
1478 | 1563 | ||
1479 | rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition }; | 1564 | rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition }; |
1480 | output_rfn = lookup_rfn (session, &rfn_key); | 1565 | output_rfn = lookup_rfn (session, &rfn_key); |
1481 | if (NULL == output_rfn) | 1566 | if (NULL == output_rfn) |
1482 | { | 1567 | { |
1483 | output_rfn = rfn_create (session->num_peers); | 1568 | output_rfn = rfn_create (session->num_peers); |
1569 | output_rfn->key = rfn_key; | ||
1484 | put_rfn (session, output_rfn); | 1570 | put_rfn (session, output_rfn); |
1485 | } | 1571 | } |
1486 | 1572 | ||
@@ -1492,26 +1578,50 @@ task_start_grade (struct TaskEntry *task) | |||
1492 | input_rfn = lookup_rfn (session, &rfn_key); | 1578 | input_rfn = lookup_rfn (session, &rfn_key); |
1493 | GNUNET_assert (NULL != input_rfn); | 1579 | GNUNET_assert (NULL != input_rfn); |
1494 | 1580 | ||
1495 | apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers); | ||
1496 | |||
1497 | iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements); | 1581 | iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements); |
1498 | 1582 | ||
1583 | apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers); | ||
1584 | |||
1499 | while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri)) | 1585 | while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri)) |
1500 | { | 1586 | { |
1501 | uint16_t majority_num; | 1587 | uint16_t majority_num; |
1502 | enum ReferendumVote majority_vote; | 1588 | enum ReferendumVote majority_vote; |
1503 | 1589 | ||
1590 | // XXX: we need contested votes and non-contested votes here | ||
1504 | rfn_majority (input_rfn, ri, &majority_num, &majority_vote); | 1591 | rfn_majority (input_rfn, ri, &majority_num, &majority_vote); |
1505 | 1592 | ||
1506 | 1593 | if (majority_num < (session->num_peers / 3) * 2) | |
1594 | { | ||
1595 | gradecast_confidence = GNUNET_MIN(1, gradecast_confidence); | ||
1596 | } | ||
1597 | if (majority_num < (session->num_peers / 3) + 1) | ||
1598 | { | ||
1599 | gradecast_confidence = 0; | ||
1600 | } | ||
1507 | 1601 | ||
1508 | switch (majority_vote) | 1602 | switch (majority_vote) |
1509 | { | 1603 | { |
1604 | case VOTE_STAY: | ||
1605 | break; | ||
1606 | case VOTE_ADD: | ||
1607 | rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element); | ||
1608 | break; | ||
1609 | case VOTE_REMOVE: | ||
1610 | rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element); | ||
1611 | break; | ||
1510 | default: | 1612 | default: |
1511 | GNUNET_assert (0); | 1613 | GNUNET_assert (0); |
1512 | break; | 1614 | break; |
1513 | } | 1615 | } |
1514 | } | 1616 | } |
1617 | |||
1618 | if (gradecast_confidence >= 1) | ||
1619 | rfn_commit (output_rfn, task->key.leader); | ||
1620 | |||
1621 | if (gradecast_confidence <= 1) | ||
1622 | session->peers_blacklisted[task->key.leader] = GNUNET_YES; | ||
1623 | |||
1624 | finish_task (task); | ||
1515 | } | 1625 | } |
1516 | 1626 | ||
1517 | 1627 | ||
@@ -1537,12 +1647,7 @@ task_start_reconcile (struct TaskEntry *task) | |||
1537 | we clone the input set. */ | 1647 | we clone the input set. */ |
1538 | if (NULL == lookup_set (session, &setop->output_set)) | 1648 | if (NULL == lookup_set (session, &setop->output_set)) |
1539 | { | 1649 | { |
1540 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1650 | create_set_copy_for_task (task, &setop->input_set, &setop->output_set); |
1541 | "Output set missing, copying from input set\n"); | ||
1542 | /* Since the cloning is asynchronous, | ||
1543 | we'll retry the current function once the copy | ||
1544 | has been provided by the SET service. */ | ||
1545 | GNUNET_SET_copy_lazy (input->h, output_cloned_cb, task); | ||
1546 | return; | 1651 | return; |
1547 | } | 1652 | } |
1548 | } | 1653 | } |
@@ -1640,37 +1745,6 @@ task_start_reconcile (struct TaskEntry *task) | |||
1640 | } | 1745 | } |
1641 | 1746 | ||
1642 | 1747 | ||
1643 | |||
1644 | |||
1645 | struct SetMutationProgressCls | ||
1646 | { | ||
1647 | int num_pending; | ||
1648 | /** | ||
1649 | * Task to finish once all changes are through. | ||
1650 | */ | ||
1651 | struct TaskEntry *task; | ||
1652 | }; | ||
1653 | |||
1654 | |||
1655 | static void | ||
1656 | set_mutation_done (void *cls) | ||
1657 | { | ||
1658 | struct SetMutationProgressCls *pc = cls; | ||
1659 | |||
1660 | GNUNET_assert (pc->num_pending > 0); | ||
1661 | |||
1662 | pc->num_pending--; | ||
1663 | |||
1664 | if (0 == pc->num_pending) | ||
1665 | { | ||
1666 | struct TaskEntry *task = pc->task; | ||
1667 | GNUNET_free (pc); | ||
1668 | finish_task (task); | ||
1669 | } | ||
1670 | } | ||
1671 | |||
1672 | |||
1673 | |||
1674 | static void | 1748 | static void |
1675 | task_start_eval_echo (struct TaskEntry *task) | 1749 | task_start_eval_echo (struct TaskEntry *task) |
1676 | { | 1750 | { |
@@ -1708,7 +1782,10 @@ task_start_eval_echo (struct TaskEntry *task) | |||
1708 | iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements); | 1782 | iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements); |
1709 | GNUNET_assert (NULL != iter); | 1783 | GNUNET_assert (NULL != iter); |
1710 | 1784 | ||
1711 | while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri)) | 1785 | while (GNUNET_YES == |
1786 | GNUNET_CONTAINER_multihashmap_iterator_next (iter, | ||
1787 | NULL, | ||
1788 | (const void **) &ri)) | ||
1712 | { | 1789 | { |
1713 | enum ReferendumVote majority_vote; | 1790 | enum ReferendumVote majority_vote; |
1714 | uint16_t majority_num; | 1791 | uint16_t majority_num; |
@@ -1717,7 +1794,12 @@ task_start_eval_echo (struct TaskEntry *task) | |||
1717 | 1794 | ||
1718 | if (majority_num < session->num_peers / 3) | 1795 | if (majority_num < session->num_peers / 3) |
1719 | { | 1796 | { |
1720 | majority_vote = VOTE_REMOVE; | 1797 | /* It is not the case that all nonfaulty peers |
1798 | echoed the same value. Since we're doing a set reconciliation, we | ||
1799 | can't simply send "nothing" for the value. Thus we mark our 'confirm' | ||
1800 | reconciliation as contested. Other peers might not know that the | ||
1801 | leader is faulty, thus we still re-distribute in the confirmation | ||
1802 | round. */ | ||
1721 | output_set->is_contested = GNUNET_YES; | 1803 | output_set->is_contested = GNUNET_YES; |
1722 | } | 1804 | } |
1723 | 1805 | ||
@@ -2509,8 +2591,6 @@ construct_task_graph (struct ConsensusSession *session) | |||
2509 | for (lead = 0; lead < n; lead++) | 2591 | for (lead = 0; lead < n; lead++) |
2510 | construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end); | 2592 | construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end); |
2511 | 2593 | ||
2512 | // TODO: add peers to blacklisted list, either here or | ||
2513 | // already in the gradecast. | ||
2514 | task = ((struct TaskEntry) { | 2594 | task = ((struct TaskEntry) { |
2515 | .step = step_rep_end, | 2595 | .step = step_rep_end, |
2516 | .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1}, | 2596 | .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1}, |