aboutsummaryrefslogtreecommitdiff
path: root/src/consensus/gnunet-service-consensus.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/consensus/gnunet-service-consensus.c')
-rw-r--r--src/consensus/gnunet-service-consensus.c73
1 files changed, 43 insertions, 30 deletions
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
index 4036d2b11..9d5d35c94 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -26,6 +26,7 @@
26 26
27#include "platform.h" 27#include "platform.h"
28#include "gnunet_util_lib.h" 28#include "gnunet_util_lib.h"
29#include "gnunet_block_lib.h"
29#include "gnunet_protocols.h" 30#include "gnunet_protocols.h"
30#include "gnunet_applications.h" 31#include "gnunet_applications.h"
31#include "gnunet_set_service.h" 32#include "gnunet_set_service.h"
@@ -34,8 +35,6 @@
34#include "consensus_protocol.h" 35#include "consensus_protocol.h"
35#include "consensus.h" 36#include "consensus.h"
36 37
37#define ELEMENT_TYPE_CONTESTED_MARKER (GNUNET_CONSENSUS_ELEMENT_TYPE_USER_MAX + 1)
38
39 38
40enum ReferendumVote 39enum ReferendumVote
41{ 40{
@@ -65,11 +64,6 @@ enum EarlyStoppingPhase
65 64
66GNUNET_NETWORK_STRUCT_BEGIN 65GNUNET_NETWORK_STRUCT_BEGIN
67 66
68
69struct ContestedPayload
70{
71};
72
73/** 67/**
74 * Tuple of integers that together 68 * Tuple of integers that together
75 * identify a task uniquely. 69 * identify a task uniquely.
@@ -669,16 +663,22 @@ send_to_client_iter (void *cls,
669 if (NULL != element) 663 if (NULL != element)
670 { 664 {
671 struct GNUNET_CONSENSUS_ElementMessage *m; 665 struct GNUNET_CONSENSUS_ElementMessage *m;
666 const struct ConsensusElement *ce;
667
668 GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
669 ce = element->data;
670
671 GNUNET_assert (GNUNET_NO == ce->is_contested_marker);
672 672
673 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 673 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
674 "P%d: sending element %s to client\n", 674 "P%d: sending element %s to client\n",
675 session->local_peer_idx, 675 session->local_peer_idx,
676 debug_str_element (element)); 676 debug_str_element (element));
677 677
678 ev = GNUNET_MQ_msg_extra (m, element->size, 678 ev = GNUNET_MQ_msg_extra (m, element->size - sizeof (struct ConsensusElement),
679 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT); 679 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
680 m->element_type = htons (element->element_type); 680 m->element_type = ce->payload_type;
681 GNUNET_memcpy (&m[1], element->data, element->size); 681 GNUNET_memcpy (&m[1], &ce[1], element->size - sizeof (struct ConsensusElement));
682 GNUNET_MQ_send (session->client_mq, ev); 682 GNUNET_MQ_send (session->client_mq, ev);
683 } 683 }
684 else 684 else
@@ -878,6 +878,13 @@ set_result_cb (void *cls,
878 struct ReferendumEntry *output_rfn = NULL; 878 struct ReferendumEntry *output_rfn = NULL;
879 unsigned int other_idx; 879 unsigned int other_idx;
880 struct SetOpCls *setop; 880 struct SetOpCls *setop;
881 const struct ConsensusElement *consensus_element = NULL;
882
883 if (NULL != element)
884 {
885 GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
886 consensus_element = element->data;
887 }
881 888
882 setop = &task->cls.setop; 889 setop = &task->cls.setop;
883 890
@@ -932,7 +939,8 @@ set_result_cb (void *cls,
932 939
933 if ( (GNUNET_SET_STATUS_ADD_LOCAL == status) || (GNUNET_SET_STATUS_ADD_REMOTE == status) ) 940 if ( (GNUNET_SET_STATUS_ADD_LOCAL == status) || (GNUNET_SET_STATUS_ADD_REMOTE == status) )
934 { 941 {
935 if ( (GNUNET_YES == setop->transceive_contested) && (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type) ) 942 if ( (GNUNET_YES == setop->transceive_contested) &&
943 (GNUNET_YES == consensus_element->is_contested_marker) )
936 { 944 {
937 GNUNET_assert (NULL != output_rfn); 945 GNUNET_assert (NULL != output_rfn);
938 rfn_contest (output_rfn, task_other_peer (task)); 946 rfn_contest (output_rfn, task_other_peer (task));
@@ -943,6 +951,7 @@ set_result_cb (void *cls,
943 switch (status) 951 switch (status)
944 { 952 {
945 case GNUNET_SET_STATUS_ADD_LOCAL: 953 case GNUNET_SET_STATUS_ADD_LOCAL:
954 GNUNET_assert (NULL != consensus_element);
946 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 955 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
947 "Adding element in Task {%s}\n", 956 "Adding element in Task {%s}\n",
948 debug_str_task_key (&task->key)); 957 debug_str_task_key (&task->key));
@@ -989,9 +998,10 @@ set_result_cb (void *cls,
989 // XXX: add result to structures in task 998 // XXX: add result to structures in task
990 break; 999 break;
991 case GNUNET_SET_STATUS_ADD_REMOTE: 1000 case GNUNET_SET_STATUS_ADD_REMOTE:
1001 GNUNET_assert (NULL != consensus_element);
992 if (GNUNET_YES == setop->do_not_remove) 1002 if (GNUNET_YES == setop->do_not_remove)
993 break; 1003 break;
994 if (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type) 1004 if (GNUNET_YES == consensus_element->is_contested_marker)
995 break; 1005 break;
996 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1006 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
997 "Removing element in Task {%s}\n", 1007 "Removing element in Task {%s}\n",
@@ -1318,10 +1328,11 @@ commit_set (struct ConsensusSession *session,
1318 if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) ) 1328 if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
1319 { 1329 {
1320 struct GNUNET_SET_Element element; 1330 struct GNUNET_SET_Element element;
1321 struct ContestedPayload payload; 1331 struct ConsensusElement ce = { 0 };
1322 element.data = &payload; 1332 ce.is_contested_marker = GNUNET_YES;
1323 element.size = sizeof (struct ContestedPayload); 1333 element.data = &ce;
1324 element.element_type = ELEMENT_TYPE_CONTESTED_MARKER; 1334 element.size = sizeof (struct ConsensusElement);
1335 element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1325 GNUNET_SET_add_element (set->h, &element, NULL, NULL); 1336 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1326 } 1337 }
1327 if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)]) 1338 if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
@@ -3041,9 +3052,9 @@ handle_client_insert (void *cls,
3041 const struct GNUNET_CONSENSUS_ElementMessage *msg) 3052 const struct GNUNET_CONSENSUS_ElementMessage *msg)
3042{ 3053{
3043 struct ConsensusSession *session = cls; 3054 struct ConsensusSession *session = cls;
3044 struct GNUNET_SET_Element *element;
3045 ssize_t element_size; 3055 ssize_t element_size;
3046 struct GNUNET_SET_Handle *initial_set; 3056 struct GNUNET_SET_Handle *initial_set;
3057 struct ConsensusElement *ce;
3047 3058
3048 if (GNUNET_YES == session->conclude_started) 3059 if (GNUNET_YES == session->conclude_started)
3049 { 3060 {
@@ -3051,12 +3062,18 @@ handle_client_insert (void *cls,
3051 GNUNET_SERVICE_client_drop (session->client); 3062 GNUNET_SERVICE_client_drop (session->client);
3052 return; 3063 return;
3053 } 3064 }
3065
3054 element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage); 3066 element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
3055 element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size); 3067 ce = GNUNET_malloc (sizeof (struct ConsensusElement) + element_size);
3056 element->element_type = msg->element_type; 3068 GNUNET_memcpy (&ce[1], &msg[1], element_size);
3057 element->size = element_size; 3069 ce->payload_type = msg->element_type;
3058 GNUNET_memcpy (&element[1], &msg[1], element_size); 3070
3059 element->data = &element[1]; 3071 struct GNUNET_SET_Element element = {
3072 .element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT,
3073 .size = sizeof (struct ConsensusElement) + element_size,
3074 .data = ce,
3075 };
3076
3060 { 3077 {
3061 struct SetKey key = { SET_KIND_CURRENT, 0, 0 }; 3078 struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3062 struct SetEntry *entry; 3079 struct SetEntry *entry;
@@ -3066,26 +3083,22 @@ handle_client_insert (void *cls,
3066 GNUNET_assert (NULL != entry); 3083 GNUNET_assert (NULL != entry);
3067 initial_set = entry->h; 3084 initial_set = entry->h;
3068 } 3085 }
3086
3069 session->num_client_insert_pending++; 3087 session->num_client_insert_pending++;
3070 GNUNET_SET_add_element (initial_set, 3088 GNUNET_SET_add_element (initial_set,
3071 element, 3089 &element,
3072 &client_insert_done, 3090 &client_insert_done,
3073 session); 3091 session);
3074 3092
3075#ifdef GNUNET_EXTRA_LOGGING 3093#ifdef GNUNET_EXTRA_LOGGING
3076 { 3094 {
3077 struct GNUNET_HashCode hash;
3078
3079 GNUNET_SET_element_hash (element,
3080 &hash);
3081
3082 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 3095 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3083 "P%u: element %s added\n", 3096 "P%u: element %s added\n",
3084 session->local_peer_idx, 3097 session->local_peer_idx,
3085 GNUNET_h2s (&hash)); 3098 debug_str_element (&element));
3086 } 3099 }
3087#endif 3100#endif
3088 GNUNET_free (element); 3101 GNUNET_free (ce);
3089 GNUNET_SERVICE_client_continue (session->client); 3102 GNUNET_SERVICE_client_continue (session->client);
3090} 3103}
3091 3104