aboutsummaryrefslogtreecommitdiff
path: root/src/consensus/gnunet-service-consensus.c
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2015-10-06 15:22:02 +0000
committerFlorian Dold <florian.dold@gmail.com>2015-10-06 15:22:02 +0000
commitc81d7e1199cf2779973900677ffb3d12e12ca96f (patch)
tree2065078e38d3506f7fbf2d51f3b3125d2f744f36 /src/consensus/gnunet-service-consensus.c
parent4f47b28bace8908afbd9d2b5d4093005931d298d (diff)
downloadgnunet-c81d7e1199cf2779973900677ffb3d12e12ca96f.tar.gz
gnunet-c81d7e1199cf2779973900677ffb3d12e12ca96f.zip
towards handling byz. faults correctly
- blacklisting of peers in consensus - restructuring
Diffstat (limited to 'src/consensus/gnunet-service-consensus.c')
-rw-r--r--src/consensus/gnunet-service-consensus.c278
1 files changed, 179 insertions, 99 deletions
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
index dd619c6e4..2d2725b64 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -34,9 +34,34 @@
34#include "consensus.h" 34#include "consensus.h"
35 35
36 36
37#define ELEMENT_TYPE_CONTESTED_MARKER (GNUNET_CONSENSUS_ELEMENT_TYPE_USER_MAX + 1)
38
39
40enum ReferendumVote
41{
42 /**
43 * Vote that nothing should change.
44 * This option is never voted explicitly.
45 */
46 VOTE_STAY = 0,
47 /**
48 * Vote that an element should be added.
49 */
50 VOTE_ADD = 1,
51 /**
52 * Vote that an element should be removed.
53 */
54 VOTE_REMOVE = 2,
55};
56
37 57
38GNUNET_NETWORK_STRUCT_BEGIN 58GNUNET_NETWORK_STRUCT_BEGIN
39 59
60
61struct ContestedPayload
62{
63};
64
40/** 65/**
41 * Tuple of integers that together 66 * Tuple of integers that together
42 * identify a task uniquely. 67 * identify a task uniquely.
@@ -72,23 +97,6 @@ struct TaskKey {
72}; 97};
73 98
74 99
75enum ReferendumVote
76{
77 /**
78 * Vote that nothing should change.
79 * This option is never voted explicitly.
80 */
81 VOTE_STAY = 0,
82 /**
83 * Vote that an element should be added.
84 */
85 VOTE_ADD = 1,
86 /**
87 * Vote that an element should be removed.
88 */
89 VOTE_REMOVE = 2,
90};
91
92 100
93struct SetKey 101struct SetKey
94{ 102{
@@ -136,7 +144,6 @@ enum PhaseKind
136 PHASE_KIND_GRADECAST_ECHO_GRADE, 144 PHASE_KIND_GRADECAST_ECHO_GRADE,
137 PHASE_KIND_GRADECAST_CONFIRM, 145 PHASE_KIND_GRADECAST_CONFIRM,
138 PHASE_KIND_GRADECAST_CONFIRM_GRADE, 146 PHASE_KIND_GRADECAST_CONFIRM_GRADE,
139 PHASE_KIND_GRADECAST_APPLY_RESULT,
140 /** 147 /**
141 * Apply a repetition of the all-to-all 148 * Apply a repetition of the all-to-all
142 * gradecast to the current set. 149 * gradecast to the current set.
@@ -484,9 +491,6 @@ static void
484finish_task (struct TaskEntry *task); 491finish_task (struct TaskEntry *task);
485 492
486static void 493static void
487task_start_reconcile (struct TaskEntry *task);
488
489static void
490run_ready_steps (struct ConsensusSession *session); 494run_ready_steps (struct ConsensusSession *session);
491 495
492static const char * 496static const char *
@@ -501,7 +505,6 @@ phasename (uint16_t phase)
501 case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE"; 505 case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
502 case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM"; 506 case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
503 case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE"; 507 case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
504 case PHASE_KIND_GRADECAST_APPLY_RESULT: return "GRADECAST_APPLY_RESULT";
505 case PHASE_KIND_APPLY_REP: return "APPLY_REP"; 508 case PHASE_KIND_APPLY_REP: return "APPLY_REP";
506 default: return "(unknown)"; 509 default: return "(unknown)";
507 } 510 }
@@ -772,24 +775,39 @@ diff_insert (struct DiffEntry *diff,
772 775
773 776
774static void 777static void
778rfn_commit (struct ReferendumEntry *rfn,
779 uint16_t commit_peer)
780{
781 GNUNET_assert (commit_peer < rfn->num_peers);
782
783 rfn->peer_commited[commit_peer] = GNUNET_YES;
784}
785
786
787static void
788rfn_contest (struct ReferendumEntry *rfn,
789 uint16_t contested_peer)
790{
791 GNUNET_assert (contested_peer < rfn->num_peers);
792
793 rfn->peer_contested[contested_peer] = GNUNET_YES;
794}
795
796static void
775rfn_vote (struct ReferendumEntry *rfn, 797rfn_vote (struct ReferendumEntry *rfn,
776 uint16_t voting_peer, 798 uint16_t voting_peer,
777 uint16_t num_peers,
778 enum ReferendumVote vote, 799 enum ReferendumVote vote,
779 const struct GNUNET_SET_Element *element) 800 const struct GNUNET_SET_Element *element)
780{ 801{
781 struct RfnElementInfo *ri; 802 struct RfnElementInfo *ri;
782 struct GNUNET_HashCode hash; 803 struct GNUNET_HashCode hash;
783 804
784 GNUNET_assert (voting_peer < num_peers); 805 GNUNET_assert (voting_peer < rfn->num_peers);
785 806
786 /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE, 807 /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
787 since VOTE_KEEP is implicit in not voting. */ 808 since VOTE_KEEP is implicit in not voting. */
788 GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) ); 809 GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
789 810
790 // XXX: should happen in another place!
791 rfn->peer_commited[voting_peer] = GNUNET_YES;
792
793 GNUNET_SET_element_hash (element, &hash); 811 GNUNET_SET_element_hash (element, &hash);
794 ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash); 812 ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
795 813
@@ -797,7 +815,7 @@ rfn_vote (struct ReferendumEntry *rfn,
797 { 815 {
798 ri = GNUNET_new (struct RfnElementInfo); 816 ri = GNUNET_new (struct RfnElementInfo);
799 ri->element = GNUNET_SET_element_dup (element); 817 ri->element = GNUNET_SET_element_dup (element);
800 ri->votes = GNUNET_new_array (num_peers, int); 818 ri->votes = GNUNET_new_array (rfn->num_peers, int);
801 GNUNET_assert (GNUNET_OK == 819 GNUNET_assert (GNUNET_OK ==
802 GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements, 820 GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
803 &hash, ri, 821 &hash, ri,
@@ -897,6 +915,16 @@ set_result_cb (void *cls,
897 return; 915 return;
898 } 916 }
899 917
918 if ( (GNUNET_SET_STATUS_ADD_LOCAL == status) || (GNUNET_SET_STATUS_ADD_REMOTE == status) )
919 {
920 if ( (GNUNET_YES == setop->transceive_contested) && (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type) )
921 {
922 GNUNET_assert (NULL != output_rfn);
923 rfn_contest (output_rfn, task_other_peer (task));
924 return;
925 }
926 }
927
900 switch (status) 928 switch (status)
901 { 929 {
902 case GNUNET_SET_STATUS_ADD_LOCAL: 930 case GNUNET_SET_STATUS_ADD_LOCAL:
@@ -933,7 +961,7 @@ set_result_cb (void *cls,
933 } 961 }
934 if (NULL != output_rfn) 962 if (NULL != output_rfn)
935 { 963 {
936 rfn_vote (output_rfn, task_other_peer (task), session->num_peers, VOTE_ADD, element); 964 rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
937#ifdef GNUNET_EXTRA_LOGGING 965#ifdef GNUNET_EXTRA_LOGGING
938 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 966 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
939 "P%u: adding element %s into rfn {%s} of task {%s}\n", 967 "P%u: adding element %s into rfn {%s} of task {%s}\n",
@@ -948,6 +976,8 @@ set_result_cb (void *cls,
948 case GNUNET_SET_STATUS_ADD_REMOTE: 976 case GNUNET_SET_STATUS_ADD_REMOTE:
949 if (GNUNET_YES == setop->do_not_remove) 977 if (GNUNET_YES == setop->do_not_remove)
950 break; 978 break;
979 if (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type)
980 break;
951 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 981 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
952 "Removing element in Task {%s}\n", 982 "Removing element in Task {%s}\n",
953 debug_str_task_key (&task->key)); 983 debug_str_task_key (&task->key));
@@ -981,7 +1011,7 @@ set_result_cb (void *cls,
981 } 1011 }
982 if (NULL != output_rfn) 1012 if (NULL != output_rfn)
983 { 1013 {
984 rfn_vote (output_rfn, task_other_peer (task), session->num_peers, VOTE_REMOVE, element); 1014 rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
985#ifdef GNUNET_EXTRA_LOGGING 1015#ifdef GNUNET_EXTRA_LOGGING
986 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1016 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
987 "P%u: removing element %s from rfn {%s} of task {%s}\n", 1017 "P%u: removing element %s from rfn {%s} of task {%s}\n",
@@ -995,10 +1025,13 @@ set_result_cb (void *cls,
995 case GNUNET_SET_STATUS_DONE: 1025 case GNUNET_SET_STATUS_DONE:
996 // XXX: check first if any changes to the underlying 1026 // XXX: check first if any changes to the underlying
997 // set are still pending 1027 // set are still pending
998 // XXX: commit other peer in referendum
999 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1028 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1000 "Finishing setop in Task {%s}\n", 1029 "Finishing setop in Task {%s}\n",
1001 debug_str_task_key (&task->key)); 1030 debug_str_task_key (&task->key));
1031 if (NULL != output_rfn)
1032 {
1033 rfn_commit (output_rfn, task_other_peer (task));
1034 }
1002 finish_task (task); 1035 finish_task (task);
1003 break; 1036 break;
1004 case GNUNET_SET_STATUS_FAILURE: 1037 case GNUNET_SET_STATUS_FAILURE:
@@ -1159,6 +1192,15 @@ commit_set (struct ConsensusSession *session,
1159 } 1192 }
1160 } 1193 }
1161#else 1194#else
1195 if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
1196 {
1197 struct GNUNET_SET_Element element;
1198 struct ContestedPayload payload;
1199 element.data = &payload;
1200 element.size = sizeof (struct ContestedPayload);
1201 element.element_type = ELEMENT_TYPE_CONTESTED_MARKER;
1202 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1203 }
1162 GNUNET_SET_commit (setop->op, set->h); 1204 GNUNET_SET_commit (setop->op, set->h);
1163#endif 1205#endif
1164} 1206}
@@ -1212,25 +1254,6 @@ put_rfn (struct ConsensusSession *session,
1212 1254
1213 1255
1214static void 1256static void
1215output_cloned_cb (void *cls, struct GNUNET_SET_Handle *copy)
1216{
1217 struct TaskEntry *task = (struct TaskEntry *) cls;
1218 struct SetOpCls *setop = &task->cls.setop;
1219 struct ConsensusSession *session = task->step->session;
1220 struct SetEntry *set = GNUNET_new (struct SetEntry);
1221
1222 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1223 "P%u: Received lazy copy, storing output set %s\n",
1224 session->local_peer_idx, debug_str_set_key (&setop->output_set));
1225
1226 set->key = setop->output_set;
1227 set->h = copy;
1228 put_set (task->step->session, set);
1229 task_start_reconcile (task);
1230}
1231
1232
1233static void
1234task_cancel_reconcile (struct TaskEntry *task) 1257task_cancel_reconcile (struct TaskEntry *task)
1235{ 1258{
1236 /* not implemented yet */ 1259 /* not implemented yet */
@@ -1249,15 +1272,18 @@ apply_diff_to_rfn (struct DiffEntry *diff,
1249 1272
1250 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes); 1273 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1251 1274
1252 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di)) 1275 while (GNUNET_YES ==
1276 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1277 NULL,
1278 (const void **) &di))
1253 { 1279 {
1254 if (di->weight > 0) 1280 if (di->weight > 0)
1255 { 1281 {
1256 rfn_vote (rfn, voting_peer, num_peers, VOTE_ADD, di->element); 1282 rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1257 } 1283 }
1258 if (di->weight < 0) 1284 if (di->weight < 0)
1259 { 1285 {
1260 rfn_vote (rfn, voting_peer, num_peers, VOTE_REMOVE, di->element); 1286 rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1261 } 1287 }
1262 } 1288 }
1263 1289
@@ -1401,6 +1427,12 @@ create_set_copy_for_task (struct TaskEntry *task,
1401 struct SetEntry *src_set; 1427 struct SetEntry *src_set;
1402 struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls); 1428 struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1403 1429
1430 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1431 "Copying set {%s} to {%s} for task {%s}\n",
1432 debug_str_set_key (src_set_key),
1433 debug_str_set_key (dst_set_key),
1434 debug_str_task_key (&task->key));
1435
1404 scc->task = task; 1436 scc->task = task;
1405 scc->dst_set_key = *dst_set_key; 1437 scc->dst_set_key = *dst_set_key;
1406 src_set = lookup_set (task->step->session, src_set_key); 1438 src_set = lookup_set (task->step->session, src_set_key);
@@ -1410,6 +1442,34 @@ create_set_copy_for_task (struct TaskEntry *task,
1410 scc); 1442 scc);
1411} 1443}
1412 1444
1445
1446struct SetMutationProgressCls
1447{
1448 int num_pending;
1449 /**
1450 * Task to finish once all changes are through.
1451 */
1452 struct TaskEntry *task;
1453};
1454
1455
1456static void
1457set_mutation_done (void *cls)
1458{
1459 struct SetMutationProgressCls *pc = cls;
1460
1461 GNUNET_assert (pc->num_pending > 0);
1462
1463 pc->num_pending--;
1464
1465 if (0 == pc->num_pending)
1466 {
1467 struct TaskEntry *task = pc->task;
1468 GNUNET_free (pc);
1469 finish_task (task);
1470 }
1471}
1472
1413static void 1473static void
1414task_start_apply_round (struct TaskEntry *task) 1474task_start_apply_round (struct TaskEntry *task)
1415{ 1475{
@@ -1421,6 +1481,7 @@ task_start_apply_round (struct TaskEntry *task)
1421 struct ReferendumEntry *rfn_in; 1481 struct ReferendumEntry *rfn_in;
1422 struct GNUNET_CONTAINER_MultiHashMapIterator *iter; 1482 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1423 struct RfnElementInfo *ri; 1483 struct RfnElementInfo *ri;
1484 struct SetMutationProgressCls *progress_cls;
1424 1485
1425 sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition }; 1486 sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1426 rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition }; 1487 rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
@@ -1436,6 +1497,9 @@ task_start_apply_round (struct TaskEntry *task)
1436 rfn_in = lookup_rfn (session, &rk_in); 1497 rfn_in = lookup_rfn (session, &rk_in);
1437 GNUNET_assert (NULL != rfn_in); 1498 GNUNET_assert (NULL != rfn_in);
1438 1499
1500 progress_cls = GNUNET_new (struct SetMutationProgressCls);
1501 progress_cls->task = task;
1502
1439 iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements); 1503 iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1440 1504
1441 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri)) 1505 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
@@ -1448,10 +1512,20 @@ task_start_apply_round (struct TaskEntry *task)
1448 switch (majority_vote) 1512 switch (majority_vote)
1449 { 1513 {
1450 case VOTE_ADD: 1514 case VOTE_ADD:
1451 // XXX: add to set 1515 progress_cls->num_pending++;
1516 GNUNET_assert (GNUNET_OK ==
1517 GNUNET_SET_add_element (set_out->h,
1518 ri->element,
1519 set_mutation_done,
1520 progress_cls));
1452 break; 1521 break;
1453 case VOTE_REMOVE: 1522 case VOTE_REMOVE:
1454 // XXX: remove from set 1523 progress_cls->num_pending++;
1524 GNUNET_assert (GNUNET_OK ==
1525 GNUNET_SET_remove_element (set_out->h,
1526 ri->element,
1527 set_mutation_done,
1528 progress_cls));
1455 break; 1529 break;
1456 case VOTE_STAY: 1530 case VOTE_STAY:
1457 // do nothing 1531 // do nothing
@@ -1461,9 +1535,19 @@ task_start_apply_round (struct TaskEntry *task)
1461 break; 1535 break;
1462 } 1536 }
1463 } 1537 }
1538
1539 if (progress_cls->num_pending == 0)
1540 {
1541 // call closure right now, no pending ops
1542 GNUNET_free (progress_cls);
1543 finish_task (task);
1544 }
1464} 1545}
1465 1546
1466 1547
1548#define THRESH(s) (((s)->num_peers / 3))
1549
1550
1467static void 1551static void
1468task_start_grade (struct TaskEntry *task) 1552task_start_grade (struct TaskEntry *task)
1469{ 1553{
@@ -1475,12 +1559,14 @@ task_start_grade (struct TaskEntry *task)
1475 struct DiffKey diff_key; 1559 struct DiffKey diff_key;
1476 struct GNUNET_CONTAINER_MultiHashMapIterator *iter; 1560 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1477 struct RfnElementInfo *ri; 1561 struct RfnElementInfo *ri;
1562 unsigned int gradecast_confidence = 2;
1478 1563
1479 rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition }; 1564 rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1480 output_rfn = lookup_rfn (session, &rfn_key); 1565 output_rfn = lookup_rfn (session, &rfn_key);
1481 if (NULL == output_rfn) 1566 if (NULL == output_rfn)
1482 { 1567 {
1483 output_rfn = rfn_create (session->num_peers); 1568 output_rfn = rfn_create (session->num_peers);
1569 output_rfn->key = rfn_key;
1484 put_rfn (session, output_rfn); 1570 put_rfn (session, output_rfn);
1485 } 1571 }
1486 1572
@@ -1492,26 +1578,50 @@ task_start_grade (struct TaskEntry *task)
1492 input_rfn = lookup_rfn (session, &rfn_key); 1578 input_rfn = lookup_rfn (session, &rfn_key);
1493 GNUNET_assert (NULL != input_rfn); 1579 GNUNET_assert (NULL != input_rfn);
1494 1580
1495 apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
1496
1497 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements); 1581 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
1498 1582
1583 apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
1584
1499 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri)) 1585 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
1500 { 1586 {
1501 uint16_t majority_num; 1587 uint16_t majority_num;
1502 enum ReferendumVote majority_vote; 1588 enum ReferendumVote majority_vote;
1503 1589
1590 // XXX: we need contested votes and non-contested votes here
1504 rfn_majority (input_rfn, ri, &majority_num, &majority_vote); 1591 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
1505 1592
1506 1593 if (majority_num < (session->num_peers / 3) * 2)
1594 {
1595 gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
1596 }
1597 if (majority_num < (session->num_peers / 3) + 1)
1598 {
1599 gradecast_confidence = 0;
1600 }
1507 1601
1508 switch (majority_vote) 1602 switch (majority_vote)
1509 { 1603 {
1604 case VOTE_STAY:
1605 break;
1606 case VOTE_ADD:
1607 rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
1608 break;
1609 case VOTE_REMOVE:
1610 rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
1611 break;
1510 default: 1612 default:
1511 GNUNET_assert (0); 1613 GNUNET_assert (0);
1512 break; 1614 break;
1513 } 1615 }
1514 } 1616 }
1617
1618 if (gradecast_confidence >= 1)
1619 rfn_commit (output_rfn, task->key.leader);
1620
1621 if (gradecast_confidence <= 1)
1622 session->peers_blacklisted[task->key.leader] = GNUNET_YES;
1623
1624 finish_task (task);
1515} 1625}
1516 1626
1517 1627
@@ -1537,12 +1647,7 @@ task_start_reconcile (struct TaskEntry *task)
1537 we clone the input set. */ 1647 we clone the input set. */
1538 if (NULL == lookup_set (session, &setop->output_set)) 1648 if (NULL == lookup_set (session, &setop->output_set))
1539 { 1649 {
1540 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1650 create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
1541 "Output set missing, copying from input set\n");
1542 /* Since the cloning is asynchronous,
1543 we'll retry the current function once the copy
1544 has been provided by the SET service. */
1545 GNUNET_SET_copy_lazy (input->h, output_cloned_cb, task);
1546 return; 1651 return;
1547 } 1652 }
1548 } 1653 }
@@ -1640,37 +1745,6 @@ task_start_reconcile (struct TaskEntry *task)
1640} 1745}
1641 1746
1642 1747
1643
1644
1645struct SetMutationProgressCls
1646{
1647 int num_pending;
1648 /**
1649 * Task to finish once all changes are through.
1650 */
1651 struct TaskEntry *task;
1652};
1653
1654
1655static void
1656set_mutation_done (void *cls)
1657{
1658 struct SetMutationProgressCls *pc = cls;
1659
1660 GNUNET_assert (pc->num_pending > 0);
1661
1662 pc->num_pending--;
1663
1664 if (0 == pc->num_pending)
1665 {
1666 struct TaskEntry *task = pc->task;
1667 GNUNET_free (pc);
1668 finish_task (task);
1669 }
1670}
1671
1672
1673
1674static void 1748static void
1675task_start_eval_echo (struct TaskEntry *task) 1749task_start_eval_echo (struct TaskEntry *task)
1676{ 1750{
@@ -1708,7 +1782,10 @@ task_start_eval_echo (struct TaskEntry *task)
1708 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements); 1782 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
1709 GNUNET_assert (NULL != iter); 1783 GNUNET_assert (NULL != iter);
1710 1784
1711 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri)) 1785 while (GNUNET_YES ==
1786 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1787 NULL,
1788 (const void **) &ri))
1712 { 1789 {
1713 enum ReferendumVote majority_vote; 1790 enum ReferendumVote majority_vote;
1714 uint16_t majority_num; 1791 uint16_t majority_num;
@@ -1717,7 +1794,12 @@ task_start_eval_echo (struct TaskEntry *task)
1717 1794
1718 if (majority_num < session->num_peers / 3) 1795 if (majority_num < session->num_peers / 3)
1719 { 1796 {
1720 majority_vote = VOTE_REMOVE; 1797 /* It is not the case that all nonfaulty peers
1798 echoed the same value. Since we're doing a set reconciliation, we
1799 can't simply send "nothing" for the value. Thus we mark our 'confirm'
1800 reconciliation as contested. Other peers might not know that the
1801 leader is faulty, thus we still re-distribute in the confirmation
1802 round. */
1721 output_set->is_contested = GNUNET_YES; 1803 output_set->is_contested = GNUNET_YES;
1722 } 1804 }
1723 1805
@@ -2509,8 +2591,6 @@ construct_task_graph (struct ConsensusSession *session)
2509 for (lead = 0; lead < n; lead++) 2591 for (lead = 0; lead < n; lead++)
2510 construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end); 2592 construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
2511 2593
2512 // TODO: add peers to blacklisted list, either here or
2513 // already in the gradecast.
2514 task = ((struct TaskEntry) { 2594 task = ((struct TaskEntry) {
2515 .step = step_rep_end, 2595 .step = step_rep_end,
2516 .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1}, 2596 .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},