aboutsummaryrefslogtreecommitdiff
path: root/src/consensus/gnunet-service-consensus.c
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2015-10-05 21:26:56 +0000
committerFlorian Dold <florian.dold@gmail.com>2015-10-05 21:26:56 +0000
commita11e17158609766f29c40de6daecf5ce02b38e6c (patch)
tree1fbce1a057d5e74415781074a429d53751b43111 /src/consensus/gnunet-service-consensus.c
parent4dcb414e2faabc800577c25dec3b63e3ceaaa84b (diff)
downloadgnunet-a11e17158609766f29c40de6daecf5ce02b38e6c.tar.gz
gnunet-a11e17158609766f29c40de6daecf5ce02b38e6c.zip
work on consensus and set
- evil peers for consensus - various fixes for consensus and set
Diffstat (limited to 'src/consensus/gnunet-service-consensus.c')
-rw-r--r--src/consensus/gnunet-service-consensus.c986
1 files changed, 782 insertions, 204 deletions
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
index 424be7a71..5a2f62cd5 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -128,40 +128,50 @@ enum PhaseKind
128 PHASE_KIND_GRADECAST_CONFIRM, 128 PHASE_KIND_GRADECAST_CONFIRM,
129 PHASE_KIND_GRADECAST_CONFIRM_GRADE, 129 PHASE_KIND_GRADECAST_CONFIRM_GRADE,
130 PHASE_KIND_GRADECAST_APPLY_RESULT, 130 PHASE_KIND_GRADECAST_APPLY_RESULT,
131 /**
132 * Apply a repetition of the all-to-all
133 * gradecast to the current set.
134 */
135 PHASE_KIND_APPLY_REP,
131 PHASE_KIND_FINISH, 136 PHASE_KIND_FINISH,
132}; 137};
133 138
134 139
135enum ActionType 140enum TaskKind
136{ 141{
137 /** 142 /**
138 * Do a set reconciliation with another peer (or via looback). 143 * Do a set reconciliation with another peer (or via looback).
139 */ 144 */
140 ACTION_RECONCILE, 145 TASK_RECONCILE,
146 /**
147 * Same as reconciliation, but only care about added elements.
148 */
149 TASK_UNION,
141 /** 150 /**
142 * Apply a referendum with a threshold 151 * Apply a referendum with a threshold
143 * to a set and/or a diff. 152 * to a set and/or a diff.
144 */ 153 */
145 ACTION_EVAL_RFN, 154 TASK_EVAL_RFN,
146 /** 155 /**
147 * Apply a diff to a set. 156 * Apply a diff to a set.
148 */ 157 */
149 ACTION_APPLY_DIFF, 158 TASK_APPLY_DIFF,
150 ACTION_FINISH, 159 FASK_FINISH,
151}; 160};
152 161
153enum SetKind 162enum SetKind
154{ 163{
155 SET_KIND_NONE = 0, 164 SET_KIND_NONE = 0,
156 SET_KIND_CURRENT, 165 SET_KIND_CURRENT,
157 SET_KIND_LEADER, 166 SET_KIND_LEADER_PROPOSAL,
158 SET_KIND_ECHO_RESULT, 167 SET_KIND_ECHO_RESULT,
159}; 168};
160 169
161enum DiffKind 170enum DiffKind
162{ 171{
163 DIFF_KIND_NONE = 0, 172 DIFF_KIND_NONE = 0,
164 DIFF_KIND_LEADER, 173 DIFF_KIND_LEADER_PROPOSAL,
174 DIFF_KIND_LEADER_CONSENSUS,
165 DIFF_KIND_GRADECAST_RESULT, 175 DIFF_KIND_GRADECAST_RESULT,
166}; 176};
167 177
@@ -170,9 +180,74 @@ enum RfnKind
170 RFN_KIND_NONE = 0, 180 RFN_KIND_NONE = 0,
171 RFN_KIND_ECHO, 181 RFN_KIND_ECHO,
172 RFN_KIND_CONFIRM, 182 RFN_KIND_CONFIRM,
183 RFN_KIND_GRADECAST_RESULT
173}; 184};
174 185
175 186
187struct SetOpCls
188{
189 struct SetKey input_set;
190
191 struct SetKey output_set;
192 struct RfnKey output_rfn;
193 struct DiffKey output_diff;
194
195 int do_not_remove;
196
197 struct GNUNET_SET_OperationHandle *op;
198};
199
200struct EvalRfnCls
201{
202 struct SetKey input_set;
203 struct RfnKey input_rfn;
204
205 uint16_t threshold;
206
207 struct SetKey output_set;
208 struct DiffKey output_diff;
209};
210
211
212struct ApplyDiffCls
213{
214 struct SetKey input_set;
215 struct DiffKey input_diff;
216 struct SetKey output_set;
217};
218
219
220struct LeaderApplyCls
221{
222 struct DiffKey input_diff_1;
223 struct DiffKey input_diff_2;
224
225 struct RfnKey output_rfn;
226};
227
228
229struct FinishCls
230{
231 struct SetKey input_set;
232};
233
234/**
235 * Closure for both @a start_task
236 * and @a cancel_task.
237 */
238union TaskFuncCls
239{
240 struct SetOpCls setop;
241 struct EvalRfnCls eval_rfn;
242 struct ApplyDiffCls apply_diff;
243 struct LeaderApplyCls leader_apply;
244 struct FinishCls finish;
245};
246
247struct TaskEntry;
248
249typedef void (*TaskFunc) (struct TaskEntry *task);
250
176/* 251/*
177 * Node in the consensus task graph. 252 * Node in the consensus task graph.
178 */ 253 */
@@ -182,30 +257,16 @@ struct TaskEntry
182 257
183 struct Step *step; 258 struct Step *step;
184 259
185 int is_running; 260 int is_started;
186 261
187 int is_finished; 262 int is_finished;
188 263
189 enum ActionType action; 264 enum TaskKind kind;
190
191 struct SetKey input_set;
192 struct DiffKey input_diff;
193 struct RfnKey input_rfn;
194 struct SetKey output_set;
195 struct DiffKey output_diff;
196 struct RfnKey output_rfn;
197
198 /**
199 * Threshold when evaluating referendums.
200 */
201 uint16_t threshold;
202 265
203 /** 266 TaskFunc start;
204 * Operation that is running for this task. 267 TaskFunc cancel;
205 */
206 struct GNUNET_SET_OperationHandle *op;
207 268
208 struct GNUNET_SET_Handle *commited_set; 269 union TaskFuncCls cls;
209}; 270};
210 271
211 272
@@ -280,17 +341,10 @@ struct Step
280 char *debug_name; 341 char *debug_name;
281}; 342};
282 343
283struct RfnPeerInfo
284{
285 /* Peers can propose changes,
286 * but they are only accepted once
287 * the whole set operation is done. */
288 int is_commited;
289};
290 344
291struct RfnElementInfo 345struct RfnElementInfo
292{ 346{
293 struct GNUNET_SET_Element *element; 347 const struct GNUNET_SET_Element *element;
294 348
295 /* 349 /*
296 * Vote (or VOTE_NONE) from every peer 350 * Vote (or VOTE_NONE) from every peer
@@ -323,12 +377,20 @@ struct ReferendumEntry
323 * not counted for majority votes or thresholds. 377 * not counted for majority votes or thresholds.
324 */ 378 */
325 int *peer_commited; 379 int *peer_commited;
380
381
382 /**
383 * Contestation state of the peer. If a peer is contested, the values it
384 * contributed are still counted for applying changes, but the grading is
385 * affected.
386 */
387 int *peer_contested;
326}; 388};
327 389
328 390
329struct DiffElementInfo 391struct DiffElementInfo
330{ 392{
331 struct GNUNET_SET_Element *element; 393 const struct GNUNET_SET_Element *element;
332 394
333 /** 395 /**
334 * Positive weight for 'add', negative 396 * Positive weight for 'add', negative
@@ -468,13 +530,13 @@ static void
468finish_task (struct TaskEntry *task); 530finish_task (struct TaskEntry *task);
469 531
470static void 532static void
471run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task); 533task_start_reconcile (struct TaskEntry *task);
472 534
473static void 535static void
474run_task_eval_rfn (struct ConsensusSession *session, struct TaskEntry *task); 536task_start_eval_rfn (struct TaskEntry *task);
475 537
476static void 538static void
477run_task_apply_diff (struct ConsensusSession *session, struct TaskEntry *task); 539task_start_apply_diff (struct TaskEntry *task);
478 540
479static void 541static void
480run_ready_steps (struct ConsensusSession *session); 542run_ready_steps (struct ConsensusSession *session);
@@ -492,6 +554,7 @@ phasename (uint16_t phase)
492 case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM"; 554 case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
493 case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE"; 555 case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
494 case PHASE_KIND_GRADECAST_APPLY_RESULT: return "GRADECAST_APPLY_RESULT"; 556 case PHASE_KIND_GRADECAST_APPLY_RESULT: return "GRADECAST_APPLY_RESULT";
557 case PHASE_KIND_APPLY_REP: return "APPLY_REP";
495 default: return "(unknown)"; 558 default: return "(unknown)";
496 } 559 }
497} 560}
@@ -503,7 +566,7 @@ setname (uint16_t kind)
503 switch (kind) 566 switch (kind)
504 { 567 {
505 case SET_KIND_CURRENT: return "CURRENT"; 568 case SET_KIND_CURRENT: return "CURRENT";
506 case SET_KIND_LEADER: return "LEADER"; 569 case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
507 case SET_KIND_NONE: return "NONE"; 570 case SET_KIND_NONE: return "NONE";
508 default: return "(unknown)"; 571 default: return "(unknown)";
509 } 572 }
@@ -527,12 +590,26 @@ diffname (uint16_t kind)
527 switch (kind) 590 switch (kind)
528 { 591 {
529 case DIFF_KIND_NONE: return "NONE"; 592 case DIFF_KIND_NONE: return "NONE";
530 case DIFF_KIND_LEADER: return "LEADER"; 593 case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
531 case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT"; 594 case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
595 case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
532 default: return "(unknown)"; 596 default: return "(unknown)";
533 } 597 }
534} 598}
535 599
600#ifdef GNUNET_EXTRA_LOGGING
601
602
603static const char *
604debug_str_element (const struct GNUNET_SET_Element *el)
605{
606 struct GNUNET_HashCode hash;
607
608 GNUNET_SET_element_hash (el, &hash);
609
610 return GNUNET_h2s (&hash);
611}
612
536static const char * 613static const char *
537debug_str_task_key (struct TaskKey *tk) 614debug_str_task_key (struct TaskKey *tk)
538{ 615{
@@ -583,6 +660,8 @@ debug_str_rfn_key (struct RfnKey *rk)
583 return buf; 660 return buf;
584} 661}
585 662
663#endif /* GNUNET_EXTRA_LOGGING */
664
586 665
587/** 666/**
588 * Destroy a session, free all resources associated with it. 667 * Destroy a session, free all resources associated with it.
@@ -602,6 +681,8 @@ destroy_session (struct ConsensusSession *session)
602 { 681 {
603 GNUNET_MQ_destroy (session->client_mq); 682 GNUNET_MQ_destroy (session->client_mq);
604 session->client_mq = NULL; 683 session->client_mq = NULL;
684 /* The MQ cleanup will also disconnect the underlying client. */
685 session->client = NULL;
605 } 686 }
606 if (NULL != session->client) 687 if (NULL != session->client)
607 { 688 {
@@ -634,8 +715,9 @@ send_to_client_iter (void *cls,
634 struct GNUNET_CONSENSUS_ElementMessage *m; 715 struct GNUNET_CONSENSUS_ElementMessage *m;
635 716
636 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 717 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
637 "P%d: got element for client\n", 718 "P%d: sending element %s to client\n",
638 session->local_peer_idx); 719 session->local_peer_idx,
720 debug_str_element (element));
639 721
640 ev = GNUNET_MQ_msg_extra (m, element->size, 722 ev = GNUNET_MQ_msg_extra (m, element->size,
641 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT); 723 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
@@ -736,7 +818,36 @@ diff_insert (struct DiffEntry *diff,
736 int weight, 818 int weight,
737 const struct GNUNET_SET_Element *element) 819 const struct GNUNET_SET_Element *element)
738{ 820{
739 GNUNET_assert (0); 821 struct DiffElementInfo *di;
822 struct GNUNET_HashCode hash;
823
824 GNUNET_assert ( (1 == weight) || (-1 == weight));
825
826 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
827 "diff_insert with element size %u\n",
828 element->size);
829
830 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
831 "hashing element\n");
832
833 GNUNET_SET_element_hash (element, &hash);
834
835 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
836 "hashed element\n");
837
838 di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
839
840 if (NULL == di)
841 {
842 di = GNUNET_new (struct DiffElementInfo);
843 di->element = GNUNET_SET_element_dup (element);
844 GNUNET_assert (GNUNET_OK ==
845 GNUNET_CONTAINER_multihashmap_put (diff->changes,
846 &hash, di,
847 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
848 }
849
850 di->weight = weight;
740} 851}
741 852
742 853
@@ -747,10 +858,39 @@ rfn_vote (struct ReferendumEntry *rfn,
747 int vote, 858 int vote,
748 const struct GNUNET_SET_Element *element) 859 const struct GNUNET_SET_Element *element)
749{ 860{
861 struct RfnElementInfo *ri;
862 struct GNUNET_HashCode hash;
863
750 GNUNET_assert (voting_peer < num_peers); 864 GNUNET_assert (voting_peer < num_peers);
751 GNUNET_assert (0); 865
866 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
867 "voting for element of size %u\n",
868 element->size);
869
870 rfn->peer_commited[voting_peer] = GNUNET_YES;
871
872 GNUNET_SET_element_hash (element, &hash);
873 ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
874
875
876 if (NULL == ri)
877 {
878 ri = GNUNET_new (struct RfnElementInfo);
879 ri->element = GNUNET_SET_element_dup (element);
880 ri->votes = GNUNET_new_array (num_peers, int);
881 GNUNET_assert (GNUNET_OK ==
882 GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
883 &hash, ri,
884 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
885 }
886
887 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
888 "rfn vote element %p\n",
889 ri->element);
890 ri->votes[voting_peer] = vote;
752} 891}
753 892
893
754uint16_t 894uint16_t
755task_other_peer (struct TaskEntry *task) 895task_other_peer (struct TaskEntry *task)
756{ 896{
@@ -760,6 +900,7 @@ task_other_peer (struct TaskEntry *task)
760 return task->key.peer1; 900 return task->key.peer1;
761} 901}
762 902
903
763/** 904/**
764 * Callback for set operation results. Called for each element 905 * Callback for set operation results. Called for each element
765 * in the result set. 906 * in the result set.
@@ -779,6 +920,10 @@ set_result_cb (void *cls,
779 struct DiffEntry *output_diff = NULL; 920 struct DiffEntry *output_diff = NULL;
780 struct ReferendumEntry *output_rfn = NULL; 921 struct ReferendumEntry *output_rfn = NULL;
781 unsigned int other_idx; 922 unsigned int other_idx;
923 struct SetOpCls *setop;
924
925 setop = &task->cls.setop;
926
782 927
783 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 928 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
784 "P%u: got set result for {%s}, status %u\n", 929 "P%u: got set result for {%s}, status %u\n",
@@ -786,7 +931,7 @@ set_result_cb (void *cls,
786 debug_str_task_key (&task->key), 931 debug_str_task_key (&task->key),
787 status); 932 status);
788 933
789 if (GNUNET_NO == task->is_running) 934 if (GNUNET_NO == task->is_started)
790 { 935 {
791 GNUNET_break_op (0); 936 GNUNET_break_op (0);
792 return; 937 return;
@@ -808,14 +953,23 @@ set_result_cb (void *cls,
808 GNUNET_assert (0); 953 GNUNET_assert (0);
809 } 954 }
810 955
811 if (SET_KIND_NONE != task->output_set.set_kind) 956 if (SET_KIND_NONE != setop->output_set.set_kind)
812 output_set = lookup_set (session, &task->output_set); 957 {
958 output_set = lookup_set (session, &setop->output_set);
959 GNUNET_assert (NULL != output_set);
960 }
813 961
814 if (DIFF_KIND_NONE != task->output_diff.diff_kind) 962 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
815 output_diff = lookup_diff (session, &task->output_diff); 963 {
964 output_diff = lookup_diff (session, &setop->output_diff);
965 GNUNET_assert (NULL != output_diff);
966 }
816 967
817 if (RFN_KIND_NONE != task->output_rfn.rfn_kind) 968 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
818 output_rfn = lookup_rfn (session, &task->output_rfn); 969 {
970 output_rfn = lookup_rfn (session, &setop->output_rfn);
971 GNUNET_assert (NULL != output_rfn);
972 }
819 973
820 if (GNUNET_YES == session->peers_ignored[other_idx]) 974 if (GNUNET_YES == session->peers_ignored[other_idx])
821 { 975 {
@@ -827,8 +981,10 @@ set_result_cb (void *cls,
827 981
828 switch (status) 982 switch (status)
829 { 983 {
830 // case GNUNET_SET_STATUS_MISSING_LOCAL: 984 case GNUNET_SET_STATUS_ADD_LOCAL:
831 case GNUNET_SET_STATUS_OK: 985 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
986 "Adding element in Task {%s}\n",
987 debug_str_task_key (&task->key));
832 if (NULL != output_set) 988 if (NULL != output_set)
833 { 989 {
834 // FIXME: record pending adds, use callback 990 // FIXME: record pending adds, use callback
@@ -836,25 +992,95 @@ set_result_cb (void *cls,
836 element, 992 element,
837 NULL, 993 NULL,
838 NULL); 994 NULL);
839 995#ifdef GNUNET_EXTRA_LOGGING
996 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
997 "P%u: adding element %s into set {%s} of task {%s}\n",
998 session->local_peer_idx,
999 debug_str_element (element),
1000 debug_str_set_key (&setop->output_set),
1001 debug_str_task_key (&task->key));
1002#endif
840 } 1003 }
841 if (NULL != output_diff) 1004 if (NULL != output_diff)
842 { 1005 {
843 diff_insert (output_diff, 1, element); 1006 diff_insert (output_diff, 1, element);
1007#ifdef GNUNET_EXTRA_LOGGING
1008 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1009 "P%u: adding element %s into diff {%s} of task {%s}\n",
1010 session->local_peer_idx,
1011 debug_str_element (element),
1012 debug_str_diff_key (&setop->output_diff),
1013 debug_str_task_key (&task->key));
1014#endif
844 } 1015 }
845 if (NULL != output_rfn) 1016 if (NULL != output_rfn)
846 { 1017 {
847 rfn_vote (output_rfn, task_other_peer (task), session->num_peers, VOTE_ADD, element); 1018 rfn_vote (output_rfn, task_other_peer (task), session->num_peers, VOTE_ADD, element);
1019#ifdef GNUNET_EXTRA_LOGGING
1020 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1021 "P%u: adding element %s into rfn {%s} of task {%s}\n",
1022 session->local_peer_idx,
1023 debug_str_element (element),
1024 debug_str_rfn_key (&setop->output_rfn),
1025 debug_str_task_key (&task->key));
1026#endif
848 } 1027 }
849 // XXX: add result to structures in task 1028 // XXX: add result to structures in task
850 break; 1029 break;
851 //case GNUNET_SET_STATUS_MISSING_REMOTE: 1030 case GNUNET_SET_STATUS_ADD_REMOTE:
852 // // XXX: add result to structures in task 1031 if (GNUNET_YES == setop->do_not_remove)
853 // break; 1032 break;
1033 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1034 "Removing element in Task {%s}\n",
1035 debug_str_task_key (&task->key));
1036 if (NULL != output_set)
1037 {
1038 // FIXME: record pending adds, use callback
1039 GNUNET_SET_remove_element (output_set->h,
1040 element,
1041 NULL,
1042 NULL);
1043#ifdef GNUNET_EXTRA_LOGGING
1044 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1045 "P%u: removing element %s from set {%s} of task {%s}\n",
1046 session->local_peer_idx,
1047 debug_str_element (element),
1048 debug_str_set_key (&setop->output_set),
1049 debug_str_task_key (&task->key));
1050#endif
1051 }
1052 if (NULL != output_diff)
1053 {
1054 diff_insert (output_diff, -1, element);
1055#ifdef GNUNET_EXTRA_LOGGING
1056 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1057 "P%u: removing element %s from diff {%s} of task {%s}\n",
1058 session->local_peer_idx,
1059 debug_str_element (element),
1060 debug_str_diff_key (&setop->output_diff),
1061 debug_str_task_key (&task->key));
1062#endif
1063 }
1064 if (NULL != output_rfn)
1065 {
1066 rfn_vote (output_rfn, task_other_peer (task), session->num_peers, VOTE_REMOVE, element);
1067#ifdef GNUNET_EXTRA_LOGGING
1068 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1069 "P%u: removing element %s from rfn {%s} of task {%s}\n",
1070 session->local_peer_idx,
1071 debug_str_element (element),
1072 debug_str_rfn_key (&setop->output_rfn),
1073 debug_str_task_key (&task->key));
1074#endif
1075 }
1076 break;
854 case GNUNET_SET_STATUS_DONE: 1077 case GNUNET_SET_STATUS_DONE:
855 // XXX: check first if any changes to the underlying 1078 // XXX: check first if any changes to the underlying
856 // set are still pending 1079 // set are still pending
857 // XXX: commit other peer in referendum 1080 // XXX: commit other peer in referendum
1081 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1082 "Finishing setop in Task {%s}\n",
1083 debug_str_task_key (&task->key));
858 finish_task (task); 1084 finish_task (task);
859 break; 1085 break;
860 case GNUNET_SET_STATUS_FAILURE: 1086 case GNUNET_SET_STATUS_FAILURE:
@@ -867,6 +1093,84 @@ set_result_cb (void *cls,
867 } 1093 }
868} 1094}
869 1095
1096#ifdef EVIL
1097
1098enum Evilness
1099{
1100 EVILNESS_NONE,
1101 EVILNESS_CRAM,
1102 EVILNESS_SLACK,
1103};
1104
1105static void
1106get_evilness (struct ConsensusSession *session, enum Evilness *ret_type, unsigned int *ret_num)
1107{
1108 char *evil_spec;
1109 char *field;
1110 char *evil_type_str = NULL;
1111
1112 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
1113 {
1114 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1115 "P%u: no evilness\n",
1116 session->local_peer_idx);
1117 *ret_type = EVILNESS_NONE;
1118 return;
1119 }
1120 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1121 "P%u: got evilness spec\n",
1122 session->local_peer_idx);
1123
1124 for (field = strtok (evil_spec, "/");
1125 NULL != field;
1126 field = strtok (NULL, "/"))
1127 {
1128 unsigned int peer_num;
1129 unsigned int evil_num;
1130 int ret;
1131
1132 evil_type_str = NULL;
1133
1134 ret = sscanf (field, "%u;%m[a-z];%u", &peer_num, &evil_type_str, &evil_num);
1135
1136 if (ret != 3)
1137 {
1138 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Malformed field '%s' in EVIL_SPEC, behaving like a good peer.\n",
1139 field);
1140 goto not_evil;
1141 }
1142
1143 GNUNET_assert (NULL != evil_type_str);
1144
1145 if (peer_num == session->local_peer_idx)
1146 {
1147 if (0 == strcmp ("slack", evil_type_str))
1148 *ret_type = EVILNESS_SLACK;
1149 else if (0 == strcmp ("cram", evil_type_str))
1150 {
1151 *ret_type = EVILNESS_CRAM;
1152 *ret_num = evil_num;
1153 }
1154 else
1155 {
1156 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n");
1157 goto not_evil;
1158 }
1159 goto cleanup;
1160 }
1161 /* No GNUNET_free since memory was allocated by libc */
1162 free (evil_type_str);
1163 evil_type_str = NULL;
1164 }
1165not_evil:
1166 *ret_type = EVILNESS_NONE;
1167cleanup:
1168 GNUNET_free (evil_spec);
1169 if (NULL != evil_type_str)
1170 free (evil_type_str);
1171}
1172
1173#endif
870 1174
871 1175
872/** 1176/**
@@ -878,11 +1182,67 @@ commit_set (struct ConsensusSession *session,
878 struct TaskEntry *task) 1182 struct TaskEntry *task)
879{ 1183{
880 struct SetEntry *set; 1184 struct SetEntry *set;
1185 struct SetOpCls *setop = &task->cls.setop;
881 1186
882 GNUNET_assert (NULL != task->op); 1187 GNUNET_assert (NULL != setop->op);
883 set = lookup_set (session, &task->input_set); 1188 set = lookup_set (session, &setop->input_set);
884 GNUNET_assert (NULL != set); 1189 GNUNET_assert (NULL != set);
885 GNUNET_SET_commit (task->op, set->h); 1190
1191#ifdef EVIL
1192 {
1193 unsigned int i;
1194 unsigned int evil_num;
1195 enum Evilness evilness;
1196
1197 get_evilness (session, &evilness, &evil_num);
1198 switch (evilness)
1199 {
1200 case EVILNESS_CRAM:
1201 /* We're not cramming elements in the
1202 all-to-all round, since that would just
1203 add more elements to the result set, but
1204 wouldn't test robustness. */
1205 if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1206 {
1207 GNUNET_SET_commit (setop->op, set->h);
1208 break;
1209 }
1210 for (i = 0; i < evil_num; i++)
1211 {
1212 struct GNUNET_HashCode hash;
1213 struct GNUNET_SET_Element element;
1214 element.data = &hash;
1215 element.size = sizeof (struct GNUNET_HashCode);
1216 element.element_type = 0;
1217
1218 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
1219 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1220#ifdef GNUNET_EXTRA_LOGGING
1221 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1222 "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1223 session->local_peer_idx,
1224 debug_str_element (&element),
1225 debug_str_set_key (&setop->input_set),
1226 debug_str_task_key (&task->key));
1227#endif
1228 }
1229 GNUNET_SET_commit (setop->op, set->h);
1230 break;
1231 case EVILNESS_SLACK:
1232 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1233 "P%u: evil peer: slacking\n",
1234 session->local_peer_idx,
1235 evil_num);
1236 /* Do nothing. */
1237 break;
1238 case EVILNESS_NONE:
1239 GNUNET_SET_commit (setop->op, set->h);
1240 break;
1241 }
1242 }
1243#else
1244 GNUNET_SET_commit (setop->op, set->h);
1245#endif
886} 1246}
887 1247
888 1248
@@ -892,6 +1252,8 @@ put_diff (struct ConsensusSession *session,
892{ 1252{
893 struct GNUNET_HashCode hash; 1253 struct GNUNET_HashCode hash;
894 1254
1255 GNUNET_assert (NULL != diff);
1256
895 GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash); 1257 GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
896 GNUNET_assert (GNUNET_OK == 1258 GNUNET_assert (GNUNET_OK ==
897 GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff, 1259 GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
@@ -906,6 +1268,10 @@ put_set (struct ConsensusSession *session,
906 1268
907 GNUNET_assert (NULL != set->h); 1269 GNUNET_assert (NULL != set->h);
908 1270
1271 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1272 "Putting set %s\n",
1273 debug_str_set_key (&set->key));
1274
909 GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash); 1275 GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
910 GNUNET_assert (GNUNET_OK == 1276 GNUNET_assert (GNUNET_OK ==
911 GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set, 1277 GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
@@ -931,26 +1297,183 @@ static void
931output_cloned_cb (void *cls, struct GNUNET_SET_Handle *copy) 1297output_cloned_cb (void *cls, struct GNUNET_SET_Handle *copy)
932{ 1298{
933 struct TaskEntry *task = (struct TaskEntry *) cls; 1299 struct TaskEntry *task = (struct TaskEntry *) cls;
1300 struct SetOpCls *setop = &task->cls.setop;
934 struct ConsensusSession *session = task->step->session; 1301 struct ConsensusSession *session = task->step->session;
935 struct SetEntry *set = GNUNET_new (struct SetEntry); 1302 struct SetEntry *set = GNUNET_new (struct SetEntry);
936 1303
937 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1304 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
938 "P%u: Received lazy copy, storing output set %s\n", 1305 "P%u: Received lazy copy, storing output set %s\n",
939 session->local_peer_idx, debug_str_set_key (&task->output_set)); 1306 session->local_peer_idx, debug_str_set_key (&setop->output_set));
940 1307
941 set->key = task->output_set; 1308 set->key = setop->output_set;
942 set->h = copy; 1309 set->h = copy;
943 put_set (task->step->session, set); 1310 put_set (task->step->session, set);
944 run_task_remote_union (task->step->session, task); 1311 task_start_reconcile (task);
945} 1312}
946 1313
947 1314
948static void 1315static void
949run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task) 1316task_cancel_reconcile (struct TaskEntry *task)
1317{
1318 /* not implemented yet */
1319 GNUNET_assert (0);
1320}
1321
1322
1323static void
1324apply_diff_to_rfn (struct DiffEntry *diff,
1325 struct ReferendumEntry *rfn,
1326 uint16_t voting_peer,
1327 uint16_t num_peers)
1328{
1329 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1330 struct DiffElementInfo *di;
1331
1332 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1333
1334 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1335 {
1336 if (di->weight > 0)
1337 {
1338 rfn_vote (rfn, voting_peer, num_peers, VOTE_ADD, di->element);
1339 }
1340 if (di->weight < 0)
1341 {
1342 rfn_vote (rfn, voting_peer, num_peers, VOTE_REMOVE, di->element);
1343 }
1344 }
1345
1346 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1347}
1348
1349
1350struct DiffEntry *
1351diff_create ()
1352{
1353 struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1354
1355 d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1356
1357 return d;
1358}
1359
1360
1361struct DiffEntry *
1362diff_compose (struct DiffEntry *diff_1,
1363 struct DiffEntry *diff_2)
1364{
1365 struct DiffEntry *diff_new;
1366 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1367 struct DiffElementInfo *di;
1368
1369 diff_new = diff_create ();
1370
1371 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1372 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1373 {
1374 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1375 "iterating first diff\n");
1376 diff_insert (diff_new, di->weight, di->element);
1377 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1378 "insert done\n");
1379 }
1380 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1381
1382 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1383 "iterating first diff done\n");
1384
1385 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1386 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1387 {
1388 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1389 "iterating second diff\n");
1390 diff_insert (diff_new, di->weight, di->element);
1391 }
1392 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1393
1394 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1395 "iterating second diff done\n");
1396
1397 return diff_new;
1398}
1399
1400
1401struct ReferendumEntry *
1402rfn_create (uint16_t size)
1403{
1404 struct ReferendumEntry *rfn;
1405
1406 rfn = GNUNET_new (struct ReferendumEntry);
1407 rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1408 rfn->peer_commited = GNUNET_new_array (size, int);
1409
1410 return rfn;
1411}
1412
1413
1414static void
1415diff_destroy (struct DiffEntry *diff)
1416{
1417 GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1418 GNUNET_free (diff);
1419}
1420
1421
1422static void
1423task_start_leader_apply (struct TaskEntry *task)
1424{
1425 struct LeaderApplyCls *lacls = &task->cls.leader_apply;
1426 struct ConsensusSession *session = task->step->session;
1427 struct DiffEntry *diff_1;
1428 struct DiffEntry *diff_2;
1429 struct DiffEntry *diff_combined;
1430 struct ReferendumEntry *rfn;
1431
1432 diff_1 = lookup_diff (session, &lacls->input_diff_1);
1433 GNUNET_assert (NULL != diff_1);
1434
1435 diff_2 = lookup_diff (session, &lacls->input_diff_2);
1436 GNUNET_assert (NULL != diff_2);
1437
1438 rfn = lookup_rfn (session, &lacls->output_rfn);
1439
1440 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1441 "looked up everything\n");
1442
1443 if (NULL == rfn)
1444 {
1445 rfn = rfn_create (session->num_peers);
1446 rfn->key = lacls->output_rfn;
1447 put_rfn (session, rfn);
1448 }
1449
1450 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1451 "ensured rfn\n");
1452
1453 diff_combined = diff_compose (diff_1, diff_2);
1454
1455 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1456 "composed diffs\n");
1457
1458 apply_diff_to_rfn (diff_combined, rfn, task->key.leader, session->num_peers);
1459
1460 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1461 "applied diffs to rfns\n");
1462
1463 diff_destroy (diff_combined);
1464
1465 finish_task (task);
1466}
1467
1468
1469static void
1470task_start_reconcile (struct TaskEntry *task)
950{ 1471{
951 struct SetEntry *input; 1472 struct SetEntry *input;
1473 struct SetOpCls *setop = &task->cls.setop;
1474 struct ConsensusSession *session = task->step->session;
952 1475
953 input = lookup_set (session, &task->input_set); 1476 input = lookup_set (session, &setop->input_set);
954 GNUNET_assert (NULL != input); 1477 GNUNET_assert (NULL != input);
955 GNUNET_assert (NULL != input->h); 1478 GNUNET_assert (NULL != input->h);
956 1479
@@ -959,11 +1482,11 @@ run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task)
959 because we want something valid in there, even 1482 because we want something valid in there, even
960 if the other peer doesn't talk to us */ 1483 if the other peer doesn't talk to us */
961 1484
962 if (SET_KIND_NONE != task->output_set.set_kind) 1485 if (SET_KIND_NONE != setop->output_set.set_kind)
963 { 1486 {
964 /* If we don't have an existing output set, 1487 /* If we don't have an existing output set,
965 we clone the input set. */ 1488 we clone the input set. */
966 if (NULL == lookup_set (session, &task->output_set)) 1489 if (NULL == lookup_set (session, &setop->output_set))
967 { 1490 {
968 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1491 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
969 "Output set missing, copying from input set\n"); 1492 "Output set missing, copying from input set\n");
@@ -975,25 +1498,35 @@ run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task)
975 } 1498 }
976 } 1499 }
977 1500
978 if (RFN_KIND_NONE != task->output_rfn.rfn_kind) 1501 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
979 { 1502 {
980 if (NULL == lookup_rfn (session, &task->output_rfn)) 1503 if (NULL == lookup_rfn (session, &setop->output_rfn))
981 { 1504 {
982 struct ReferendumEntry *rfn; 1505 struct ReferendumEntry *rfn;
983 1506
984 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1507 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
985 "P%u: output rfn <%s> missing, creating.\n", 1508 "P%u: output rfn <%s> missing, creating.\n",
986 session->local_peer_idx, 1509 session->local_peer_idx,
987 debug_str_rfn_key (&task->output_rfn)); 1510 debug_str_rfn_key (&setop->output_rfn));
988 1511
989 rfn = GNUNET_new (struct ReferendumEntry); 1512 rfn = rfn_create (session->num_peers);
990 rfn->key = task->output_rfn; 1513 rfn->key = setop->output_rfn;
991 rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
992 rfn->peer_commited = GNUNET_new_array (session->num_peers, int);
993 put_rfn (session, rfn); 1514 put_rfn (session, rfn);
994 } 1515 }
995 } 1516 }
996 1517
1518 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
1519 {
1520 if (NULL == lookup_diff (session, &setop->output_diff))
1521 {
1522 struct DiffEntry *diff;
1523
1524 diff = diff_create ();
1525 diff->key = setop->output_diff;
1526 put_diff (session, diff);
1527 }
1528 }
1529
997 if (task->key.peer1 == session->local_peer_idx) 1530 if (task->key.peer1 == session->local_peer_idx)
998 { 1531 {
999 struct GNUNET_CONSENSUS_RoundContextMessage rcm = { 0 }; 1532 struct GNUNET_CONSENSUS_RoundContextMessage rcm = { 0 };
@@ -1001,7 +1534,7 @@ run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task)
1001 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1534 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1002 "P%u: Looking up set {%s} to run remote union\n", 1535 "P%u: Looking up set {%s} to run remote union\n",
1003 session->local_peer_idx, 1536 session->local_peer_idx,
1004 debug_str_set_key (&task->input_set)); 1537 debug_str_set_key (&setop->input_set));
1005 1538
1006 rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT); 1539 rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
1007 rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage)); 1540 rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
@@ -1012,23 +1545,20 @@ run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task)
1012 rcm.leader = htons (task->key.leader); 1545 rcm.leader = htons (task->key.leader);
1013 rcm.repetition = htons (task->key.repetition); 1546 rcm.repetition = htons (task->key.repetition);
1014 1547
1015 GNUNET_assert (NULL == task->op); 1548 GNUNET_assert (NULL == setop->op);
1016 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n", 1549 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
1017 session->local_peer_idx, task->key.peer2, debug_str_set_key (&task->input_set)); 1550 session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
1018 1551
1019 // XXX: maybe this should be done while 1552 // XXX: maybe this should be done while
1020 // setting up tasks alreays? 1553 // setting up tasks alreays?
1021 task->op = GNUNET_SET_prepare (&session->peers[task->key.peer2], 1554 setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
1022 &session->global_id, 1555 &session->global_id,
1023 &rcm.header, 1556 &rcm.header,
1024 GNUNET_SET_RESULT_ADDED, /* XXX: will be obsolete soon */ 1557 GNUNET_SET_RESULT_SYMMETRIC,
1025 set_result_cb, 1558 set_result_cb,
1026 task); 1559 task);
1027 1560
1028 /* Referendums must be materialized as a set before */ 1561 if (GNUNET_OK != GNUNET_SET_commit (setop->op, input->h))
1029 GNUNET_assert (RFN_KIND_NONE == task->input_rfn.rfn_kind);
1030
1031 if (GNUNET_OK != GNUNET_SET_commit (task->op, input->h))
1032 { 1562 {
1033 GNUNET_break (0); 1563 GNUNET_break (0);
1034 /* XXX: cleanup? */ 1564 /* XXX: cleanup? */
@@ -1041,9 +1571,8 @@ run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task)
1041 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n", 1571 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
1042 session->local_peer_idx, task->key.peer1); 1572 session->local_peer_idx, task->key.peer1);
1043 1573
1044 if (NULL != task->op) 1574 if (NULL != setop->op)
1045 { 1575 {
1046 GNUNET_assert (NULL == task->commited_set);
1047 commit_set (session, task); 1576 commit_set (session, task);
1048 } 1577 }
1049 } 1578 }
@@ -1159,11 +1688,11 @@ eval_rfn_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1159 1688
1160 set = GNUNET_new (struct SetEntry); 1689 set = GNUNET_new (struct SetEntry);
1161 set->h = copy; 1690 set->h = copy;
1162 set->key = task->output_set; 1691 set->key = task->cls.eval_rfn.output_set;
1163 1692
1164 put_set (session, set); 1693 put_set (session, set);
1165 1694
1166 run_task_eval_rfn (session, task); 1695 task_start_eval_rfn (task);
1167} 1696}
1168 1697
1169 1698
@@ -1173,7 +1702,7 @@ eval_rfn_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1173 * set and store the result in the output set and/or output diff. 1702 * set and store the result in the output set and/or output diff.
1174 */ 1703 */
1175static void 1704static void
1176run_task_eval_rfn (struct ConsensusSession *session, struct TaskEntry *task) 1705task_start_eval_rfn (struct TaskEntry *task)
1177{ 1706{
1178 struct GNUNET_CONTAINER_MultiHashMapIterator *iter; 1707 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1179 struct ReferendumEntry *input_rfn; 1708 struct ReferendumEntry *input_rfn;
@@ -1181,24 +1710,23 @@ run_task_eval_rfn (struct ConsensusSession *session, struct TaskEntry *task)
1181 struct SetEntry *output_set = NULL; 1710 struct SetEntry *output_set = NULL;
1182 struct DiffEntry *output_diff = NULL; 1711 struct DiffEntry *output_diff = NULL;
1183 struct SetChangeProgressCls *progress_cls; 1712 struct SetChangeProgressCls *progress_cls;
1713 struct EvalRfnCls *rcls = &task->cls.eval_rfn;
1714 struct ConsensusSession *session = task->step->session;
1184 1715
1185 /* Have at least one output */ 1716 /* Have at least one output */
1186 GNUNET_assert ( (task->output_set.set_kind != SET_KIND_NONE) || 1717 GNUNET_assert ( (rcls->output_set.set_kind != SET_KIND_NONE) ||
1187 (task->output_diff.diff_kind != DIFF_KIND_NONE)); 1718 (rcls->output_diff.diff_kind != DIFF_KIND_NONE));
1188
1189 /* Not allowed as output */
1190 GNUNET_assert ( (task->output_rfn.rfn_kind == RFN_KIND_NONE));
1191 1719
1192 if (SET_KIND_NONE != task->output_set.set_kind) 1720 if (SET_KIND_NONE != rcls->output_set.set_kind)
1193 { 1721 {
1194 /* We have a set output, thus the output set must 1722 /* We have a set output, thus the output set must
1195 exist or copy it from the input set */ 1723 exist or copy it from the input set */
1196 output_set = lookup_set (session, &task->output_set); 1724 output_set = lookup_set (session, &rcls->output_set);
1197 if (NULL == output_set) 1725 if (NULL == output_set)
1198 { 1726 {
1199 struct SetEntry *input_set; 1727 struct SetEntry *input_set;
1200 1728
1201 input_set = lookup_set (session, &task->input_set); 1729 input_set = lookup_set (session, &rcls->input_set);
1202 GNUNET_assert (NULL != input_set); 1730 GNUNET_assert (NULL != input_set);
1203 GNUNET_SET_copy_lazy (input_set->h, 1731 GNUNET_SET_copy_lazy (input_set->h,
1204 eval_rfn_copy_cb, 1732 eval_rfn_copy_cb,
@@ -1209,21 +1737,26 @@ run_task_eval_rfn (struct ConsensusSession *session, struct TaskEntry *task)
1209 } 1737 }
1210 } 1738 }
1211 1739
1212 if (DIFF_KIND_NONE != task->output_diff.diff_kind) 1740 if (DIFF_KIND_NONE != rcls->output_diff.diff_kind)
1213 { 1741 {
1214 output_diff = lookup_diff (session, &task->output_diff); 1742 output_diff = lookup_diff (session, &rcls->output_diff);
1215 if (NULL == output_diff) 1743 if (NULL == output_diff)
1216 { 1744 {
1217 output_diff = GNUNET_new (struct DiffEntry); 1745 output_diff = diff_create ();
1218 output_diff->key = task->output_diff; 1746 output_diff->key = rcls->output_diff;
1219 output_diff->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1220 put_diff (session, output_diff); 1747 put_diff (session, output_diff);
1221 } 1748 }
1222 } 1749 }
1223 1750
1751 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1752 "Evaluating referendum in Task {%s}\n",
1753 debug_str_task_key (&task->key));
1754
1755
1224 progress_cls = GNUNET_new (struct SetChangeProgressCls); 1756 progress_cls = GNUNET_new (struct SetChangeProgressCls);
1757 progress_cls->task = task;
1225 1758
1226 input_rfn = lookup_rfn (session, &task->input_rfn); 1759 input_rfn = lookup_rfn (session, &rcls->input_rfn);
1227 1760
1228 GNUNET_assert (NULL != input_rfn); 1761 GNUNET_assert (NULL != input_rfn);
1229 1762
@@ -1232,18 +1765,28 @@ run_task_eval_rfn (struct ConsensusSession *session, struct TaskEntry *task)
1232 1765
1233 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri)) 1766 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
1234 { 1767 {
1235 int majority_vote = rfn_majority (session->num_peers, input_rfn, ri, task->threshold); 1768 int majority_vote = rfn_majority (session->num_peers, input_rfn, ri, rcls->threshold);
1236 switch (majority_vote) 1769 switch (majority_vote)
1237 { 1770 {
1238 case VOTE_ADD: 1771 case VOTE_ADD:
1772#ifdef GNUNET_EXTRA_LOGGING
1773 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1774 "P%u: referendum vote result: VOTE_ADD for element %s in task {%s} with"
1775 "output set {%s} and output diff {%s}\n",
1776 session->local_peer_idx,
1777 debug_str_element (ri->element),
1778 debug_str_task_key (&task->key),
1779 debug_str_set_key (&rcls->output_set),
1780 debug_str_diff_key (&rcls->output_diff));
1781#endif
1239 if (NULL != output_set) 1782 if (NULL != output_set)
1240 { 1783 {
1241 progress_cls->num_pending++; 1784 progress_cls->num_pending++;
1242 GNUNET_assert (GNUNET_OK == 1785 GNUNET_assert (GNUNET_OK ==
1243 GNUNET_SET_add_element (output_set->h, 1786 GNUNET_SET_add_element (output_set->h,
1244 ri->element, 1787 ri->element,
1245 eval_rfn_progress, 1788 eval_rfn_progress,
1246 progress_cls)); 1789 progress_cls));
1247 } 1790 }
1248 if (NULL != output_diff) 1791 if (NULL != output_diff)
1249 { 1792 {
@@ -1253,16 +1796,37 @@ run_task_eval_rfn (struct ConsensusSession *session, struct TaskEntry *task)
1253 case VOTE_CONTESTED: 1796 case VOTE_CONTESTED:
1254 if (NULL != output_set) 1797 if (NULL != output_set)
1255 output_set->is_contested = GNUNET_YES; 1798 output_set->is_contested = GNUNET_YES;
1799#ifdef GNUNET_EXTRA_LOGGING
1800 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1801 "P%u: referendum vote result: VOTE_CONTESTED for element %s in task {%s} with"
1802 "output set {%s} and output diff {%s}\n",
1803 session->local_peer_idx,
1804 debug_str_element (ri->element),
1805 debug_str_task_key (&task->key),
1806 debug_str_set_key (&rcls->output_set),
1807 debug_str_diff_key (&rcls->output_diff));
1808#endif
1256 /* fallthrough */ 1809 /* fallthrough */
1257 case VOTE_REMOVE: 1810 case VOTE_REMOVE:
1811#ifdef GNUNET_EXTRA_LOGGING
1812 if (VOTE_REMOVE == majority_vote)
1813 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1814 "P%u: referendum vote result: VOTE_REMOVE for element %s in task {%s} with"
1815 "output set {%s} and output diff {%s}\n",
1816 session->local_peer_idx,
1817 debug_str_element (ri->element),
1818 debug_str_task_key (&task->key),
1819 debug_str_set_key (&rcls->output_set),
1820 debug_str_diff_key (&rcls->output_diff));
1821#endif
1258 if (NULL != output_set) 1822 if (NULL != output_set)
1259 { 1823 {
1260 progress_cls->num_pending++; 1824 progress_cls->num_pending++;
1261 GNUNET_assert (GNUNET_OK == 1825 GNUNET_assert (GNUNET_OK ==
1262 GNUNET_SET_remove_element (output_set->h, 1826 GNUNET_SET_remove_element (output_set->h,
1263 ri->element, 1827 ri->element,
1264 eval_rfn_progress, 1828 eval_rfn_progress,
1265 progress_cls)); 1829 progress_cls));
1266 } 1830 }
1267 if (NULL != output_diff) 1831 if (NULL != output_diff)
1268 { 1832 {
@@ -1270,6 +1834,8 @@ run_task_eval_rfn (struct ConsensusSession *session, struct TaskEntry *task)
1270 } 1834 }
1271 break; 1835 break;
1272 case VOTE_NONE: 1836 case VOTE_NONE:
1837 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1838 "referendum vote result: VOTE_NONE\n");
1273 /* Nothing to do. */ 1839 /* Nothing to do. */
1274 break; 1840 break;
1275 default: 1841 default:
@@ -1294,14 +1860,15 @@ apply_diff_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1294 struct TaskEntry *task = (struct TaskEntry *) cls; 1860 struct TaskEntry *task = (struct TaskEntry *) cls;
1295 struct ConsensusSession *session = task->step->session; 1861 struct ConsensusSession *session = task->step->session;
1296 struct SetEntry *set; 1862 struct SetEntry *set;
1863 struct ApplyDiffCls *diffop = &task->cls.apply_diff;
1297 1864
1298 set = GNUNET_new (struct SetEntry); 1865 set = GNUNET_new (struct SetEntry);
1299 set->h = copy; 1866 set->h = copy;
1300 set->key = task->output_set; 1867 set->key = diffop->output_set;
1301 1868
1302 put_set (session, set); 1869 put_set (session, set);
1303 1870
1304 run_task_apply_diff (session, task); 1871 task_start_apply_diff (task);
1305} 1872}
1306 1873
1307 1874
@@ -1334,28 +1901,30 @@ apply_diff_progress (void *cls)
1334 1901
1335 1902
1336static void 1903static void
1337run_task_apply_diff (struct ConsensusSession *session, struct TaskEntry *task) 1904task_start_apply_diff (struct TaskEntry *task)
1338{ 1905{
1339 struct SetEntry *output_set; 1906 struct SetEntry *output_set;
1340 struct DiffEntry *input_diff; 1907 struct DiffEntry *input_diff;
1341 struct GNUNET_CONTAINER_MultiHashMapIterator *iter; 1908 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1342 struct DiffElementInfo *di; 1909 struct DiffElementInfo *di;
1343 struct SetChangeProgressCls *progress_cls; 1910 struct SetChangeProgressCls *progress_cls;
1911 struct ApplyDiffCls *diffop = &task->cls.apply_diff;
1912 struct ConsensusSession *session = task->step->session;
1344 1913
1345 GNUNET_assert (task->output_set.set_kind != SET_KIND_NONE); 1914 GNUNET_assert (diffop->output_set.set_kind != SET_KIND_NONE);
1346 GNUNET_assert (task->input_diff.diff_kind != DIFF_KIND_NONE); 1915 GNUNET_assert (diffop->input_diff.diff_kind != DIFF_KIND_NONE);
1347 1916
1348 input_diff = lookup_diff (session, &task->input_diff); 1917 input_diff = lookup_diff (session, &diffop->input_diff);
1349 1918
1350 GNUNET_assert (NULL != input_diff); 1919 GNUNET_assert (NULL != input_diff);
1351 1920
1352 output_set = lookup_set (session, &task->output_set); 1921 output_set = lookup_set (session, &diffop->output_set);
1353 1922
1354 if (NULL == output_set) 1923 if (NULL == output_set)
1355 { 1924 {
1356 struct SetEntry *input_set; 1925 struct SetEntry *input_set;
1357 1926
1358 input_set = lookup_set (session, &task->input_set); 1927 input_set = lookup_set (session, &diffop->input_set);
1359 GNUNET_assert (NULL != input_set); 1928 GNUNET_assert (NULL != input_set);
1360 GNUNET_SET_copy_lazy (input_set->h, 1929 GNUNET_SET_copy_lazy (input_set->h,
1361 apply_diff_copy_cb, 1930 apply_diff_copy_cb,
@@ -1403,11 +1972,12 @@ run_task_apply_diff (struct ConsensusSession *session, struct TaskEntry *task)
1403 1972
1404 1973
1405static void 1974static void
1406run_task_finish (struct ConsensusSession *session, struct TaskEntry *task) 1975task_start_finish (struct TaskEntry *task)
1407{ 1976{
1408 struct SetEntry *final_set; 1977 struct SetEntry *final_set;
1978 struct ConsensusSession *session = task->step->session;
1409 1979
1410 final_set = lookup_set (session, &task->input_set); 1980 final_set = lookup_set (session, &task->cls.finish.input_set);
1411 1981
1412 GNUNET_assert (NULL != final_set); 1982 GNUNET_assert (NULL != final_set);
1413 1983
@@ -1418,37 +1988,17 @@ run_task_finish (struct ConsensusSession *session, struct TaskEntry *task)
1418} 1988}
1419 1989
1420static void 1990static void
1421run_task (struct ConsensusSession *session, struct TaskEntry *task) 1991start_task (struct ConsensusSession *session, struct TaskEntry *task)
1422{ 1992{
1423 GNUNET_assert (GNUNET_NO == task->is_running); 1993 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
1994
1995 GNUNET_assert (GNUNET_NO == task->is_started);
1424 GNUNET_assert (GNUNET_NO == task->is_finished); 1996 GNUNET_assert (GNUNET_NO == task->is_finished);
1997 GNUNET_assert (NULL != task->start);
1425 1998
1426 1999 task->start (task);
1427 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
1428 2000
1429 switch (task->action) 2001 task->is_started = GNUNET_YES;
1430 {
1431 case ACTION_RECONCILE:
1432 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_RECONCILE task\n", session->local_peer_idx);
1433 run_task_remote_union (session, task);
1434 break;
1435 case ACTION_EVAL_RFN:
1436 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_EVAL_RFN task\n", session->local_peer_idx);
1437 run_task_eval_rfn (session, task);
1438 break;
1439 case ACTION_APPLY_DIFF:
1440 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_APPLY_DIFF task\n", session->local_peer_idx);
1441 run_task_apply_diff (session, task);
1442 break;
1443 case ACTION_FINISH:
1444 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_FINISH task\n", session->local_peer_idx);
1445 run_task_finish (session, task);
1446 break;
1447 default:
1448 /* not reached */
1449 GNUNET_assert (0);
1450 }
1451 task->is_running = GNUNET_YES;
1452} 2002}
1453 2003
1454 2004
@@ -1515,7 +2065,7 @@ run_ready_steps (struct ConsensusSession *session)
1515 2065
1516 step->is_running = GNUNET_YES; 2066 step->is_running = GNUNET_YES;
1517 for (i = 0; i < step->tasks_len; i++) 2067 for (i = 0; i < step->tasks_len; i++)
1518 run_task (session, step->tasks[i]); 2068 start_task (session, step->tasks[i]);
1519 2069
1520 /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */ 2070 /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
1521 if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished)) 2071 if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
@@ -1730,12 +2280,6 @@ set_listen_cb (void *cls,
1730 return; 2280 return;
1731 } 2281 }
1732 2282
1733 if (ACTION_RECONCILE != task->action)
1734 {
1735 GNUNET_break_op (0);
1736 return;
1737 }
1738
1739 if (GNUNET_YES == task->is_finished) 2283 if (GNUNET_YES == task->is_finished)
1740 { 2284 {
1741 GNUNET_break_op (0); 2285 GNUNET_break_op (0);
@@ -1754,15 +2298,15 @@ set_listen_cb (void *cls,
1754 else 2298 else
1755 my_result_cb = set_result_cb; 2299 my_result_cb = set_result_cb;
1756 2300
1757 task->op = GNUNET_SET_accept (request, 2301 task->cls.setop.op = GNUNET_SET_accept (request,
1758 GNUNET_SET_RESULT_ADDED, /* XXX: obsolete soon */ 2302 GNUNET_SET_RESULT_SYMMETRIC,
1759 my_result_cb, 2303 my_result_cb,
1760 task); 2304 task);
1761 2305
1762 /* If the task hasn't been started yet, 2306 /* If the task hasn't been started yet,
1763 we wait for that until we commit. */ 2307 we wait for that until we commit. */
1764 2308
1765 if (GNUNET_YES == task->is_running) 2309 if (GNUNET_YES == task->is_started)
1766 { 2310 {
1767 commit_set (session, task); 2311 commit_set (session, task);
1768 } 2312 }
@@ -1969,11 +2513,11 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
1969 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: GC LEADER(1): %d %d %d %d\n", session->local_peer_idx, p1, p2, rep, lead); 2513 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: GC LEADER(1): %d %d %d %d\n", session->local_peer_idx, p1, p2, rep, lead);
1970 task = ((struct TaskEntry) { 2514 task = ((struct TaskEntry) {
1971 .step = step, 2515 .step = step,
1972 .action = ACTION_RECONCILE, 2516 .start = task_start_reconcile,
2517 .cancel = task_cancel_reconcile,
1973 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me }, 2518 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
1974 .input_set = (struct SetKey) { SET_KIND_CURRENT, rep },
1975 .output_set = (struct SetKey) { SET_KIND_NONE },
1976 }); 2519 });
2520 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
1977 put_task (session->taskmap, &task); 2521 put_task (session->taskmap, &task);
1978 } 2522 }
1979 /* We run this task to make sure that the leader 2523 /* We run this task to make sure that the leader
@@ -1982,12 +2526,13 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
1982 without the code having to handle any special cases. */ 2526 without the code having to handle any special cases. */
1983 task = ((struct TaskEntry) { 2527 task = ((struct TaskEntry) {
1984 .step = step, 2528 .step = step,
1985 .action = ACTION_RECONCILE,
1986 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me }, 2529 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
1987 .input_set = (struct SetKey) { SET_KIND_CURRENT, rep }, 2530 .start = task_start_reconcile,
1988 .output_set = (struct SetKey) { SET_KIND_LEADER, rep, me }, 2531 .cancel = task_cancel_reconcile,
1989 .output_diff = (struct DiffKey) { DIFF_KIND_LEADER, rep, me },
1990 }); 2532 });
2533 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2534 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2535 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
1991 put_task (session->taskmap, &task); 2536 put_task (session->taskmap, &task);
1992 } 2537 }
1993 else 2538 else
@@ -1998,12 +2543,13 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
1998 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: GC LEADER(2): %d %d %d %d\n", session->local_peer_idx, p1, p2, rep, lead); 2543 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: GC LEADER(2): %d %d %d %d\n", session->local_peer_idx, p1, p2, rep, lead);
1999 task = ((struct TaskEntry) { 2544 task = ((struct TaskEntry) {
2000 .step = step, 2545 .step = step,
2001 .action = ACTION_RECONCILE,
2002 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead}, 2546 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead},
2003 .input_set = (struct SetKey) { SET_KIND_CURRENT, rep }, 2547 .start = task_start_reconcile,
2004 .output_set = (struct SetKey) { SET_KIND_LEADER, rep, lead }, 2548 .cancel = task_cancel_reconcile,
2005 .output_diff = (struct DiffKey) { DIFF_KIND_LEADER, rep, lead },
2006 }); 2549 });
2550 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2551 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2552 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2007 put_task (session->taskmap, &task); 2553 put_task (session->taskmap, &task);
2008 } 2554 }
2009 2555
@@ -2022,11 +2568,12 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
2022 arrange_peers (&p1, &p2, n); 2568 arrange_peers (&p1, &p2, n);
2023 task = ((struct TaskEntry) { 2569 task = ((struct TaskEntry) {
2024 .step = step, 2570 .step = step,
2025 .action = ACTION_RECONCILE,
2026 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead }, 2571 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2027 .input_set = (struct SetKey) { SET_KIND_LEADER, rep, lead }, 2572 .start = task_start_reconcile,
2028 .output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead }, 2573 .cancel = task_cancel_reconcile,
2029 }); 2574 });
2575 task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2576 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2030 put_task (session->taskmap, &task); 2577 put_task (session->taskmap, &task);
2031 } 2578 }
2032 2579
@@ -2041,12 +2588,12 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
2041 task = ((struct TaskEntry) { 2588 task = ((struct TaskEntry) {
2042 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead }, 2589 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2043 .step = step, 2590 .step = step,
2044 .action = ACTION_EVAL_RFN, 2591 .start = task_start_eval_rfn
2045 .input_set = (struct SetKey) { SET_KIND_LEADER, rep, lead },
2046 .input_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead },
2047 .output_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead },
2048 .threshold = n - t,
2049 }); 2592 });
2593 task.cls.eval_rfn.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead },
2594 task.cls.eval_rfn.input_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead },
2595 task.cls.eval_rfn.output_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead },
2596 task.cls.eval_rfn.threshold = n - t,
2050 put_task (session->taskmap, &task); 2597 put_task (session->taskmap, &task);
2051 2598
2052 prev_step = step; 2599 prev_step = step;
@@ -2064,11 +2611,12 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
2064 arrange_peers (&p1, &p2, n); 2611 arrange_peers (&p1, &p2, n);
2065 task = ((struct TaskEntry) { 2612 task = ((struct TaskEntry) {
2066 .step = step, 2613 .step = step,
2067 .action = ACTION_RECONCILE, 2614 .start = task_start_reconcile,
2615 .cancel = task_cancel_reconcile,
2068 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead}, 2616 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2069 .input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead },
2070 .output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead },
2071 }); 2617 });
2618 task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2619 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2072 put_task (session->taskmap, &task); 2620 put_task (session->taskmap, &task);
2073 } 2621 }
2074 2622
@@ -2081,13 +2629,32 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
2081 2629
2082 // evaluate ConfirmationReferendum and 2630 // evaluate ConfirmationReferendum and
2083 // apply it to the LeaderReferendum 2631 // apply it to the LeaderReferendum
2632 // XXX: the diff should contain grading information
2084 task = ((struct TaskEntry) { 2633 task = ((struct TaskEntry) {
2085 .step = step, 2634 .step = step,
2086 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead }, 2635 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2087 .action = ACTION_EVAL_RFN, 2636 .start = task_start_eval_rfn,
2088 .input_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead },
2089 .output_diff = (struct DiffKey) { DIFF_KIND_GRADECAST_RESULT, rep },
2090 }); 2637 });
2638 task.cls.eval_rfn.input_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2639 task.cls.eval_rfn.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_CONSENSUS, rep, lead };
2640 put_task (session->taskmap, &task);
2641
2642
2643 prev_step = step;
2644 step = create_step (session, round, 1);
2645#ifdef GNUNET_EXTRA_LOGGING
2646 GNUNET_asprintf (&step->debug_name, "gc apply, lead %u rep %u", lead, rep);
2647#endif
2648 step_depend_on (step, prev_step);
2649
2650 task = ((struct TaskEntry) {
2651 .step = step,
2652 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_APPLY_RESULT, -1, -1, rep, lead },
2653 .start = task_start_leader_apply,
2654 });
2655 task.cls.leader_apply.input_diff_1 = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2656 task.cls.leader_apply.input_diff_2 = (struct DiffKey) { DIFF_KIND_LEADER_CONSENSUS, rep, lead };
2657 task.cls.leader_apply.output_rfn = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, rep };
2091 put_task (session->taskmap, &task); 2658 put_task (session->taskmap, &task);
2092 2659
2093 step_depend_on (step_after, step); 2660 step_depend_on (step_after, step);
@@ -2142,10 +2709,12 @@ construct_task_graph (struct ConsensusSession *session)
2142 task = ((struct TaskEntry) { 2709 task = ((struct TaskEntry) {
2143 .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 }, 2710 .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2144 .step = step, 2711 .step = step,
2145 .action = ACTION_RECONCILE, 2712 .start = task_start_reconcile,
2146 .input_set = (struct SetKey) { SET_KIND_CURRENT, 0 }, 2713 .cancel = task_cancel_reconcile,
2147 .output_set = (struct SetKey) { SET_KIND_CURRENT, 0 },
2148 }); 2714 });
2715 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2716 task.cls.setop.output_set = task.cls.setop.input_set;
2717 task.cls.setop.do_not_remove = GNUNET_YES;
2149 put_task (session->taskmap, &task); 2718 put_task (session->taskmap, &task);
2150 } 2719 }
2151 2720
@@ -2164,32 +2733,30 @@ construct_task_graph (struct ConsensusSession *session)
2164 2733
2165 step_rep_start = create_step (session, round, 1); 2734 step_rep_start = create_step (session, round, 1);
2166#ifdef GNUNET_EXTRA_LOGGING 2735#ifdef GNUNET_EXTRA_LOGGING
2167 GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i); 2736 GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
2168#endif 2737#endif
2169 2738
2170 step_depend_on (step_rep_start, prev_step); 2739 step_depend_on (step_rep_start, prev_step);
2171 2740
2172 step_rep_end = create_step (session, round, 1); 2741 step_rep_end = create_step (session, round, 1);
2173#ifdef GNUNET_EXTRA_LOGGING 2742#ifdef GNUNET_EXTRA_LOGGING
2174 GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i); 2743 GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
2175#endif 2744#endif
2176 2745
2177 /* parallel gradecasts */ 2746 /* parallel gradecasts */
2178 for (lead = 0; lead < n; lead++) 2747 for (lead = 0; lead < n; lead++)
2179 construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end); 2748 construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
2180 2749
2181 // TODO: add peers to ignore list, 2750 // TODO: add peers to ignore list, either here or
2182 // 2751 // already in the gradecast.
2183 // evaluate ConfirmationReferendum and
2184 // apply it to the LeaderReferendum
2185 task = ((struct TaskEntry) { 2752 task = ((struct TaskEntry) {
2186 .step = step_rep_end, 2753 .step = step_rep_end,
2187 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_APPLY_RESULT, -1, -1, i, -1}, 2754 .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
2188 .action = ACTION_APPLY_DIFF, 2755 .start = task_start_eval_rfn,
2189 .input_set = (struct SetKey) { SET_KIND_CURRENT, i },
2190 .input_diff = (struct DiffKey) { DIFF_KIND_GRADECAST_RESULT, i },
2191 .output_set = (struct SetKey) { SET_KIND_CURRENT, i + 1 },
2192 }); 2756 });
2757 task.cls.eval_rfn.input_set = (struct SetKey) { SET_KIND_CURRENT, i };
2758 task.cls.eval_rfn.input_rfn = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, i };
2759 task.cls.eval_rfn.output_set = (struct SetKey) { SET_KIND_CURRENT, i + 1 };
2193 put_task (session->taskmap, &task); 2760 put_task (session->taskmap, &task);
2194 2761
2195 prev_step = step_rep_end; 2762 prev_step = step_rep_end;
@@ -2206,9 +2773,9 @@ construct_task_graph (struct ConsensusSession *session)
2206 task = ((struct TaskEntry) { 2773 task = ((struct TaskEntry) {
2207 .step = step, 2774 .step = step,
2208 .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 }, 2775 .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
2209 .input_set = (struct SetKey) { SET_KIND_CURRENT, t + 1 }, 2776 .start = task_start_finish,
2210 .action = ACTION_FINISH,
2211 }); 2777 });
2778 task.cls.finish.input_set = (struct SetKey) { SET_KIND_CURRENT, t + 1 };
2212 2779
2213 put_task (session->taskmap, &task); 2780 put_task (session->taskmap, &task);
2214} 2781}
@@ -2399,10 +2966,21 @@ client_insert (void *cls,
2399 } 2966 }
2400 session->num_client_insert_pending++; 2967 session->num_client_insert_pending++;
2401 GNUNET_SET_add_element (initial_set, element, client_insert_done, session); 2968 GNUNET_SET_add_element (initial_set, element, client_insert_done, session);
2969
2970#ifdef GNUNET_EXTRA_LOGGING
2971 {
2972 struct GNUNET_HashCode hash;
2973
2974 GNUNET_SET_element_hash (element, &hash);
2975
2976 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element %s added\n",
2977 session->local_peer_idx,
2978 GNUNET_h2s (&hash));
2979 }
2980#endif
2981
2402 GNUNET_free (element); 2982 GNUNET_free (element);
2403 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2983 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2404
2405 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element added\n", session->local_peer_idx);
2406} 2984}
2407 2985
2408 2986