aboutsummaryrefslogtreecommitdiff
path: root/src/consensus/gnunet-service-consensus.c
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2015-10-06 02:08:16 +0000
committerFlorian Dold <florian.dold@gmail.com>2015-10-06 02:08:16 +0000
commitc3c8597a4ce377b7825e4a526b7de1daeba2105d (patch)
tree3345cbbb9fe17fbb3e0d205fe05a3fafe44b2d3c /src/consensus/gnunet-service-consensus.c
parent1526a366e98d0f7ac4f82c548273c9f2c06f553f (diff)
downloadgnunet-c3c8597a4ce377b7825e4a526b7de1daeba2105d.tar.gz
gnunet-c3c8597a4ce377b7825e4a526b7de1daeba2105d.zip
consensus work in progress
- store referendum entries differently - split referendum evaluation in specialized task functions
Diffstat (limited to 'src/consensus/gnunet-service-consensus.c')
-rw-r--r--src/consensus/gnunet-service-consensus.c739
1 files changed, 269 insertions, 470 deletions
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
index 60bc294ed..dd619c6e4 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -74,10 +74,19 @@ struct TaskKey {
74 74
75enum ReferendumVote 75enum ReferendumVote
76{ 76{
77 VOTE_NONE = 0, 77 /**
78 * Vote that nothing should change.
79 * This option is never voted explicitly.
80 */
81 VOTE_STAY = 0,
82 /**
83 * Vote that an element should be added.
84 */
78 VOTE_ADD = 1, 85 VOTE_ADD = 1,
86 /**
87 * Vote that an element should be removed.
88 */
79 VOTE_REMOVE = 2, 89 VOTE_REMOVE = 2,
80 VOTE_CONTESTED = 3
81}; 90};
82 91
83 92
@@ -137,28 +146,6 @@ enum PhaseKind
137}; 146};
138 147
139 148
140enum TaskKind
141{
142 /**
143 * Do a set reconciliation with another peer (or via looback).
144 */
145 TASK_RECONCILE,
146 /**
147 * Same as reconciliation, but only care about added elements.
148 */
149 TASK_UNION,
150 /**
151 * Apply a referendum with a threshold
152 * to a set and/or a diff.
153 */
154 TASK_EVAL_RFN,
155 /**
156 * Apply a diff to a set.
157 */
158 TASK_APPLY_DIFF,
159 FASK_FINISH,
160};
161
162enum SetKind 149enum SetKind
163{ 150{
164 SET_KIND_NONE = 0, 151 SET_KIND_NONE = 0,
@@ -194,35 +181,9 @@ struct SetOpCls
194 181
195 int do_not_remove; 182 int do_not_remove;
196 183
197 struct GNUNET_SET_OperationHandle *op; 184 int transceive_contested;
198};
199 185
200struct EvalRfnCls 186 struct GNUNET_SET_OperationHandle *op;
201{
202 struct SetKey input_set;
203 struct RfnKey input_rfn;
204
205 uint16_t threshold;
206
207 struct SetKey output_set;
208 struct DiffKey output_diff;
209};
210
211
212struct ApplyDiffCls
213{
214 struct SetKey input_set;
215 struct DiffKey input_diff;
216 struct SetKey output_set;
217};
218
219
220struct LeaderApplyCls
221{
222 struct DiffKey input_diff_1;
223 struct DiffKey input_diff_2;
224
225 struct RfnKey output_rfn;
226}; 187};
227 188
228 189
@@ -238,9 +199,6 @@ struct FinishCls
238union TaskFuncCls 199union TaskFuncCls
239{ 200{
240 struct SetOpCls setop; 201 struct SetOpCls setop;
241 struct EvalRfnCls eval_rfn;
242 struct ApplyDiffCls apply_diff;
243 struct LeaderApplyCls leader_apply;
244 struct FinishCls finish; 202 struct FinishCls finish;
245}; 203};
246 204
@@ -261,8 +219,6 @@ struct TaskEntry
261 219
262 int is_finished; 220 int is_finished;
263 221
264 enum TaskKind kind;
265
266 TaskFunc start; 222 TaskFunc start;
267 TaskFunc cancel; 223 TaskFunc cancel;
268 224
@@ -338,10 +294,15 @@ struct RfnElementInfo
338 const struct GNUNET_SET_Element *element; 294 const struct GNUNET_SET_Element *element;
339 295
340 /* 296 /*
341 * Vote (or VOTE_NONE) from every peer 297 * GNUNET_YES if the peer votes for the proposal.
342 * in the session about the element.
343 */ 298 */
344 int *votes; 299 int *votes;
300
301 /**
302 * Proposal for this element,
303 * can only be VOTE_ADD or VOTE_REMOVE.
304 */
305 enum ReferendumVote proposal;
345}; 306};
346 307
347 308
@@ -357,6 +318,8 @@ struct ReferendumEntry
357 */ 318 */
358 struct GNUNET_CONTAINER_MultiHashMap *rfn_elements; 319 struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
359 320
321 unsigned int num_peers;
322
360 /** 323 /**
361 * Stores, for every peer in the session, 324 * Stores, for every peer in the session,
362 * whether the peer finished the whole referendum. 325 * whether the peer finished the whole referendum.
@@ -426,7 +389,7 @@ struct ConsensusSession
426 /** 389 /**
427 * Array of peers with length 'num_peers'. 390 * Array of peers with length 'num_peers'.
428 */ 391 */
429 int *peers_ignored; 392 int *peers_blacklisted;
430 393
431 /* 394 /*
432 * Mapping from (hashed) TaskKey to TaskEntry. 395 * Mapping from (hashed) TaskKey to TaskEntry.
@@ -524,12 +487,6 @@ static void
524task_start_reconcile (struct TaskEntry *task); 487task_start_reconcile (struct TaskEntry *task);
525 488
526static void 489static void
527task_start_eval_rfn (struct TaskEntry *task);
528
529static void
530task_start_apply_diff (struct TaskEntry *task);
531
532static void
533run_ready_steps (struct ConsensusSession *session); 490run_ready_steps (struct ConsensusSession *session);
534 491
535static const char * 492static const char *
@@ -627,7 +584,7 @@ debug_str_diff_key (struct DiffKey *dk)
627} 584}
628 585
629static const char * 586static const char *
630debug_str_set_key (struct SetKey *sk) 587debug_str_set_key (const struct SetKey *sk)
631{ 588{
632 static char buf[256]; 589 static char buf[256];
633 590
@@ -640,7 +597,7 @@ debug_str_set_key (struct SetKey *sk)
640 597
641 598
642static const char * 599static const char *
643debug_str_rfn_key (struct RfnKey *rk) 600debug_str_rfn_key (const struct RfnKey *rk)
644{ 601{
645 static char buf[256]; 602 static char buf[256];
646 603
@@ -818,7 +775,7 @@ static void
818rfn_vote (struct ReferendumEntry *rfn, 775rfn_vote (struct ReferendumEntry *rfn,
819 uint16_t voting_peer, 776 uint16_t voting_peer,
820 uint16_t num_peers, 777 uint16_t num_peers,
821 int vote, 778 enum ReferendumVote vote,
822 const struct GNUNET_SET_Element *element) 779 const struct GNUNET_SET_Element *element)
823{ 780{
824 struct RfnElementInfo *ri; 781 struct RfnElementInfo *ri;
@@ -826,12 +783,16 @@ rfn_vote (struct ReferendumEntry *rfn,
826 783
827 GNUNET_assert (voting_peer < num_peers); 784 GNUNET_assert (voting_peer < num_peers);
828 785
786 /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
787 since VOTE_KEEP is implicit in not voting. */
788 GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
789
790 // XXX: should happen in another place!
829 rfn->peer_commited[voting_peer] = GNUNET_YES; 791 rfn->peer_commited[voting_peer] = GNUNET_YES;
830 792
831 GNUNET_SET_element_hash (element, &hash); 793 GNUNET_SET_element_hash (element, &hash);
832 ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash); 794 ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
833 795
834
835 if (NULL == ri) 796 if (NULL == ri)
836 { 797 {
837 ri = GNUNET_new (struct RfnElementInfo); 798 ri = GNUNET_new (struct RfnElementInfo);
@@ -843,7 +804,8 @@ rfn_vote (struct ReferendumEntry *rfn,
843 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); 804 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
844 } 805 }
845 806
846 ri->votes[voting_peer] = vote; 807 ri->votes[voting_peer] = GNUNET_YES;
808 ri->proposal = vote;
847} 809}
848 810
849 811
@@ -927,10 +889,10 @@ set_result_cb (void *cls,
927 GNUNET_assert (NULL != output_rfn); 889 GNUNET_assert (NULL != output_rfn);
928 } 890 }
929 891
930 if (GNUNET_YES == session->peers_ignored[other_idx]) 892 if (GNUNET_YES == session->peers_blacklisted[other_idx])
931 { 893 {
932 /* We should have never started or commited to an operation 894 /* We should have never started or commited to an operation
933 with an ignored peer. */ 895 with a blacklisted peer. */
934 GNUNET_break (0); 896 GNUNET_break (0);
935 return; 897 return;
936 } 898 }
@@ -1350,12 +1312,13 @@ rfn_create (uint16_t size)
1350 rfn = GNUNET_new (struct ReferendumEntry); 1312 rfn = GNUNET_new (struct ReferendumEntry);
1351 rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO); 1313 rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1352 rfn->peer_commited = GNUNET_new_array (size, int); 1314 rfn->peer_commited = GNUNET_new_array (size, int);
1315 rfn->num_peers = size;
1353 1316
1354 return rfn; 1317 return rfn;
1355} 1318}
1356 1319
1357 1320
1358static void 1321void
1359diff_destroy (struct DiffEntry *diff) 1322diff_destroy (struct DiffEntry *diff)
1360{ 1323{
1361 GNUNET_CONTAINER_multihashmap_destroy (diff->changes); 1324 GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
@@ -1364,49 +1327,191 @@ diff_destroy (struct DiffEntry *diff)
1364 1327
1365 1328
1366static void 1329static void
1367task_start_leader_apply (struct TaskEntry *task) 1330rfn_majority (const struct ReferendumEntry *rfn,
1331 const struct RfnElementInfo *ri,
1332 uint16_t *ret_majority,
1333 enum ReferendumVote *ret_vote)
1334{
1335 uint16_t votes_yes = 0;
1336 uint16_t num_commited = 0;
1337 uint16_t i;
1338
1339 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1340 "Computing rfn majority for element %s of rfn {%s}\n",
1341 debug_str_element (ri->element),
1342 debug_str_rfn_key (&rfn->key));
1343
1344 for (i = 0; i < rfn->num_peers; i++)
1345 {
1346 if (GNUNET_NO == rfn->peer_commited[i])
1347 continue;
1348 num_commited++;
1349
1350 if (GNUNET_YES == ri->votes[i])
1351 votes_yes++;
1352 }
1353
1354 if (votes_yes > (num_commited) / 2)
1355 {
1356 *ret_vote = ri->proposal;
1357 *ret_majority = votes_yes;
1358 }
1359 else
1360 {
1361 *ret_vote = VOTE_STAY;
1362 *ret_majority = num_commited - votes_yes;
1363 }
1364}
1365
1366
1367struct SetCopyCls
1368{
1369 struct TaskEntry *task;
1370 struct SetKey dst_set_key;
1371};
1372
1373
1374static void
1375set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1376{
1377 struct SetCopyCls *scc = cls;
1378 struct TaskEntry *task = scc->task;
1379 struct SetKey dst_set_key = scc->dst_set_key;
1380 struct SetEntry *set;
1381
1382 GNUNET_free (scc);
1383 set = GNUNET_new (struct SetEntry);
1384 set->h = copy;
1385 set->key = dst_set_key;
1386 put_set (task->step->session, set);
1387
1388 task->start (task);
1389}
1390
1391
1392/**
1393 * Call the start function of the given
1394 * task again after we created a copy of the given set.
1395 */
1396static void
1397create_set_copy_for_task (struct TaskEntry *task,
1398 struct SetKey *src_set_key,
1399 struct SetKey *dst_set_key)
1400{
1401 struct SetEntry *src_set;
1402 struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1403
1404 scc->task = task;
1405 scc->dst_set_key = *dst_set_key;
1406 src_set = lookup_set (task->step->session, src_set_key);
1407 GNUNET_assert (NULL != src_set);
1408 GNUNET_SET_copy_lazy (src_set->h,
1409 set_copy_cb,
1410 scc);
1411}
1412
1413static void
1414task_start_apply_round (struct TaskEntry *task)
1368{ 1415{
1369 struct LeaderApplyCls *lacls = &task->cls.leader_apply;
1370 struct ConsensusSession *session = task->step->session; 1416 struct ConsensusSession *session = task->step->session;
1371 struct DiffEntry *diff_1; 1417 struct SetKey sk_in;
1372 struct DiffEntry *diff_2; 1418 struct SetKey sk_out;
1373 struct DiffEntry *diff_combined; 1419 struct RfnKey rk_in;
1374 struct ReferendumEntry *rfn; 1420 struct SetEntry *set_out;
1421 struct ReferendumEntry *rfn_in;
1422 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1423 struct RfnElementInfo *ri;
1375 1424
1376 diff_1 = lookup_diff (session, &lacls->input_diff_1); 1425 sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1377 GNUNET_assert (NULL != diff_1); 1426 rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1427 sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1378 1428
1379 diff_2 = lookup_diff (session, &lacls->input_diff_2); 1429 set_out = lookup_set (session, &sk_out);
1380 GNUNET_assert (NULL != diff_2); 1430 if (NULL == set_out)
1431 {
1432 create_set_copy_for_task (task, &sk_in, &sk_out);
1433 return;
1434 }
1381 1435
1382 rfn = lookup_rfn (session, &lacls->output_rfn); 1436 rfn_in = lookup_rfn (session, &rk_in);
1437 GNUNET_assert (NULL != rfn_in);
1383 1438
1384 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1439 iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1385 "looked up everything\n"); 1440
1386 1441 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
1387 if (NULL == rfn)
1388 { 1442 {
1389 rfn = rfn_create (session->num_peers); 1443 uint16_t majority_num;
1390 rfn->key = lacls->output_rfn; 1444 enum ReferendumVote majority_vote;
1391 put_rfn (session, rfn); 1445
1446 rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1447
1448 switch (majority_vote)
1449 {
1450 case VOTE_ADD:
1451 // XXX: add to set
1452 break;
1453 case VOTE_REMOVE:
1454 // XXX: remove from set
1455 break;
1456 case VOTE_STAY:
1457 // do nothing
1458 break;
1459 default:
1460 GNUNET_assert (0);
1461 break;
1462 }
1392 } 1463 }
1464}
1393 1465
1394 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1395 "ensured rfn\n");
1396 1466
1397 diff_combined = diff_compose (diff_1, diff_2); 1467static void
1468task_start_grade (struct TaskEntry *task)
1469{
1470 struct ConsensusSession *session = task->step->session;
1471 struct ReferendumEntry *output_rfn;
1472 struct ReferendumEntry *input_rfn;
1473 struct DiffEntry *input_diff;
1474 struct RfnKey rfn_key;
1475 struct DiffKey diff_key;
1476 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1477 struct RfnElementInfo *ri;
1398 1478
1399 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1479 rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1400 "composed diffs\n"); 1480 output_rfn = lookup_rfn (session, &rfn_key);
1481 if (NULL == output_rfn)
1482 {
1483 output_rfn = rfn_create (session->num_peers);
1484 put_rfn (session, output_rfn);
1485 }
1401 1486
1402 apply_diff_to_rfn (diff_combined, rfn, task->key.leader, session->num_peers); 1487 diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
1488 input_diff = lookup_diff (session, &diff_key);
1489 GNUNET_assert (NULL != input_diff);
1403 1490
1404 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1491 rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
1405 "applied diffs to rfns\n"); 1492 input_rfn = lookup_rfn (session, &rfn_key);
1493 GNUNET_assert (NULL != input_rfn);
1494
1495 apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
1496
1497 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
1498
1499 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
1500 {
1501 uint16_t majority_num;
1502 enum ReferendumVote majority_vote;
1503
1504 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
1406 1505
1407 diff_destroy (diff_combined); 1506
1408 1507
1409 finish_task (task); 1508 switch (majority_vote)
1509 {
1510 default:
1511 GNUNET_assert (0);
1512 break;
1513 }
1514 }
1410} 1515}
1411 1516
1412 1517
@@ -1535,184 +1640,68 @@ task_start_reconcile (struct TaskEntry *task)
1535} 1640}
1536 1641
1537 1642
1538static int
1539rfn_majority (uint16_t num_peers,
1540 struct ReferendumEntry *rfn,
1541 struct RfnElementInfo *ri,
1542 uint16_t threshold)
1543{
1544 unsigned int votes_add = 0;
1545 unsigned int votes_remove = 0;
1546 unsigned int num_commited = 0;
1547 unsigned int maj_thresh;
1548 unsigned int nv;
1549 unsigned int tv;
1550 unsigned int i;
1551 1643
1552 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1553 "Computing rfn majority for element %s of rfn {%s}\n",
1554 debug_str_element (ri->element),
1555 debug_str_rfn_key (&rfn->key));
1556 1644
1557 for (i = 0; i < num_peers; i++) 1645struct SetMutationProgressCls
1558 {
1559 if (GNUNET_NO == rfn->peer_commited[i])
1560 continue;
1561 num_commited++;
1562 if (ri->votes[i] == VOTE_ADD)
1563 votes_add++;
1564 if (ri->votes[i] == VOTE_REMOVE)
1565 votes_remove++;
1566 }
1567
1568 /* Threshold to reach a majority among
1569 submitted votes, may not be enough for the
1570 global threshold. */
1571 maj_thresh = (num_commited + 1) / 2;
1572 /* Vote are relative to our local set, so it can only be
1573 either all add or all remove */
1574 GNUNET_assert ( (0 == votes_add) || (0 == votes_remove) );
1575
1576 if (votes_add > 0)
1577 {
1578 nv = votes_add;
1579 tv = VOTE_ADD;
1580 }
1581 else if (votes_remove > 0)
1582 {
1583 nv = votes_remove;
1584 tv = VOTE_REMOVE;
1585 }
1586 else
1587 {
1588 nv = 0;
1589 tv = VOTE_NONE;
1590 }
1591
1592 if ( (nv >= maj_thresh) && (nv >= threshold) )
1593 return tv;
1594
1595 if ( ((num_commited - nv) >= maj_thresh) && ((num_commited - nv) >= threshold) )
1596 return VOTE_NONE;
1597
1598 return VOTE_CONTESTED;
1599}
1600
1601
1602struct SetChangeProgressCls
1603{ 1646{
1604 int num_pending; 1647 int num_pending;
1648 /**
1649 * Task to finish once all changes are through.
1650 */
1605 struct TaskEntry *task; 1651 struct TaskEntry *task;
1606}; 1652};
1607 1653
1608 1654
1609static void 1655static void
1610eval_rfn_done (struct TaskEntry *task) 1656set_mutation_done (void *cls)
1611{ 1657{
1612 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1658 struct SetMutationProgressCls *pc = cls;
1613 "P%u: EVAL_REFERENDUM done for task {%s}\n",
1614 task->step->session->local_peer_idx, debug_str_task_key (&task->key));
1615 1659
1616 finish_task (task); 1660 GNUNET_assert (pc->num_pending > 0);
1617}
1618 1661
1662 pc->num_pending--;
1619 1663
1620static void 1664 if (0 == pc->num_pending)
1621eval_rfn_progress (void *cls)
1622{
1623 struct SetChangeProgressCls *erc = cls;
1624
1625 GNUNET_assert (erc->num_pending > 0);
1626
1627 erc->num_pending--;
1628
1629 if (0 == erc->num_pending)
1630 { 1665 {
1631 struct TaskEntry *task = erc->task; 1666 struct TaskEntry *task = pc->task;
1632 GNUNET_free (erc); 1667 GNUNET_free (pc);
1633 eval_rfn_done (task); 1668 finish_task (task);
1634 } 1669 }
1635} 1670}
1636 1671
1637 1672
1638static void
1639eval_rfn_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1640{
1641 struct TaskEntry *task = (struct TaskEntry *) cls;
1642 struct ConsensusSession *session = task->step->session;
1643 struct SetEntry *set;
1644
1645 set = GNUNET_new (struct SetEntry);
1646 set->h = copy;
1647 set->key = task->cls.eval_rfn.output_set;
1648
1649 put_set (session, set);
1650
1651 task_start_eval_rfn (task);
1652}
1653
1654 1673
1655/**
1656 * Take an input set and an input referendum,
1657 * apply the referendum with a threshold to the input
1658 * set and store the result in the output set and/or output diff.
1659 */
1660static void 1674static void
1661task_start_eval_rfn (struct TaskEntry *task) 1675task_start_eval_echo (struct TaskEntry *task)
1662{ 1676{
1663 struct GNUNET_CONTAINER_MultiHashMapIterator *iter; 1677 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1664 struct ReferendumEntry *input_rfn; 1678 struct ReferendumEntry *input_rfn;
1665 struct RfnElementInfo *ri; 1679 struct RfnElementInfo *ri;
1666 struct SetEntry *output_set = NULL; 1680 struct SetEntry *output_set;
1667 struct DiffEntry *output_diff = NULL; 1681 struct SetMutationProgressCls *progress_cls;
1668 struct SetChangeProgressCls *progress_cls;
1669 struct EvalRfnCls *rcls = &task->cls.eval_rfn;
1670 struct ConsensusSession *session = task->step->session; 1682 struct ConsensusSession *session = task->step->session;
1683 struct SetKey sk_in;
1684 struct SetKey sk_out;
1685 struct RfnKey rk_in;
1671 1686
1672 /* Have at least one output */ 1687 sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
1673 GNUNET_assert ( (rcls->output_set.set_kind != SET_KIND_NONE) || 1688 sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
1674 (rcls->output_diff.diff_kind != DIFF_KIND_NONE)); 1689 output_set = lookup_set (session, &sk_out);
1675 1690 if (NULL == output_set)
1676 if (SET_KIND_NONE != rcls->output_set.set_kind)
1677 {
1678 /* We have a set output, thus the output set must
1679 exist or copy it from the input set */
1680 output_set = lookup_set (session, &rcls->output_set);
1681 if (NULL == output_set)
1682 {
1683 struct SetEntry *input_set;
1684
1685 input_set = lookup_set (session, &rcls->input_set);
1686 GNUNET_assert (NULL != input_set);
1687 GNUNET_SET_copy_lazy (input_set->h,
1688 eval_rfn_copy_cb,
1689 task);
1690 /* We'll be called again, this time with the
1691 set ready. */
1692 return;
1693 }
1694 }
1695
1696 if (DIFF_KIND_NONE != rcls->output_diff.diff_kind)
1697 { 1691 {
1698 output_diff = lookup_diff (session, &rcls->output_diff); 1692 create_set_copy_for_task (task, &sk_in, &sk_out);
1699 if (NULL == output_diff) 1693 return;
1700 {
1701 output_diff = diff_create ();
1702 output_diff->key = rcls->output_diff;
1703 put_diff (session, output_diff);
1704 }
1705 } 1694 }
1706 1695
1707 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1696 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1708 "Evaluating referendum in Task {%s}\n", 1697 "Evaluating referendum in Task {%s}\n",
1709 debug_str_task_key (&task->key)); 1698 debug_str_task_key (&task->key));
1710 1699
1711 1700 progress_cls = GNUNET_new (struct SetMutationProgressCls);
1712 progress_cls = GNUNET_new (struct SetChangeProgressCls);
1713 progress_cls->task = task; 1701 progress_cls->task = task;
1714 1702
1715 input_rfn = lookup_rfn (session, &rcls->input_rfn); 1703 rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
1704 input_rfn = lookup_rfn (session, &rk_in);
1716 1705
1717 GNUNET_assert (NULL != input_rfn); 1706 GNUNET_assert (NULL != input_rfn);
1718 1707
@@ -1721,77 +1710,36 @@ task_start_eval_rfn (struct TaskEntry *task)
1721 1710
1722 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri)) 1711 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
1723 { 1712 {
1724 int majority_vote = rfn_majority (session->num_peers, input_rfn, ri, rcls->threshold); 1713 enum ReferendumVote majority_vote;
1714 uint16_t majority_num;
1715
1716 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
1717
1718 if (majority_num < session->num_peers / 3)
1719 {
1720 majority_vote = VOTE_REMOVE;
1721 output_set->is_contested = GNUNET_YES;
1722 }
1723
1725 switch (majority_vote) 1724 switch (majority_vote)
1726 { 1725 {
1727 case VOTE_ADD: 1726 case VOTE_ADD:
1728#ifdef GNUNET_EXTRA_LOGGING 1727 progress_cls->num_pending++;
1729 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1728 GNUNET_assert (GNUNET_OK ==
1730 "P%u: referendum vote result: VOTE_ADD for element %s in task {%s} with" 1729 GNUNET_SET_add_element (output_set->h,
1731 "output set {%s} and output diff {%s}\n", 1730 ri->element,
1732 session->local_peer_idx, 1731 set_mutation_done,
1733 debug_str_element (ri->element), 1732 progress_cls));
1734 debug_str_task_key (&task->key),
1735 debug_str_set_key (&rcls->output_set),
1736 debug_str_diff_key (&rcls->output_diff));
1737#endif
1738 if (NULL != output_set)
1739 {
1740 progress_cls->num_pending++;
1741 GNUNET_assert (GNUNET_OK ==
1742 GNUNET_SET_add_element (output_set->h,
1743 ri->element,
1744 eval_rfn_progress,
1745 progress_cls));
1746 }
1747 if (NULL != output_diff)
1748 {
1749 diff_insert (output_diff, 1, ri->element);
1750 }
1751 break; 1733 break;
1752 case VOTE_CONTESTED:
1753 if (NULL != output_set)
1754 output_set->is_contested = GNUNET_YES;
1755#ifdef GNUNET_EXTRA_LOGGING
1756 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1757 "P%u: referendum vote result: VOTE_CONTESTED for element %s in task {%s} with"
1758 "output set {%s} and output diff {%s}\n",
1759 session->local_peer_idx,
1760 debug_str_element (ri->element),
1761 debug_str_task_key (&task->key),
1762 debug_str_set_key (&rcls->output_set),
1763 debug_str_diff_key (&rcls->output_diff));
1764#endif
1765 /* fallthrough */
1766 case VOTE_REMOVE: 1734 case VOTE_REMOVE:
1767#ifdef GNUNET_EXTRA_LOGGING 1735 progress_cls->num_pending++;
1768 if (VOTE_REMOVE == majority_vote) 1736 GNUNET_assert (GNUNET_OK ==
1769 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1737 GNUNET_SET_remove_element (output_set->h,
1770 "P%u: referendum vote result: VOTE_REMOVE for element %s in task {%s} with" 1738 ri->element,
1771 "output set {%s} and output diff {%s}\n", 1739 set_mutation_done,
1772 session->local_peer_idx, 1740 progress_cls));
1773 debug_str_element (ri->element),
1774 debug_str_task_key (&task->key),
1775 debug_str_set_key (&rcls->output_set),
1776 debug_str_diff_key (&rcls->output_diff));
1777#endif
1778 if (NULL != output_set)
1779 {
1780 progress_cls->num_pending++;
1781 GNUNET_assert (GNUNET_OK ==
1782 GNUNET_SET_remove_element (output_set->h,
1783 ri->element,
1784 eval_rfn_progress,
1785 progress_cls));
1786 }
1787 if (NULL != output_diff)
1788 {
1789 diff_insert (output_diff, -1, ri->element);
1790 }
1791 break; 1741 break;
1792 case VOTE_NONE: 1742 case VOTE_STAY:
1793 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1794 "referendum vote result: VOTE_NONE\n");
1795 /* Nothing to do. */ 1743 /* Nothing to do. */
1796 break; 1744 break;
1797 default: 1745 default:
@@ -1799,122 +1747,6 @@ task_start_eval_rfn (struct TaskEntry *task)
1799 GNUNET_assert (0); 1747 GNUNET_assert (0);
1800 } 1748 }
1801 } 1749 }
1802 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1803
1804 if (progress_cls->num_pending == 0)
1805 {
1806 // call closure right now, no pending ops
1807 GNUNET_free (progress_cls);
1808 eval_rfn_done (task);
1809 }
1810}
1811
1812
1813static void
1814apply_diff_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1815{
1816 struct TaskEntry *task = (struct TaskEntry *) cls;
1817 struct ConsensusSession *session = task->step->session;
1818 struct SetEntry *set;
1819 struct ApplyDiffCls *diffop = &task->cls.apply_diff;
1820
1821 set = GNUNET_new (struct SetEntry);
1822 set->h = copy;
1823 set->key = diffop->output_set;
1824
1825 put_set (session, set);
1826
1827 task_start_apply_diff (task);
1828}
1829
1830
1831static void
1832apply_diff_done (struct TaskEntry *task)
1833{
1834 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1835 "P%u: APPLY_DIFF done for task {%s}\n",
1836 task->step->session->local_peer_idx, debug_str_task_key (&task->key));
1837 finish_task (task);
1838}
1839
1840
1841static void
1842apply_diff_progress (void *cls)
1843{
1844 struct SetChangeProgressCls *erc = cls;
1845
1846 GNUNET_assert (erc->num_pending > 0);
1847
1848 erc->num_pending--;
1849
1850 if (0 == erc->num_pending)
1851 {
1852 struct TaskEntry *task = erc->task;
1853 GNUNET_free (erc);
1854 apply_diff_done (task);
1855 }
1856}
1857
1858
1859static void
1860task_start_apply_diff (struct TaskEntry *task)
1861{
1862 struct SetEntry *output_set;
1863 struct DiffEntry *input_diff;
1864 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1865 struct DiffElementInfo *di;
1866 struct SetChangeProgressCls *progress_cls;
1867 struct ApplyDiffCls *diffop = &task->cls.apply_diff;
1868 struct ConsensusSession *session = task->step->session;
1869
1870 GNUNET_assert (diffop->output_set.set_kind != SET_KIND_NONE);
1871 GNUNET_assert (diffop->input_diff.diff_kind != DIFF_KIND_NONE);
1872
1873 input_diff = lookup_diff (session, &diffop->input_diff);
1874
1875 GNUNET_assert (NULL != input_diff);
1876
1877 output_set = lookup_set (session, &diffop->output_set);
1878
1879 if (NULL == output_set)
1880 {
1881 struct SetEntry *input_set;
1882
1883 input_set = lookup_set (session, &diffop->input_set);
1884 GNUNET_assert (NULL != input_set);
1885 GNUNET_SET_copy_lazy (input_set->h,
1886 apply_diff_copy_cb,
1887 task);
1888 /* We'll be called again, this time with the
1889 set ready. */
1890 return;
1891 }
1892
1893 progress_cls = GNUNET_new (struct SetChangeProgressCls);
1894
1895 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_diff->changes);
1896
1897 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1898 {
1899 if (di->weight > 0)
1900 {
1901 progress_cls->num_pending++;
1902 GNUNET_assert (GNUNET_OK ==
1903 GNUNET_SET_remove_element (output_set->h,
1904 di->element,
1905 apply_diff_progress,
1906 progress_cls));
1907 }
1908 else if (di->weight < 0)
1909 {
1910 progress_cls->num_pending++;
1911 GNUNET_assert (GNUNET_OK ==
1912 GNUNET_SET_add_element (output_set->h,
1913 di->element,
1914 apply_diff_progress,
1915 progress_cls));
1916 }
1917 }
1918 1750
1919 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter); 1751 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1920 1752
@@ -1922,7 +1754,7 @@ task_start_apply_diff (struct TaskEntry *task)
1922 { 1754 {
1923 // call closure right now, no pending ops 1755 // call closure right now, no pending ops
1924 GNUNET_free (progress_cls); 1756 GNUNET_free (progress_cls);
1925 apply_diff_done (task); 1757 finish_task (task);
1926 } 1758 }
1927} 1759}
1928 1760
@@ -2365,7 +2197,6 @@ step_depend_on (struct Step *step, struct Step *dep)
2365 GNUNET_assert (step != dep); 2197 GNUNET_assert (step != dep);
2366 GNUNET_assert (NULL != step); 2198 GNUNET_assert (NULL != step);
2367 GNUNET_assert (NULL != dep); 2199 GNUNET_assert (NULL != dep);
2368 // XXX: make rounds work
2369 GNUNET_assert (dep->round <= step->round); 2200 GNUNET_assert (dep->round <= step->round);
2370 2201
2371#ifdef GNUNET_EXTRA_LOGGING 2202#ifdef GNUNET_EXTRA_LOGGING
@@ -2423,8 +2254,6 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
2423 struct Step *step_after) 2254 struct Step *step_after)
2424{ 2255{
2425 uint16_t n = session->num_peers; 2256 uint16_t n = session->num_peers;
2426 uint16_t t = n / 3;
2427
2428 uint16_t me = session->local_peer_idx; 2257 uint16_t me = session->local_peer_idx;
2429 2258
2430 uint16_t p1; 2259 uint16_t p1;
@@ -2451,8 +2280,6 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
2451#endif 2280#endif
2452 step_depend_on (step, step_before); 2281 step_depend_on (step, step_before);
2453 2282
2454 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: Considering leader %d\n", session->local_peer_idx, lead);
2455
2456 if (lead == me) 2283 if (lead == me)
2457 { 2284 {
2458 for (k = 0; k < n; k++) 2285 for (k = 0; k < n; k++)
@@ -2462,7 +2289,6 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
2462 p1 = me; 2289 p1 = me;
2463 p2 = k; 2290 p2 = k;
2464 arrange_peers (&p1, &p2, n); 2291 arrange_peers (&p1, &p2, n);
2465 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: GC LEADER(1): %d %d %d %d\n", session->local_peer_idx, p1, p2, rep, lead);
2466 task = ((struct TaskEntry) { 2292 task = ((struct TaskEntry) {
2467 .step = step, 2293 .step = step,
2468 .start = task_start_reconcile, 2294 .start = task_start_reconcile,
@@ -2492,7 +2318,6 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
2492 p1 = me; 2318 p1 = me;
2493 p2 = lead; 2319 p2 = lead;
2494 arrange_peers (&p1, &p2, n); 2320 arrange_peers (&p1, &p2, n);
2495 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: GC LEADER(2): %d %d %d %d\n", session->local_peer_idx, p1, p2, rep, lead);
2496 task = ((struct TaskEntry) { 2321 task = ((struct TaskEntry) {
2497 .step = step, 2322 .step = step,
2498 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead}, 2323 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead},
@@ -2542,12 +2367,8 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
2542 task = ((struct TaskEntry) { 2367 task = ((struct TaskEntry) {
2543 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead }, 2368 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2544 .step = step, 2369 .step = step,
2545 .start = task_start_eval_rfn 2370 .start = task_start_eval_echo
2546 }); 2371 });
2547 task.cls.eval_rfn.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead },
2548 task.cls.eval_rfn.input_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead },
2549 task.cls.eval_rfn.output_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead },
2550 task.cls.eval_rfn.threshold = n - t,
2551 put_task (session->taskmap, &task); 2372 put_task (session->taskmap, &task);
2552 2373
2553 prev_step = step; 2374 prev_step = step;
@@ -2572,6 +2393,11 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
2572 }); 2393 });
2573 task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead }; 2394 task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2574 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead }; 2395 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2396 /* If there was at least one element in the echo round that was
2397 contested (i.e. it had no n-t majority), then we let the other peers
2398 know, and other peers let us know. The contested flag for each peer is
2399 stored in the rfn. */
2400 task.cls.setop.transceive_contested = GNUNET_YES;
2575 put_task (session->taskmap, &task); 2401 put_task (session->taskmap, &task);
2576 } 2402 }
2577 2403
@@ -2583,35 +2409,11 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
2583#endif 2409#endif
2584 step_depend_on (step, prev_step); 2410 step_depend_on (step, prev_step);
2585 2411
2586 // evaluate ConfirmationReferendum and
2587 // apply it to the LeaderReferendum
2588 // XXX: the diff should contain grading information
2589 task = ((struct TaskEntry) { 2412 task = ((struct TaskEntry) {
2590 .step = step, 2413 .step = step,
2591 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead }, 2414 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2592 .start = task_start_eval_rfn, 2415 .start = task_start_grade,
2593 });
2594 task.cls.eval_rfn.input_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2595 task.cls.eval_rfn.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_CONSENSUS, rep, lead };
2596 put_task (session->taskmap, &task);
2597
2598
2599 prev_step = step;
2600 /* Same round, since step only has local tasks */
2601 step = create_step (session, round);
2602#ifdef GNUNET_EXTRA_LOGGING
2603 GNUNET_asprintf (&step->debug_name, "gc apply, lead %u rep %u", lead, rep);
2604#endif
2605 step_depend_on (step, prev_step);
2606
2607 task = ((struct TaskEntry) {
2608 .step = step,
2609 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_APPLY_RESULT, -1, -1, rep, lead },
2610 .start = task_start_leader_apply,
2611 }); 2416 });
2612 task.cls.leader_apply.input_diff_1 = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2613 task.cls.leader_apply.input_diff_2 = (struct DiffKey) { DIFF_KIND_LEADER_CONSENSUS, rep, lead };
2614 task.cls.leader_apply.output_rfn = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, rep };
2615 put_task (session->taskmap, &task); 2417 put_task (session->taskmap, &task);
2616 2418
2617 step_depend_on (step_after, step); 2419 step_depend_on (step_after, step);
@@ -2707,16 +2509,13 @@ construct_task_graph (struct ConsensusSession *session)
2707 for (lead = 0; lead < n; lead++) 2509 for (lead = 0; lead < n; lead++)
2708 construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end); 2510 construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
2709 2511
2710 // TODO: add peers to ignore list, either here or 2512 // TODO: add peers to blacklisted list, either here or
2711 // already in the gradecast. 2513 // already in the gradecast.
2712 task = ((struct TaskEntry) { 2514 task = ((struct TaskEntry) {
2713 .step = step_rep_end, 2515 .step = step_rep_end,
2714 .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1}, 2516 .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
2715 .start = task_start_eval_rfn, 2517 .start = task_start_apply_round,
2716 }); 2518 });
2717 task.cls.eval_rfn.input_set = (struct SetKey) { SET_KIND_CURRENT, i };
2718 task.cls.eval_rfn.input_rfn = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, i };
2719 task.cls.eval_rfn.output_set = (struct SetKey) { SET_KIND_CURRENT, i + 1 };
2720 put_task (session->taskmap, &task); 2519 put_task (session->taskmap, &task);
2721 2520
2722 prev_step = step_rep_end; 2521 prev_step = step_rep_end;
@@ -2804,7 +2603,7 @@ initialize_session (struct ConsensusSession *session,
2804 put_set (session, client_set); 2603 put_set (session, client_set);
2805 } 2604 }
2806 2605
2807 session->peers_ignored = GNUNET_new_array (session->num_peers, int); 2606 session->peers_blacklisted = GNUNET_new_array (session->num_peers, int);
2808 2607
2809 /* Just construct the task graph, 2608 /* Just construct the task graph,
2810 but don't run anything until the client calls conclude. */ 2609 but don't run anything until the client calls conclude. */