aboutsummaryrefslogtreecommitdiff
path: root/src/consensus
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-01-17 00:53:11 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-01-17 00:53:11 +0000
commit21273cba1880b1081b4152ee45b2f4ad6768e639 (patch)
tree325c93f75c67456b6729e581b9b69cc0bc52df48 /src/consensus
parentbdee53dd2cb760e9acd601e251ba59c42c98c02f (diff)
downloadgnunet-21273cba1880b1081b4152ee45b2f4ad6768e639.tar.gz
gnunet-21273cba1880b1081b4152ee45b2f4ad6768e639.zip
- gnunet-consensus now profiling tool
- work on service implementation, not working yet
Diffstat (limited to 'src/consensus')
-rw-r--r--src/consensus/Makefile.am5
-rw-r--r--src/consensus/consensus.h4
-rw-r--r--src/consensus/consensus_api.c50
-rw-r--r--src/consensus/consensus_protocol.h71
-rw-r--r--src/consensus/gnunet-consensus-start-peers.c3
-rw-r--r--src/consensus/gnunet-consensus.c360
-rw-r--r--src/consensus/gnunet-service-consensus.c1264
-rw-r--r--src/consensus/ibf.c87
-rw-r--r--src/consensus/ibf.h51
-rw-r--r--src/consensus/test_consensus.conf2
-rw-r--r--src/consensus/test_consensus_api.c53
11 files changed, 1383 insertions, 567 deletions
diff --git a/src/consensus/Makefile.am b/src/consensus/Makefile.am
index 1beaa0c62..f5a5c5cdc 100644
--- a/src/consensus/Makefile.am
+++ b/src/consensus/Makefile.am
@@ -31,6 +31,7 @@ gnunet_consensus_SOURCES = \
31gnunet_consensus_LDADD = \ 31gnunet_consensus_LDADD = \
32 $(top_builddir)/src/util/libgnunetutil.la \ 32 $(top_builddir)/src/util/libgnunetutil.la \
33 $(top_builddir)/src/consensus/libgnunetconsensus.la \ 33 $(top_builddir)/src/consensus/libgnunetconsensus.la \
34 $(top_builddir)/src/testbed/libgnunettestbed.la \
34 $(GN_LIBINTL) 35 $(GN_LIBINTL)
35gnunet_consensus_DEPENDENCIES = \ 36gnunet_consensus_DEPENDENCIES = \
36 libgnunetconsensus.la 37 libgnunetconsensus.la
@@ -53,10 +54,12 @@ gnunet_consensus_ibf_LDADD = \
53 $(GN_LIBINTL) 54 $(GN_LIBINTL)
54 55
55gnunet_service_consensus_SOURCES = \ 56gnunet_service_consensus_SOURCES = \
56 gnunet-service-consensus.c 57 gnunet-service-consensus.c \
58 ibf.c
57gnunet_service_consensus_LDADD = \ 59gnunet_service_consensus_LDADD = \
58 $(top_builddir)/src/util/libgnunetutil.la \ 60 $(top_builddir)/src/util/libgnunetutil.la \
59 $(top_builddir)/src/core/libgnunetcore.la \ 61 $(top_builddir)/src/core/libgnunetcore.la \
62 $(top_builddir)/src/stream/libgnunetstream.la \
60 $(top_builddir)/src/mesh/libgnunetmesh.la \ 63 $(top_builddir)/src/mesh/libgnunetmesh.la \
61 $(GN_LIBINTL) 64 $(GN_LIBINTL)
62 65
diff --git a/src/consensus/consensus.h b/src/consensus/consensus.h
index 75b90b0f9..8436364b6 100644
--- a/src/consensus/consensus.h
+++ b/src/consensus/consensus.h
@@ -71,6 +71,10 @@ struct GNUNET_CONSENSUS_ConcludeDoneMessage
71 */ 71 */
72 struct GNUNET_MessageHeader header; 72 struct GNUNET_MessageHeader header;
73 73
74 uint32_t group_id;
75
76 uint32_t num_elements;
77
74 uint16_t num_peers; 78 uint16_t num_peers;
75 79
76 /** PeerIdentity[num_peers] */ 80 /** PeerIdentity[num_peers] */
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c
index 25c76b358..5c0494254 100644
--- a/src/consensus/consensus_api.c
+++ b/src/consensus/consensus_api.c
@@ -54,11 +54,6 @@ struct QueuedMessage
54 struct GNUNET_MessageHeader *msg; 54 struct GNUNET_MessageHeader *msg;
55 55
56 /** 56 /**
57 * Size of the message in msg.
58 */
59 size_t size;
60
61 /**
62 * Will be called after transmit, if not NULL 57 * Will be called after transmit, if not NULL
63 */ 58 */
64 GNUNET_CONSENSUS_InsertDoneCallback idc; 59 GNUNET_CONSENSUS_InsertDoneCallback idc;
@@ -154,7 +149,7 @@ struct GNUNET_CONSENSUS_Handle
154 * @param consensus consensus handle 149 * @param consensus consensus handle
155 */ 150 */
156static void 151static void
157schedule_transmit (struct GNUNET_CONSENSUS_Handle *consensus); 152send_next (struct GNUNET_CONSENSUS_Handle *consensus);
158 153
159 154
160/** 155/**
@@ -168,16 +163,17 @@ schedule_transmit (struct GNUNET_CONSENSUS_Handle *consensus);
168 * @param buf where the callee should write the message 163 * @param buf where the callee should write the message
169 * @return number of bytes written to buf 164 * @return number of bytes written to buf
170 */ 165 */
171static size_t transmit_queued (void *cls, size_t size, 166static size_t
172 void *buf) 167transmit_queued (void *cls, size_t size,
168 void *buf)
173{ 169{
174 struct GNUNET_CONSENSUS_Handle *consensus; 170 struct GNUNET_CONSENSUS_Handle *consensus;
175 struct QueuedMessage *qmsg; 171 struct QueuedMessage *qmsg;
176 size_t ret_size; 172 size_t msg_size;
177
178 printf("transmitting queued\n");
179 173
180 consensus = (struct GNUNET_CONSENSUS_Handle *) cls; 174 consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
175 consensus->th = NULL;
176
181 qmsg = consensus->messages_head; 177 qmsg = consensus->messages_head;
182 GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg); 178 GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg);
183 GNUNET_assert (qmsg); 179 GNUNET_assert (qmsg);
@@ -188,10 +184,14 @@ static size_t transmit_queued (void *cls, size_t size,
188 { 184 {
189 qmsg->idc (qmsg->idc_cls, GNUNET_YES); 185 qmsg->idc (qmsg->idc_cls, GNUNET_YES);
190 } 186 }
187 return 0;
191 } 188 }
192 189
193 memcpy (buf, qmsg->msg, qmsg->size); 190 msg_size = ntohs (qmsg->msg->size);
194 ret_size = qmsg->size; 191
192 GNUNET_assert (size >= msg_size);
193
194 memcpy (buf, qmsg->msg, msg_size);
195 if (NULL != qmsg->idc) 195 if (NULL != qmsg->idc)
196 { 196 {
197 qmsg->idc (qmsg->idc_cls, GNUNET_YES); 197 qmsg->idc (qmsg->idc_cls, GNUNET_YES);
@@ -199,9 +199,9 @@ static size_t transmit_queued (void *cls, size_t size,
199 GNUNET_free (qmsg->msg); 199 GNUNET_free (qmsg->msg);
200 GNUNET_free (qmsg); 200 GNUNET_free (qmsg);
201 201
202 schedule_transmit (consensus); 202 send_next (consensus);
203 203
204 return ret_size; 204 return msg_size;
205} 205}
206 206
207 207
@@ -211,7 +211,7 @@ static size_t transmit_queued (void *cls, size_t size,
211 * @param consensus consensus handle 211 * @param consensus consensus handle
212 */ 212 */
213static void 213static void
214schedule_transmit (struct GNUNET_CONSENSUS_Handle *consensus) 214send_next (struct GNUNET_CONSENSUS_Handle *consensus)
215{ 215{
216 if (NULL != consensus->th) 216 if (NULL != consensus->th)
217 return; 217 return;
@@ -219,9 +219,10 @@ schedule_transmit (struct GNUNET_CONSENSUS_Handle *consensus)
219 if (NULL != consensus->messages_head) 219 if (NULL != consensus->messages_head)
220 { 220 {
221 LOG (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n"); 221 LOG (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n");
222 GNUNET_CLIENT_notify_transmit_ready (consensus->client, consensus->messages_head->size, 222 consensus->th =
223 GNUNET_TIME_UNIT_FOREVER_REL, 223 GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs (consensus->messages_head->msg->size),
224 GNUNET_NO, &transmit_queued, consensus); 224 GNUNET_TIME_UNIT_FOREVER_REL,
225 GNUNET_NO, &transmit_queued, consensus);
225 } 226 }
226} 227}
227 228
@@ -270,8 +271,7 @@ handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus,
270 struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg) 271 struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg)
271{ 272{
272 GNUNET_assert (NULL != consensus->conclude_cb); 273 GNUNET_assert (NULL != consensus->conclude_cb);
273 consensus->conclude_cb (consensus->conclude_cls, 274 consensus->conclude_cb (consensus->conclude_cls, NULL);
274 0, NULL);
275 consensus->conclude_cb = NULL; 275 consensus->conclude_cb = NULL;
276} 276}
277 277
@@ -356,7 +356,7 @@ transmit_join (void *cls, size_t size, void *buf)
356 consensus->peers, 356 consensus->peers,
357 consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); 357 consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
358 358
359 schedule_transmit (consensus); 359 send_next (consensus);
360 360
361 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, 361 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
362 GNUNET_TIME_UNIT_FOREVER_REL); 362 GNUNET_TIME_UNIT_FOREVER_REL);
@@ -454,13 +454,12 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
454 454
455 qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); 455 qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
456 qmsg->msg = (struct GNUNET_MessageHeader *) element_msg; 456 qmsg->msg = (struct GNUNET_MessageHeader *) element_msg;
457 qmsg->size = element_msg_size;
458 qmsg->idc = idc; 457 qmsg->idc = idc;
459 qmsg->idc_cls = idc_cls; 458 qmsg->idc_cls = idc_cls;
460 459
461 GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg); 460 GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
462 461
463 schedule_transmit (consensus); 462 send_next (consensus);
464} 463}
465 464
466 465
@@ -500,11 +499,10 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
500 499
501 qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); 500 qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
502 qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg; 501 qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg;
503 qmsg->size = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
504 502
505 GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg); 503 GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
506 504
507 schedule_transmit (consensus); 505 send_next (consensus);
508} 506}
509 507
510 508
diff --git a/src/consensus/consensus_protocol.h b/src/consensus/consensus_protocol.h
new file mode 100644
index 000000000..105708ee9
--- /dev/null
+++ b/src/consensus/consensus_protocol.h
@@ -0,0 +1,71 @@
1/*
2 This file is part of GNUnet
3 (C) 2012 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21
22/**
23 * @file consensus/consensus_protocol.h
24 * @brief p2p message definitions for consensus
25 * @author Florian Dold
26 */
27
28#ifndef GNUNET_CONSENSUS_PROTOCOL_H
29#define GNUNET_CONSENSUS_PROTOCOL_H
30
31#include "platform.h"
32#include "gnunet_common.h"
33#include "gnunet_protocols.h"
34
35
36GNUNET_NETWORK_STRUCT_BEGIN
37
38struct StrataMessage
39{
40 struct GNUNET_MessageHeader header;
41 /**
42 * Number of strata in this estimator.
43 */
44 uint16_t num_strata;
45 /* struct GNUNET_HashCode hash_buckets[ibf_size*num_strata] */
46 /* struct GNUNET_HashCode id_buckets[ibf_size*num_strata] */
47 /* uint8_t count_buckets[ibf_size*num_strata] */
48};
49
50struct DifferenceDigest
51{
52
53 struct GNUNET_MessageHeader header;
54};
55
56struct Element
57{
58 struct GNUNET_MessageHeader header;
59};
60
61struct ConsensusHello
62{
63 struct GNUNET_MessageHeader header;
64 struct GNUNET_HashCode global_id;
65 uint8_t round;
66};
67
68
69GNUNET_NETWORK_STRUCT_END
70
71#endif
diff --git a/src/consensus/gnunet-consensus-start-peers.c b/src/consensus/gnunet-consensus-start-peers.c
index 8b516393f..fb7f047ae 100644
--- a/src/consensus/gnunet-consensus-start-peers.c
+++ b/src/consensus/gnunet-consensus-start-peers.c
@@ -147,9 +147,6 @@ run (void *cls, char *const *args, const char *cfgfile,
147 NULL, 147 NULL,
148 test_master, 148 test_master,
149 NULL); 149 NULL);
150
151
152 printf("hello there!\n");
153} 150}
154 151
155 152
diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c
index cd267f5ec..c8a5593f1 100644
--- a/src/consensus/gnunet-consensus.c
+++ b/src/consensus/gnunet-consensus.c
@@ -20,210 +20,283 @@
20 20
21/** 21/**
22 * @file consensus/gnunet-consensus.c 22 * @file consensus/gnunet-consensus.c
23 * @brief 23 * @brief profiling tool for gnunet-consensus
24 * @author Florian Dold 24 * @author Florian Dold
25 */ 25 */
26#include "platform.h" 26#include "platform.h"
27#include "gnunet_common.h"
27#include "gnunet_util_lib.h" 28#include "gnunet_util_lib.h"
28#include "gnunet_consensus_service.h" 29#include "gnunet_consensus_service.h"
30#include "gnunet_testbed_service.h"
29 31
32static unsigned int num_peers = 2;
30 33
34static unsigned int replication = 1;
31 35
32/** 36static unsigned int num_values = 5;
33 * Handle to the consensus service
34 */
35static struct GNUNET_CONSENSUS_Handle *consensus;
36/**
37 * Session id
38 */
39static char *session_id_str;
40 37
41/** 38static struct GNUNET_TIME_Relative conclude_timeout;
42 * File handle to STDIN
43 */
44static struct GNUNET_DISK_FileHandle *stdin_fh;
45 39
46/** 40static struct GNUNET_CONSENSUS_Handle **consensus_handles;
47 * Task for reading from stdin 41
48 */ 42static unsigned int num_connected_handles;
49static GNUNET_SCHEDULER_TaskIdentifier stdin_tid = GNUNET_SCHEDULER_NO_TASK; 43
44static struct GNUNET_TESTBED_Peer **peers;
45
46static struct GNUNET_PeerIdentity *peer_ids;
50 47
48static unsigned int num_retrieved_peer_ids;
51 49
50static struct GNUNET_HashCode session_id;
51
52
53/**
54 * Signature of the event handler function called by the
55 * respective event controller.
56 *
57 * @param cls closure
58 * @param event information about the event
59 */
52static void 60static void
53stdin_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); 61controller_cb(void *cls,
62 const struct GNUNET_TESTBED_EventInformation *event)
63{
64 GNUNET_assert (0);
65}
54 66
55 67
56/** 68/**
57 * Called when a conclusion was successful. 69 * Called when a conclusion was successful.
58 * 70 *
59 * @param cls 71 * @param cls
60 * @param num_peers_in_consensus 72 * @param group
61 * @param peers_in_consensus 73 * @return GNUNET_YES if more consensus groups should be offered, GNUNET_NO if not
62 */ 74 */
75static int
76conclude_cb (void *cls, const struct GNUNET_CONSENSUS_Group *group)
77{
78 return GNUNET_NO;
79}
80
81
82
63static void 83static void
64conclude_cb (void *cls, 84generate_indices (int *indices)
65 unsigned int consensus_group_count,
66 const struct GNUNET_CONSENSUS_Group *groups)
67{ 85{
68 printf("reached conclusion\n"); 86 int j;
69 GNUNET_SCHEDULER_shutdown (); 87 j = 0;
88 while (j < replication)
89 {
90 int n;
91 int k;
92 int repeat;
93 n = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, num_peers);
94 repeat = GNUNET_NO;
95 for (k = 0; k < j; k++)
96 if (indices[k] == n)
97 {
98 repeat = GNUNET_YES;
99 break;
100 }
101 if (GNUNET_NO == repeat)
102 indices[j++] = n;
103 }
70} 104}
71 105
72 106
73static void 107static void
74insert_done_cb (void *cls, 108do_consensus ()
75 int success)
76{ 109{
77 struct GNUNET_CONSENSUS_Element *element = cls; 110 int unique_indices[replication];
111 int i;
78 112
79 GNUNET_free (element); 113 for (i = 0; i < num_values; i++)
80 if (GNUNET_YES != success)
81 { 114 {
82 printf ("insert failed\n"); 115 int j;
83 GNUNET_SCHEDULER_shutdown (); 116 struct GNUNET_HashCode *val;
84 return; 117 struct GNUNET_CONSENSUS_Element *element;
118 generate_indices(unique_indices);
119
120 val = GNUNET_malloc (sizeof *val);
121 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, val);
122
123 element = GNUNET_malloc (sizeof *element);
124 element->data = val;
125 element->size = sizeof *val;
126
127 for (j = 0; j < replication; j++)
128 {
129 int cid;
130 cid = unique_indices[j];
131 GNUNET_CONSENSUS_insert (consensus_handles[cid], element, NULL, NULL);
132 }
85 } 133 }
86 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == stdin_tid); 134
87 stdin_tid = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL, stdin_fh, 135 for (i = 0; i < num_peers; i++)
88 &stdin_cb, NULL); 136 GNUNET_CONSENSUS_conclude (consensus_handles[i], conclude_timeout, 0, conclude_cb, consensus_handles[i]);
89} 137}
90 138
91 139
92/** 140/**
93 * Called whenever we can read stdin non-blocking 141 * Callback to be called when a service connect operation is completed
94 * 142 *
95 * @param cls unused 143 * @param cls the callback closure from functions generating an operation
96 * @param tc scheduler context 144 * @param op the operation that has been finished
145 * @param ca_result the service handle returned from GNUNET_TESTBED_ConnectAdapter()
146 * @param emsg error message in case the operation has failed; will be NULL if
147 * operation has executed successfully.
97 */ 148 */
98static void 149static void
99stdin_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 150connect_complete (void *cls,
151 struct GNUNET_TESTBED_Operation *op,
152 void *ca_result,
153 const char *emsg)
100{ 154{
101 char buf[1024]; 155 struct GNUNET_CONSENSUS_Handle **chp;
102 char *ret; 156
103 struct GNUNET_CONSENSUS_Element *element; 157 if (NULL != emsg)
104
105 stdin_tid = GNUNET_SCHEDULER_NO_TASK;
106 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
107 return; /* we're done here */
108 ret = fgets (buf, 1024, stdin);
109 if (NULL == ret)
110 { 158 {
111 if (feof (stdin)) 159 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "testbed connect emsg: %s\n", emsg);
112 { 160 GNUNET_assert (0);
113 printf ("concluding ...\n");
114 GNUNET_CONSENSUS_conclude (consensus, GNUNET_TIME_UNIT_FOREVER_REL, 0, conclude_cb, NULL);
115 }
116 return;
117 } 161 }
118 162
119 printf("read: %s", buf); 163 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "connect complete\n");
164
165 chp = (struct GNUNET_CONSENSUS_Handle **) cls;
166 *chp = (struct GNUNET_CONSENSUS_Handle *) ca_result;
167 num_connected_handles++;
120 168
121 element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + strlen(buf) + 1); 169 if (num_connected_handles == num_peers)
122 element->type = 0; 170 {
123 element->size = strlen(buf) + 1; 171 do_consensus ();
124 element->data = &element[1]; 172 }
125 strcpy ((char *) &element[1], buf); 173}
126 GNUNET_CONSENSUS_insert (consensus, element, &insert_done_cb, element); 174
175
176static int
177new_element_cb (void *cls,
178 struct GNUNET_CONSENSUS_Element *element)
179{
180 return GNUNET_YES;
127} 181}
128 182
129 183
130/** 184/**
131 * Called when a new element was received from another peer, or an error occured. 185 * Adapter function called to establish a connection to
132 * 186 * a service.
133 * May deliver duplicate values.
134 *
135 * Elements given to a consensus operation by the local peer are NOT given
136 * to this callback.
137 * 187 *
138 * @param cls closure 188 * @param cls closure
139 * @param element new element, NULL on error 189 * @param cfg configuration of the peer to connect to; will be available until
140 * @return GNUNET_OK if the valid is well-formed and should be added to the consensus, 190 * GNUNET_TESTBED_operation_done() is called on the operation returned
141 * GNUNET_SYSERR if the element should be ignored and not be propagated 191 * from GNUNET_TESTBED_service_connect()
192 * @return service handle to return in 'op_result', NULL on error
142 */ 193 */
143static int 194static void *
144cb (void *cls, 195connect_adapter (void *cls,
145 struct GNUNET_CONSENSUS_Element *element) 196 const struct GNUNET_CONFIGURATION_Handle *cfg)
146{ 197{
147 if (NULL == element) 198 struct GNUNET_CONSENSUS_Handle *consensus;
148 { 199 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "connect adapter, %d peers\n", num_peers);
149 printf("error receiving from consensus\n"); 200 consensus = GNUNET_CONSENSUS_create (cfg, num_peers, peer_ids, &session_id, new_element_cb, NULL);
150 GNUNET_SCHEDULER_shutdown (); 201 GNUNET_assert (NULL != consensus);
151 return GNUNET_NO; 202 return consensus;
152 }
153 printf("got element\n");
154 return GNUNET_YES;
155} 203}
156 204
157 205
158/** 206/**
159 * Function run on shutdown to clean up. 207 * Adapter function called to destroy a connection to
208 * a service.
160 * 209 *
161 * @param cls the statistics handle 210 * @param cls closure
162 * @param tc scheduler context 211 * @param op_result service handle returned from the connect adapter
163 */ 212 */
164static void 213static void
165shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 214disconnect_adapter(void *cls, void *op_result)
166{ 215{
167 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "shutting down\n"); 216 /* FIXME: what to do here? */
168 if (NULL != consensus)
169 {
170 GNUNET_CONSENSUS_destroy (consensus);
171 consensus = NULL;
172 }
173} 217}
174 218
175 219
220/**
221 * Callback to be called when the requested peer information is available
222 *
223 * @param cb_cls the closure from GNUNET_TETSBED_peer_get_information()
224 * @param op the operation this callback corresponds to
225 * @param pinfo the result; will be NULL if the operation has failed
226 * @param emsg error message if the operation has failed; will be NULL if the
227 * operation is successfull
228 */
176static void 229static void
177run (void *cls, char *const *args, const char *cfgfile, 230peer_info_cb (void *cb_cls,
178 const struct GNUNET_CONFIGURATION_Handle *cfg) 231 struct GNUNET_TESTBED_Operation *op,
232 const struct GNUNET_TESTBED_PeerInformation *pinfo,
233 const char *emsg)
179{ 234{
180 struct GNUNET_HashCode sid; 235 struct GNUNET_PeerIdentity *p;
181 struct GNUNET_PeerIdentity *pids;
182 int count;
183 int i; 236 int i;
184 237
185 if (NULL == session_id_str) 238 GNUNET_assert (NULL == emsg);
186 {
187 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "no session id given (missing -s/--session-id)\n");
188 return;
189 }
190 239
191 GNUNET_CRYPTO_hash (session_id_str, strlen (session_id_str), &sid); 240 p = (struct GNUNET_PeerIdentity *) cb_cls;
192 241
193 for (count = 0; NULL != args[count]; count++); 242 if (pinfo->pit == GNUNET_TESTBED_PIT_IDENTITY)
194 243 {
195 if (0 != count) 244 *p = *pinfo->result.id;
196 { 245 num_retrieved_peer_ids++;
197 pids = GNUNET_malloc (count * sizeof (struct GNUNET_PeerIdentity)); 246 if (num_retrieved_peer_ids == num_peers)
247 for (i = 0; i < num_peers; i++)
248 GNUNET_TESTBED_service_connect (NULL, peers[i], "consensus", connect_complete, &consensus_handles[i],
249 connect_adapter, disconnect_adapter, NULL);
198 } 250 }
199 else 251 else
200 { 252 {
201 pids = NULL; 253 GNUNET_assert (0);
202 } 254 }
255}
203 256
204 for (i = 0; i < count; i++)
205 {
206 int ret;
207 ret = GNUNET_CRYPTO_hash_from_string (args[i], &pids[i].hashPubKey);
208 if (GNUNET_OK != ret)
209 {
210 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "peer identity '%s' is malformed\n", args[i]);
211 return;
212 }
213 }
214 257
215 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, 258static void
216 &shutdown_task, NULL); 259test_master (void *cls,
217 260 unsigned int num_peers,
218 consensus = 261 struct GNUNET_TESTBED_Peer **started_peers)
219 GNUNET_CONSENSUS_create (cfg, 262{
220 count, pids, 263 int i;
221 &sid, 264
222 &cb, NULL); 265
223 266 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test master\n");
224 stdin_fh = GNUNET_DISK_get_handle_from_native (stdin); 267
225 stdin_tid = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL, stdin_fh, 268 peers = started_peers;
226 &stdin_cb, NULL); 269
270 peer_ids = GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity));
271
272 consensus_handles = GNUNET_malloc (num_peers * sizeof (struct ConsensusHandle *));
273
274 for (i = 0; i < num_peers; i++)
275 GNUNET_TESTBED_peer_get_information (peers[i],
276 GNUNET_TESTBED_PIT_IDENTITY,
277 peer_info_cb,
278 &peer_ids[i]);
279}
280
281static void
282run (void *cls, char *const *args, const char *cfgfile,
283 const struct GNUNET_CONFIGURATION_Handle *cfg)
284{
285 static char *session_str = "gnunet-consensus/test";
286
287
288 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "running gnunet-consensus\n");
289
290 GNUNET_CRYPTO_hash (session_str, strlen(session_str), &session_id);
291
292 (void) GNUNET_TESTBED_test_run ("gnunet-consensus",
293 cfgfile,
294 num_peers,
295 0,
296 controller_cb,
297 NULL,
298 test_master,
299 NULL);
227} 300}
228 301
229 302
@@ -231,13 +304,24 @@ int
231main (int argc, char **argv) 304main (int argc, char **argv)
232{ 305{
233 static const struct GNUNET_GETOPT_CommandLineOption options[] = { 306 static const struct GNUNET_GETOPT_CommandLineOption options[] = {
234 { 's', "session-id", "ID", 307 { 'n', "num-peers", NULL,
235 gettext_noop ("session identifier"), 308 gettext_noop ("number of peers in consensus"),
236 GNUNET_YES, &GNUNET_GETOPT_set_string, &session_id_str }, 309 GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_peers },
237 GNUNET_GETOPT_OPTION_END 310 { 'k', "value-replication", NULL,
238 }; 311 gettext_noop ("how many peers receive one value?"),
239 GNUNET_PROGRAM_run (argc, argv, "gnunet-consensus", 312 GNUNET_YES, &GNUNET_GETOPT_set_uint, &replication },
313 { 'x', "num-values", NULL,
314 gettext_noop ("number of values"),
315 GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_values },
316 { 't', "timeout", NULL,
317 gettext_noop ("consensus timeout"),
318 GNUNET_YES, &GNUNET_GETOPT_set_relative_time, &conclude_timeout },
319 GNUNET_GETOPT_OPTION_END
320 };
321 conclude_timeout = GNUNET_TIME_UNIT_SECONDS;
322 GNUNET_PROGRAM_run2 (argc, argv, "gnunet-consensus",
240 "help", 323 "help",
241 options, &run, NULL); 324 options, &run, NULL, GNUNET_YES);
242 return 0; 325 return 0;
243} 326}
327
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
index 1b394db19..ad0266954 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -32,15 +32,45 @@
32#include "gnunet_util_lib.h" 32#include "gnunet_util_lib.h"
33#include "gnunet_consensus_service.h" 33#include "gnunet_consensus_service.h"
34#include "gnunet_core_service.h" 34#include "gnunet_core_service.h"
35#include "gnunet_mesh_service.h" 35#include "gnunet_stream_lib.h"
36#include "consensus_protocol.h"
37#include "ibf.h"
36#include "consensus.h" 38#include "consensus.h"
37 39
38 40
41/**
42 * Number of IBFs in a strata estimator.
43 */
44#define STRATA_COUNT 32
45/**
46 * Number of buckets per IBF.
47 */
48#define STRATA_IBF_BUCKETS 80
49/**
50 * hash num parameter of the IBF
51 */
52#define STRATA_HASH_NUM 3
53/**
54 * Number of strata that can be transmitted in one message.
55 */
56#define STRATA_PER_MESSAGE ((1<<15) / (IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS))
57
58
59
60/* forward declarations */
61
39struct ConsensusSession; 62struct ConsensusSession;
63struct IncomingSocket;
40 64
41static void 65static void
42send_next (struct ConsensusSession *session); 66send_next (struct ConsensusSession *session);
43 67
68static void
69write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size);
70
71static int
72get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session);
73
44 74
45/** 75/**
46 * An element that is waiting to be transmitted to a client. 76 * An element that is waiting to be transmitted to a client.
@@ -63,22 +93,75 @@ struct PendingElement
63 struct GNUNET_CONSENSUS_Element *element; 93 struct GNUNET_CONSENSUS_Element *element;
64}; 94};
65 95
66 96struct ConsensusPeerInformation
67/*
68 * A peer that is also in a consensus session.
69 * Note that 'this' peer is not in the list.
70 */
71struct ConsensusPeer
72{ 97{
73 struct GNUNET_PeerIdentity *peer_id; 98 struct GNUNET_STREAM_Socket *socket;
99
100 /**
101 * Is socket's connection established, i.e. can we write to it?
102 * Only relevent on outgoing cpi.
103 */
104 int is_connected;
105
106 /**
107 * Type of the peer in the all-to-all rounds,
108 * GNUNET_YES if we initiate reconciliation.
109 */
110 int is_outgoing;
111
112 /**
113 * Did we receive/send a consensus hello?
114 */
115 int hello;
116
117 /**
118 * Handle for currently active read
119 */
120 struct GNUNET_STREAM_ReadHandle *rh;
121
122 /**
123 * Handle for currently active read
124 */
125 struct GNUNET_STREAM_WriteHandle *wh;
126
127 /**
128 * How many of the strate in the ibf were
129 * sent or received in this round?
130 */
131 int strata_counter;
132
133 struct InvertibleBloomFilter *my_ibf;
134
135 int my_ibf_bucket_counter;
136
137 struct InvertibleBloomFilter *peer_ibf;
138
139 int peer_ibf_bucket_counter;
74 140
75 /** 141 /**
76 * Incoming tunnel from the peer. 142 * Strata estimator of the peer, NULL if our peer
143 * initiated the reconciliation.
77 */ 144 */
78 struct GNUNET_MESH_Tunnel *incoming_tunnel; 145 struct InvertibleBloomFilter **strata;
79 146
80 struct InvertibleBloomFilter *last_ibf; 147 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
148
149 struct ConsensusSession *session;
150};
151
152struct QueuedMessage
153{
154 struct GNUNET_MessageHeader *msg;
155
156 /**
157 * Queued messages are stored in a doubly linked list.
158 */
159 struct QueuedMessage *next;
81 160
161 /**
162 * Queued messages are stored in a doubly linked list.
163 */
164 struct QueuedMessage *prev;
82}; 165};
83 166
84 167
@@ -98,15 +181,17 @@ struct ConsensusSession
98 struct ConsensusSession *prev; 181 struct ConsensusSession *prev;
99 182
100 /** 183 /**
101 * Local consensus identification, chosen by clients. 184 * Join message. Used to initialize the session later,
185 * if the identity of the local peer is not yet known.
186 * NULL if the session has been fully initialized.
102 */ 187 */
103 struct GNUNET_HashCode *local_id; 188 struct GNUNET_CONSENSUS_JoinMessage *join_msg;
104 189
105 /** 190 /**
106 * Global consensus identification, computed 191 * Global consensus identification, computed
107 * from the local id and participating authorities. 192 * from the local id and participating authorities.
108 */ 193 */
109 struct GNUNET_HashCode *global_id; 194 struct GNUNET_HashCode global_id;
110 195
111 /** 196 /**
112 * Local client in this consensus session. 197 * Local client in this consensus session.
@@ -140,6 +225,10 @@ struct ConsensusSession
140 */ 225 */
141 struct PendingElement *approval_pending_tail; 226 struct PendingElement *approval_pending_tail;
142 227
228 struct QueuedMessage *client_messages_head;
229
230 struct QueuedMessage *client_messages_tail;
231
143 /** 232 /**
144 * Currently active transmit handle for sending to the client 233 * Currently active transmit handle for sending to the client
145 */ 234 */
@@ -152,11 +241,6 @@ struct ConsensusSession
152 int conclude_requested; 241 int conclude_requested;
153 242
154 /** 243 /**
155 * Client has been informed about the conclusion.
156 */
157 int conclude_sent;
158
159 /**
160 * Minimum number of peers to form a consensus group 244 * Minimum number of peers to form a consensus group
161 */ 245 */
162 int conclude_group_min; 246 int conclude_group_min;
@@ -178,30 +262,74 @@ struct ConsensusSession
178 */ 262 */
179 unsigned int num_peers; 263 unsigned int num_peers;
180 264
181 /** 265 struct ConsensusPeerInformation *info;
182 * Other peers in the consensus, array of ConsensusPeer
183 */
184 struct ConsensusPeer *peers;
185 266
186 /** 267 /**
187 * Tunnel for broadcasting to all other authorities 268 * Sorted array of peer identities in this consensus session,
269 * includes the local peer.
188 */ 270 */
189 struct GNUNET_MESH_Tunnel *broadcast_tunnel; 271 struct GNUNET_PeerIdentity *peers;
190 272
191 /** 273 /**
192 * Time limit for one round of pairwise exchange. 274 * Index of the local peer in the peers array
193 * FIXME: should not actually be a constant
194 */ 275 */
195 struct GNUNET_TIME_Relative round_time; 276 int local_peer_idx;
196 277
197 /** 278 /**
198 * Task identifier for the round timeout task 279 * Task identifier for the round timeout task
199 */ 280 */
200 GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid; 281 GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
282
283 struct InvertibleBloomFilter **strata;
201}; 284};
202 285
203 286
204/** 287/**
288 * Sockets from other peers who want to communicate with us.
289 * It may not be known yet which consensus session they belong to.
290 */
291struct IncomingSocket
292{
293 /**
294 * Incoming sockets are kept in a double linked list.
295 */
296 struct IncomingSocket *next;
297
298 /**
299 * Incoming sockets are kept in a double linked list.
300 */
301 struct IncomingSocket *prev;
302
303 /**
304 * The actual socket.
305 */
306 struct GNUNET_STREAM_Socket *socket;
307
308 /**
309 * Handle for currently active read
310 */
311 struct GNUNET_STREAM_ReadHandle *rh;
312
313 /**
314 * Peer that connected to us with the socket.
315 */
316 struct GNUNET_PeerIdentity *peer;
317
318 /**
319 * Message stream tokenizer for this socket.
320 */
321 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
322
323 /**
324 * Peer-in-session this socket belongs to, once known, otherwise NULL.
325 */
326 struct ConsensusPeerInformation *cpi;
327};
328
329static struct IncomingSocket *incoming_sockets_head;
330static struct IncomingSocket *incoming_sockets_tail;
331
332/**
205 * Linked list of sesstions this peer participates in. 333 * Linked list of sesstions this peer participates in.
206 */ 334 */
207static struct ConsensusSession *sessions_head; 335static struct ConsensusSession *sessions_head;
@@ -222,32 +350,349 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg;
222static struct GNUNET_SERVER_Handle *srv; 350static struct GNUNET_SERVER_Handle *srv;
223 351
224/** 352/**
225 * Peer that runs this service 353 * Peer that runs this service.
226 */ 354 */
227static struct GNUNET_PeerIdentity *my_peer; 355static struct GNUNET_PeerIdentity *my_peer;
228 356
229/** 357/**
230 * Handle to the mesh service. 358 * Handle to the core service. Only used during service startup, will be NULL after that.
231 */ 359 */
232static struct GNUNET_MESH_Handle *mesh; 360static struct GNUNET_CORE_Handle *core;
233 361
234/** 362/**
235 * Handle to the core service. Only used during service startup, will be NULL after that. 363 * Listener for sockets from peers that want to reconcile with us.
236 */ 364 */
237static struct GNUNET_CORE_Handle *core; 365static struct GNUNET_STREAM_ListenSocket *listener;
238 366
367
368static int
369estimate_difference (struct InvertibleBloomFilter** strata1,
370 struct InvertibleBloomFilter** strata2)
371{
372 int i;
373 int count;
374 count = 0;
375 for (i = STRATA_COUNT - 1; i >= 0; i--)
376 {
377 struct InvertibleBloomFilter *diff;
378 int ibf_count;
379 int more;
380 ibf_count = 0;
381 diff = ibf_dup (strata1[i]);
382 ibf_subtract (diff, strata2[i]);
383 for (;;)
384 {
385 more = ibf_decode (diff, NULL, NULL);
386 if (GNUNET_NO == more)
387 {
388 count += ibf_count;
389 break;
390 }
391 if (GNUNET_SYSERR == more)
392 {
393 return count * (1 << (i + 1));
394 }
395 ibf_count++;
396 }
397 ibf_destroy (diff);
398 }
399 return count;
400}
401
402
403/**
404 * Functions of this signature are called whenever data is available from the
405 * stream.
406 *
407 * @param cls the closure from GNUNET_STREAM_read
408 * @param status the status of the stream at the time this function is called
409 * @param data traffic from the other side
410 * @param size the number of bytes available in data read; will be 0 on timeout
411 * @return number of bytes of processed from 'data' (any data remaining should be
412 * given to the next time the read processor is called).
413 */
414static size_t
415stream_data_processor (void *cls,
416 enum GNUNET_STREAM_Status status,
417 const void *data,
418 size_t size)
419{
420 struct IncomingSocket *incoming;
421 int ret;
422
423 GNUNET_assert (GNUNET_STREAM_OK == status);
424
425 incoming = (struct IncomingSocket *) cls;
426
427 ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_NO);
428 if (GNUNET_SYSERR == ret)
429 {
430 /* FIXME: handle this correctly */
431 GNUNET_assert (0);
432 }
433
434 /* read again */
435 incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL,
436 &stream_data_processor, incoming);
437
438 /* we always read all data */
439 return size;
440}
441
442static int
443handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg)
444{
445 int i;
446 int num_strata;
447 struct GNUNET_HashCode *hash_src;
448 uint8_t *count_src;
449
450 GNUNET_assert (GNUNET_NO == cpi->is_outgoing);
451
452 if (NULL == cpi->strata)
453 {
454 cpi->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct InvertibleBloomFilter *));
455 for (i = 0; i < STRATA_COUNT; i++)
456 cpi->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
457 }
458
459 num_strata = ntohs (strata_msg->num_strata);
460
461 /* for correct message alignment, copy bucket types seperately */
462 hash_src = (struct GNUNET_HashCode *) &strata_msg[1];
463
464 for (i = 0; i < num_strata; i++)
465 {
466 memcpy (cpi->strata[cpi->strata_counter+i]->hash_sum, hash_src, STRATA_IBF_BUCKETS * sizeof *hash_src);
467 hash_src += STRATA_IBF_BUCKETS;
468 }
469
470 for (i = 0; i < num_strata; i++)
471 {
472 memcpy (cpi->strata[cpi->strata_counter+i]->id_sum, hash_src, STRATA_IBF_BUCKETS * sizeof *hash_src);
473 hash_src += STRATA_IBF_BUCKETS;
474 }
475
476 count_src = (uint8_t *) hash_src;
477
478 for (i = 0; i < num_strata; i++)
479 {
480 uint8_t zero[STRATA_IBF_BUCKETS];
481 memset (zero, 0, STRATA_IBF_BUCKETS);
482 memcpy (cpi->strata[cpi->strata_counter+i]->count, count_src, STRATA_IBF_BUCKETS);
483 count_src += STRATA_IBF_BUCKETS;
484 }
485
486 GNUNET_assert (count_src == (((uint8_t *) &strata_msg[1]) + STRATA_IBF_BUCKETS * num_strata * IBF_BUCKET_SIZE));
487
488 cpi->strata_counter += num_strata;
489
490 if (STRATA_COUNT == cpi->strata_counter)
491 {
492 int diff;
493 diff = estimate_difference (cpi->session->strata, cpi->strata);
494 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "diff=%d\n", diff);
495 }
496
497 return GNUNET_YES;
498}
499
500
501static int
502handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *strata)
503{
504 return GNUNET_YES;
505}
506
507
508static int
509handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct Element *strata)
510{
511 return GNUNET_YES;
512}
513
514
515static int
516handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello)
517{
518 struct ConsensusSession *session;
519 session = sessions_head;
520 while (NULL != session)
521 {
522 if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id))
523 {
524 int idx;
525 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer helloed session\n");
526 idx = get_peer_idx (inc->peer, session);
527 GNUNET_assert (-1 != idx);
528 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "idx is %d\n", idx);
529 inc->cpi = &session->info[idx];
530 GNUNET_assert (GNUNET_NO == inc->cpi->is_outgoing);
531 inc->cpi->mst = inc->mst;
532 inc->cpi->hello = GNUNET_YES;
533 inc->cpi->socket = inc->socket;
534 return GNUNET_YES;
535 }
536 session = session->next;
537 }
538 GNUNET_assert (0);
539 return GNUNET_NO;
540}
541
542
543/**
544 * Functions with this signature are called whenever a
545 * complete message is received by the tokenizer.
546 *
547 * Do not call GNUNET_SERVER_mst_destroy in callback
548 *
549 * @param cls closure
550 * @param client identification of the client
551 * @param message the actual message
552 *
553 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
554 */
555static int
556mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
557{
558 struct ConsensusPeerInformation *cpi;
559 cpi = (struct ConsensusPeerInformation *) cls;
560 switch (ntohs( message->type))
561 {
562 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE:
563 return handle_p2p_strata (cpi, (struct StrataMessage *) message);
564 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST:
565 return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message);
566 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS:
567 return handle_p2p_element (cpi, (struct Element *) message);
568 default:
569 /* FIXME: handle correctly */
570 GNUNET_assert (0);
571 }
572 return GNUNET_OK;
573}
574
575
576/**
577 * Handle tokenized messages from stream sockets.
578 * Delegate them if the socket belongs to a session,
579 * handle hello messages otherwise.
580 *
581 * Do not call GNUNET_SERVER_mst_destroy in callback
582 *
583 * @param cls closure, unused
584 * @param client incoming socket this message comes from
585 * @param message the actual message
586 *
587 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
588 */
589static int
590mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
591{
592 struct IncomingSocket *inc;
593 inc = (struct IncomingSocket *) client;
594 switch (ntohs( message->type))
595 {
596 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO:
597 return handle_p2p_hello (inc, (struct ConsensusHello *) message);
598 default:
599 if (NULL != inc->cpi)
600 return mst_session_callback (inc->cpi, client, message);
601 /* FIXME: disconnect peer properly */
602 GNUNET_assert (0);
603 }
604 return GNUNET_OK;
605}
606
607
608/**
609 * Functions of this type are called upon new stream connection from other peers
610 * or upon binding error which happen when the app_port given in
611 * GNUNET_STREAM_listen() is already taken.
612 *
613 * @param cls the closure from GNUNET_STREAM_listen
614 * @param socket the socket representing the stream; NULL on binding error
615 * @param initiator the identity of the peer who wants to establish a stream
616 * with us; NULL on binding error
617 * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
618 * stream (the socket will be invalid after the call)
619 */
620static int
621listen_cb (void *cls,
622 struct GNUNET_STREAM_Socket *socket,
623 const struct GNUNET_PeerIdentity *initiator)
624{
625 struct IncomingSocket *incoming;
626
627 GNUNET_assert (NULL != socket);
628
629 incoming = GNUNET_malloc (sizeof *incoming);
630
631 incoming->socket = socket;
632 incoming->peer = GNUNET_memdup (initiator, sizeof *initiator);
633
634 incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
635 &stream_data_processor, incoming);
636
637
638 incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming);
639
640 GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming);
641
642 return GNUNET_OK;
643}
644
645
646static void
647destroy_session (struct ConsensusSession *session)
648{
649 /* FIXME: more stuff to free! */
650 GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
651 GNUNET_SERVER_client_drop (session->client);
652 GNUNET_free (session);
653}
654
655
656/**
657 * Disconnect a client, and destroy all sessions associated with it.
658 *
659 * @param client the client to disconnect
660 */
239static void 661static void
240disconnect_client (struct GNUNET_SERVER_Client *client) 662disconnect_client (struct GNUNET_SERVER_Client *client)
241{ 663{
664 struct ConsensusSession *session;
242 GNUNET_SERVER_client_disconnect (client); 665 GNUNET_SERVER_client_disconnect (client);
243 /* FIXME: free data structures that this client owns */ 666
667 /* if the client owns a session, remove it */
668 session = sessions_head;
669 while (NULL != session)
670 {
671 if (client == session->client)
672 {
673 destroy_session (session);
674 break;
675 }
676 session = session->next;
677 }
244} 678}
245 679
680
681/**
682 * Compute a global, (hopefully) unique consensus session id,
683 * from the local id of the consensus session, and the identities of all participants.
684 * Thus, if the local id of two consensus sessions coincide, but are not comprised of
685 * exactly the same peers, the global id will be different.
686 *
687 * @param local_id local id of the consensus session
688 * @param peers array of all peers participating in the consensus session
689 * @param num_peers number of elements in the peers array
690 * @param dst where the result is stored, may not be NULL
691 */
246static void 692static void
247compute_global_id (struct GNUNET_HashCode *dst, 693compute_global_id (const struct GNUNET_HashCode *local_id,
248 const struct GNUNET_HashCode *local_id, 694 const struct GNUNET_PeerIdentity *peers, int num_peers,
249 const struct GNUNET_PeerIdentity *peers, 695 struct GNUNET_HashCode *dst)
250 int num_peers)
251{ 696{
252 int i; 697 int i;
253 struct GNUNET_HashCode tmp; 698 struct GNUNET_HashCode tmp;
@@ -263,45 +708,50 @@ compute_global_id (struct GNUNET_HashCode *dst,
263} 708}
264 709
265 710
711/**
712 * Function called to notify a client about the connection
713 * begin ready to queue more data. "buf" will be
714 * NULL and "size" zero if the connection was closed for
715 * writing in the meantime.
716 *
717 * @param cls consensus session
718 * @param size number of bytes available in buf
719 * @param buf where the callee should write the message
720 * @return number of bytes written to buf
721 */
266static size_t 722static size_t
267transmit_pending (void *cls, size_t size, void *buf) 723transmit_queued (void *cls, size_t size,
724 void *buf)
268{ 725{
269 struct GNUNET_CONSENSUS_Element *element;
270 struct GNUNET_CONSENSUS_ElementMessage *msg;
271 struct ConsensusSession *session; 726 struct ConsensusSession *session;
727 struct QueuedMessage *qmsg;
728 size_t msg_size;
272 729
273 session = (struct ConsensusSession *) cls; 730 session = (struct ConsensusSession *) cls;
274 msg = (struct GNUNET_CONSENSUS_ElementMessage *) buf;
275 element = session->transmit_pending_head->element;
276
277 GNUNET_assert (NULL != element);
278
279 session->th = NULL; 731 session->th = NULL;
280 732
281 msg->element_type = element->type;
282 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
283 msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ElementMessage) + element->size);
284 memcpy (&msg[1], element->data, element->size);
285 733
286 session->transmit_pending_head = session->transmit_pending_head->next; 734 qmsg = session->client_messages_head;
735 GNUNET_CONTAINER_DLL_remove (session->client_messages_head, session->client_messages_tail, qmsg);
736 GNUNET_assert (qmsg);
287 737
288 send_next (session); 738 if (NULL == buf)
739 {
740 destroy_session (session);
741 return 0;
742 }
289 743
290 return sizeof (struct GNUNET_CONSENSUS_ElementMessage) + element->size; 744 msg_size = ntohs (qmsg->msg->size);
291}
292 745
746 GNUNET_assert (size >= msg_size);
293 747
294static size_t 748 memcpy (buf, qmsg->msg, msg_size);
295transmit_conclude_done (void *cls, size_t size, void *buf) 749 GNUNET_free (qmsg->msg);
296{ 750 GNUNET_free (qmsg);
297 struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg;
298 751
299 msg = (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) buf; 752 send_next (session);
300 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
301 msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeDoneMessage));
302 msg->num_peers = htons (0);
303 753
304 return sizeof (struct GNUNET_CONSENSUS_ConcludeDoneMessage); 754 return msg_size;
305} 755}
306 756
307 757
@@ -313,65 +763,253 @@ transmit_conclude_done (void *cls, size_t size, void *buf)
313static void 763static void
314send_next (struct ConsensusSession *session) 764send_next (struct ConsensusSession *session)
315{ 765{
316 int msize;
317 766
318 GNUNET_assert (NULL != session); 767 GNUNET_assert (NULL != session);
319 768
320 if (NULL != session->th) 769 if (NULL != session->th)
321 {
322 return; 770 return;
323 }
324 771
325 if ((session->conclude_requested == GNUNET_YES) && (session->conclude_sent == GNUNET_NO)) 772 if (NULL != session->client_messages_head)
326 { 773 {
327 /* FIXME */ 774 int msize;
328 msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage); 775 msize = ntohs (session->client_messages_head->msg->size);
329 session->th = 776 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n");
330 GNUNET_SERVER_notify_transmit_ready (session->client, msize, 777 session->th = GNUNET_SERVER_notify_transmit_ready (session->client, msize,
331 GNUNET_TIME_UNIT_FOREVER_REL, &transmit_conclude_done, session); 778 GNUNET_TIME_UNIT_FOREVER_REL,
332 session->conclude_sent = GNUNET_YES; 779 &transmit_queued, session);
333 } 780 }
334 else if (NULL != session->transmit_pending_head) 781}
782
783
784/**
785 * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have
786 * the correct signature to be used with e.g. qsort.
787 * We use this function instead.
788 *
789 * @param h1 some hash code
790 * @param h2 some hash code
791 * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
792 */
793static int
794hash_cmp (const void *a, const void *b)
795{
796 return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) a, (struct GNUNET_HashCode *) b);
797}
798
799
800/**
801 * Search peer in the list of peers in session.
802 *
803 * @param peer peer to find
804 * @param session session with peer
805 * @return index of peer, -1 if peer is not in session
806 */
807static int
808get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
809{
810 const struct GNUNET_PeerIdentity *needle;
811 needle = bsearch (peer, session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
812 if (NULL == needle)
813 return -1;
814 return needle - session->peers;
815}
816
817
818
819static void
820hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size)
821{
822 struct ConsensusPeerInformation *cpi;
823
824 cpi = (struct ConsensusPeerInformation *) cls;
825 cpi->hello = GNUNET_YES;
826
827 GNUNET_assert (GNUNET_STREAM_OK == status);
828
829 cpi = (struct ConsensusPeerInformation *) cls;
830
831 if (cpi->session->conclude_requested)
335 { 832 {
336 msize = session->transmit_pending_head->element->size + sizeof (struct GNUNET_CONSENSUS_ElementMessage); 833 write_strata (cpi, GNUNET_STREAM_OK, 0);
337 session->th =
338 GNUNET_SERVER_notify_transmit_ready (session->client, msize,
339 GNUNET_TIME_UNIT_FOREVER_REL, &transmit_pending, session);
340 /* TODO: insert into ack pending */
341 } 834 }
342} 835}
343 836
344 837
345/** 838/**
346 * Method called whenever a peer has disconnected from the tunnel. 839 * Functions of this type will be called when a stream is established
347 * Implementations of this callback must NOT call
348 * GNUNET_MESH_tunnel_destroy immediately, but instead schedule those
349 * to run in some other task later. However, calling
350 * "GNUNET_MESH_notify_transmit_ready_cancel" is allowed.
351 * 840 *
352 * @param cls closure 841 * @param cls the closure from GNUNET_STREAM_open
353 * @param peer peer identity the tunnel stopped working with 842 * @param socket socket to use to communicate with the other side (read/write)
354 */ 843 */
355static void 844static void
356disconnect_handler (void *cls, const struct GNUNET_PeerIdentity *peer) 845open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
846{
847 struct ConsensusPeerInformation *cpi;
848 struct ConsensusHello *hello;
849
850
851 cpi = (struct ConsensusPeerInformation *) cls;
852 cpi->is_connected = GNUNET_YES;
853
854 hello = GNUNET_malloc (sizeof *hello);
855 hello->header.size = htons (sizeof *hello);
856 hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO);
857 memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode));
858
859
860 cpi->wh =
861 GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi);
862
863}
864
865
866static void
867initialize_session_info (struct ConsensusSession *session)
868{
869 int i;
870 int last;
871
872 for (i = 0; i < session->num_peers; ++i)
873 {
874 /* initialize back-references, so consensus peer information can
875 * be used as closure */
876 session->info[i].session = session;
877
878 }
879
880 last = (session->local_peer_idx + (session->num_peers / 2)) % session->num_peers;
881 i = (session->local_peer_idx + 1) % session->num_peers;
882 while (i != last)
883 {
884 session->info[i].is_outgoing = GNUNET_YES;
885 session->info[i].socket = GNUNET_STREAM_open (cfg, &session->peers[i], GNUNET_APPLICATION_TYPE_CONSENSUS,
886 open_cb, &session->info[i], GNUNET_STREAM_OPTION_END);
887 session->info[i].mst = GNUNET_SERVER_mst_create (mst_session_callback, session);
888 i = (i + 1) % session->num_peers;
889 }
890 // tie-breaker for even number of peers
891 if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
892 {
893 session->info[last].is_outgoing = GNUNET_YES;
894 session->info[last].socket = GNUNET_STREAM_open (cfg, &session->peers[last], GNUNET_APPLICATION_TYPE_CONSENSUS,
895 open_cb, &session->info[last], GNUNET_STREAM_OPTION_END);
896 }
897}
898
899
900/**
901 * Create the sorted list of peers for the session,
902 * add the local peer if not in the join message.
903 */
904static void
905initialize_session_peer_list (struct ConsensusSession *session)
906{
907 int local_peer_in_list;
908 int listed_peers;
909 const struct GNUNET_PeerIdentity *msg_peers;
910 unsigned int i;
911
912 GNUNET_assert (NULL != session->join_msg);
913
914 /* peers in the join message, may or may not include the local peer */
915 listed_peers = ntohs (session->join_msg->num_peers);
916
917 session->num_peers = listed_peers;
918
919 msg_peers = (struct GNUNET_PeerIdentity *) &session->join_msg[1];
920
921 local_peer_in_list = GNUNET_NO;
922 for (i = 0; i < listed_peers; i++)
923 {
924 if (0 == memcmp (&msg_peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
925 {
926 local_peer_in_list = GNUNET_YES;
927 break;
928 }
929 }
930
931 if (GNUNET_NO == local_peer_in_list)
932 session->num_peers++;
933
934 session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
935
936 if (GNUNET_NO == local_peer_in_list)
937 session->peers[session->num_peers - 1] = *my_peer;
938
939 memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
940 qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
941}
942
943
944static void
945strata_insert (struct InvertibleBloomFilter **strata, struct GNUNET_HashCode *key)
357{ 946{
358 /* FIXME: how do we handle this */ 947 uint32_t v;
948 int i;
949 v = key->bits[0];
950 /* count trailing '1'-bits of v */
951 for (i = 0; v & 1; v>>=1, i++);
952
953 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata insert at %d\n", i);
954
955 ibf_insert (strata[i], key);
359} 956}
360 957
361 958
362/** 959/**
363 * Method called whenever a peer has connected to the tunnel. 960 * Initialize the session, continue receiving messages from the owning client
364 * 961 *
365 * @param cls closure 962 * @param session the session to initialize
366 * @param peer peer identity the tunnel was created to, NULL on timeout
367 * @param atsi performance data for the connection
368 */ 963 */
369static void 964static void
370connect_handler (void *cls, 965initialize_session (struct ConsensusSession *session)
371 const struct GNUNET_PeerIdentity *peer,
372 const struct GNUNET_ATS_Information *atsi)
373{ 966{
374 /* not much we can do here, now we know the other peer has been added to our broadcast tunnel */ 967 const struct ConsensusSession *other_session;
968 int i;
969
970 GNUNET_assert (NULL != session->join_msg);
971
972 initialize_session_peer_list (session);
973
974 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers);
975
976 compute_global_id (&session->join_msg->session_id, session->peers, session->num_peers, &session->global_id);
977
978 /* Check if some local client already owns the session. */
979 other_session = sessions_head;
980 while (NULL != other_session)
981 {
982 if ((other_session != session) &&
983 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
984 {
985 /* session already owned by another client */
986 GNUNET_break (0);
987 disconnect_client (session->client);
988 return;
989 }
990 other_session = other_session->next;
991 }
992
993 session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO);
994
995 session->local_peer_idx = get_peer_idx (my_peer, session);
996 GNUNET_assert (-1 != session->local_peer_idx);
997
998 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx);
999
1000 session->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct InvertibleBloomFilter *));
1001 for (i = 0; i < STRATA_COUNT; i++)
1002 session->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
1003
1004 session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
1005
1006 initialize_session_info (session);
1007
1008 GNUNET_free (session->join_msg);
1009 session->join_msg = NULL;
1010
1011 GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
1012 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id));
375} 1013}
376 1014
377 1015
@@ -387,88 +1025,49 @@ client_join (void *cls,
387 struct GNUNET_SERVER_Client *client, 1025 struct GNUNET_SERVER_Client *client,
388 const struct GNUNET_MessageHeader *m) 1026 const struct GNUNET_MessageHeader *m)
389{ 1027{
390 struct GNUNET_HashCode global_id;
391 const struct GNUNET_CONSENSUS_JoinMessage *msg;
392 struct ConsensusSession *session; 1028 struct ConsensusSession *session;
393 unsigned int i;
394
395 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client joining\n");
396
397 msg = (struct GNUNET_CONSENSUS_JoinMessage *) m;
398
399 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session id is %s\n", GNUNET_h2s (&msg->session_id));
400 1029
401 compute_global_id (&global_id, &msg->session_id, (struct GNUNET_PeerIdentity *) &m[1], msg->num_peers); 1030 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join received\n");
402
403 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "computed global id is %s\n", GNUNET_h2s (&global_id));
404 1031
1032 // make sure the client has not already joined a session
405 session = sessions_head; 1033 session = sessions_head;
406 while (NULL != session) 1034 while (NULL != session)
407 { 1035 {
408 if (client == session->client) 1036 if (session->client == client)
409 {
410
411 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client already in session\n");
412 disconnect_client (client);
413 return;
414 }
415 if (0 == memcmp (session->global_id, &global_id, sizeof (struct GNUNET_HashCode)))
416 { 1037 {
417 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "session already owned by another client\n"); 1038 GNUNET_break (0);
418 disconnect_client (client); 1039 disconnect_client (client);
419 return; 1040 return;
420 } 1041 }
421 session = session->next; 1042 session = session->next;
422 } 1043 }
423 1044
424 GNUNET_SERVER_client_keep (client);
425
426 /* session does not exist yet, create it */
427 session = GNUNET_malloc (sizeof (struct ConsensusSession)); 1045 session = GNUNET_malloc (sizeof (struct ConsensusSession));
428 session->local_id = GNUNET_memdup (&msg->session_id, sizeof (struct GNUNET_HashCode)); 1046 session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m);
429 session->global_id = GNUNET_memdup (&global_id, sizeof (struct GNUNET_HashCode));
430 session->values = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO);
431 session->client = client; 1047 session->client = client;
432 /* FIXME: should not be a constant, but chosen adaptively */ 1048 GNUNET_SERVER_client_keep (client);
433 session->round_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
434
435 session->broadcast_tunnel = GNUNET_MESH_tunnel_create (mesh, session, connect_handler, disconnect_handler, session);
436
437 session->num_peers = 0;
438
439 /* count the peers that are not the local peer */
440 for (i = 0; i < msg->num_peers; i++)
441 {
442 struct GNUNET_PeerIdentity *peers;
443 peers = (struct GNUNET_PeerIdentity *) &msg[1];
444 if (0 != memcmp (&peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
445 session->num_peers++;
446 }
447 1049
448 session->peers = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeer)); 1050 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
449 1051
450 /* copy the peer identities and add peers to broadcast tunnel */ 1052 // Initialize session later if local peer identity is not known yet.
451 for (i = 0; i < msg->num_peers; i++) 1053 if (NULL == my_peer)
452 { 1054 {
453 struct GNUNET_PeerIdentity *peers; 1055 GNUNET_SERVER_disable_receive_done_warning (client);
454 peers = (struct GNUNET_PeerIdentity *) &msg[1]; 1056 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session init delayed\n");
455 if (0 != memcmp (&peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity))) 1057 return;
456 {
457 *session->peers->peer_id = peers[i];
458 GNUNET_MESH_peer_request_connect_add (session->broadcast_tunnel, &peers[i]);
459 }
460 } 1058 }
461 1059
462 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "created new session\n"); 1060 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session init now\n");
463 1061 initialize_session (session);
464 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
465
466 GNUNET_SERVER_receive_done (client, GNUNET_OK);
467} 1062}
468 1063
469 1064
470/** 1065/**
471 * Called when a client performs an insert operation. 1066 * Called when a client performs an insert operation.
1067 *
1068 * @param cls (unused)
1069 * @param client client handle
1070 * @param message message sent by the client
472 */ 1071 */
473void 1072void
474client_insert (void *cls, 1073client_insert (void *cls,
@@ -510,7 +1109,9 @@ client_insert (void *cls,
510 GNUNET_CRYPTO_hash (element, element_size, &key); 1109 GNUNET_CRYPTO_hash (element, element_size, &key);
511 1110
512 GNUNET_CONTAINER_multihashmap_put (session->values, &key, element, 1111 GNUNET_CONTAINER_multihashmap_put (session->values, &key, element,
513 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); 1112 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1113
1114 strata_insert (session->strata, &key);
514 1115
515 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1116 GNUNET_SERVER_receive_done (client, GNUNET_OK);
516 1117
@@ -518,31 +1119,145 @@ client_insert (void *cls,
518} 1119}
519 1120
520 1121
1122
1123/**
1124 * Functions of this signature are called whenever writing operations
1125 * on a stream are executed
1126 *
1127 * @param cls the closure from GNUNET_STREAM_write
1128 * @param status the status of the stream at the time this function is called;
1129 * GNUNET_STREAM_OK if writing to stream was completed successfully;
1130 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
1131 * (this doesn't mean that the data is never sent, the receiver may
1132 * have read the data but its ACKs may have been lost);
1133 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
1134 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
1135 * be processed.
1136 * @param size the number of bytes written
1137 */
1138static void
1139write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1140{
1141 struct ConsensusPeerInformation *cpi;
1142 struct StrataMessage *strata_msg;
1143 size_t msize;
1144 int i;
1145 struct GNUNET_HashCode *hash_dst;
1146 uint8_t *count_dst;
1147 int num_strata;
1148
1149 cpi = (struct ConsensusPeerInformation *) cls;
1150
1151 GNUNET_assert (GNUNET_YES == cpi->is_outgoing);
1152
1153 /* FIXME: handle this */
1154 GNUNET_assert (GNUNET_STREAM_OK == status);
1155
1156 if (STRATA_COUNT == cpi->strata_counter)
1157 {
1158 /* strata have been written, wait for other side's IBF */
1159 return;
1160 }
1161
1162 if ((STRATA_COUNT - cpi->strata_counter) < STRATA_PER_MESSAGE)
1163 num_strata = (STRATA_COUNT - cpi->strata_counter);
1164 else
1165 num_strata = STRATA_PER_MESSAGE;
1166
1167
1168 msize = (sizeof *strata_msg) + (num_strata * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS);
1169
1170 strata_msg = GNUNET_malloc (msize);
1171 strata_msg->header.size = htons (msize);
1172 strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE);
1173 strata_msg->num_strata = htons (num_strata);
1174
1175 /* for correct message alignment, copy bucket types seperately */
1176 hash_dst = (struct GNUNET_HashCode *) &strata_msg[1];
1177
1178 for (i = 0; i < num_strata; i++)
1179 {
1180 memcpy (hash_dst, cpi->session->strata[cpi->strata_counter+i]->hash_sum, STRATA_IBF_BUCKETS * sizeof *hash_dst);
1181 hash_dst += STRATA_IBF_BUCKETS;
1182 }
1183
1184 for (i = 0; i < num_strata; i++)
1185 {
1186 memcpy (hash_dst, cpi->session->strata[cpi->strata_counter+i]->id_sum, STRATA_IBF_BUCKETS * sizeof *hash_dst);
1187 hash_dst += STRATA_IBF_BUCKETS;
1188 }
1189
1190 count_dst = (uint8_t *) hash_dst;
1191
1192 for (i = 0; i < num_strata; i++)
1193 {
1194 memcpy (count_dst, cpi->session->strata[cpi->strata_counter+i]->count, STRATA_IBF_BUCKETS);
1195 count_dst += STRATA_IBF_BUCKETS;
1196 }
1197
1198 cpi->strata_counter += num_strata;
1199
1200 cpi->wh = GNUNET_STREAM_write (cpi->socket, strata_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL,
1201 write_strata, cpi);
1202
1203 GNUNET_assert (NULL != cpi->wh);
1204}
1205
1206
521/** 1207/**
522 * Do one round of the conclusion. 1208 * Functions of this signature are called whenever writing operations
523 * Start by broadcasting the set difference estimator (IBF strata). 1209 * on a stream are executed
524 * 1210 *
1211 * @param cls the closure from GNUNET_STREAM_write
1212 * @param status the status of the stream at the time this function is called;
1213 * GNUNET_STREAM_OK if writing to stream was completed successfully;
1214 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
1215 * (this doesn't mean that the data is never sent, the receiver may
1216 * have read the data but its ACKs may have been lost);
1217 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
1218 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
1219 * be processed.
1220 * @param size the number of bytes written
525 */ 1221 */
526void 1222static void
527conclude_do_round (struct ConsensusSession *session) 1223write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size)
528{ 1224{
529 /* FIXME */ 1225 struct ConsensusPeerInformation *cpi;
1226
1227 cpi = (struct ConsensusPeerInformation *) cls;
530} 1228}
531 1229
532 1230
533/** 1231/**
534 * Cancel the current round if necessary, decide to run another round or 1232 * Functions of this signature are called whenever writing operations
535 * terminate. 1233 * on a stream are executed
1234 *
1235 * @param cls the closure from GNUNET_STREAM_write
1236 * @param status the status of the stream at the time this function is called;
1237 * GNUNET_STREAM_OK if writing to stream was completed successfully;
1238 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
1239 * (this doesn't mean that the data is never sent, the receiver may
1240 * have read the data but its ACKs may have been lost);
1241 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
1242 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
1243 * be processed.
1244 * @param size the number of bytes written
536 */ 1245 */
537void 1246static void
538conclude_round_done (struct ConsensusSession *session) 1247write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size)
539{ 1248{
540 /* FIXME */ 1249 struct ConsensusPeerInformation *cpi;
1250
1251 cpi = (struct ConsensusPeerInformation *) cls;
541} 1252}
542 1253
543 1254
544/** 1255/**
545 * Called when a client performs the conclude operation. 1256 * Called when a client performs the conclude operation.
1257 *
1258 * @param cls (unused)
1259 * @param client client handle
1260 * @param message message sent by the client
546 */ 1261 */
547void 1262void
548client_conclude (void *cls, 1263client_conclude (void *cls,
@@ -550,40 +1265,55 @@ client_conclude (void *cls,
550 const struct GNUNET_MessageHeader *message) 1265 const struct GNUNET_MessageHeader *message)
551{ 1266{
552 struct ConsensusSession *session; 1267 struct ConsensusSession *session;
1268 int i;
553 1269
554 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n"); 1270 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n");
555 1271
556 session = sessions_head; 1272 session = sessions_head;
557 while ((session != NULL) && (session->client != client)) 1273 while ((session != NULL) && (session->client != client))
558 {
559 session = session->next; 1274 session = session->next;
560 }
561 if (NULL == session) 1275 if (NULL == session)
562 { 1276 {
563 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client not found\n"); 1277 /* client not found */
1278 GNUNET_break (0);
564 GNUNET_SERVER_client_disconnect (client); 1279 GNUNET_SERVER_client_disconnect (client);
565 return; 1280 return;
566 } 1281 }
567 1282
568 if (GNUNET_YES == session->conclude_requested) 1283 if (GNUNET_YES == session->conclude_requested)
569 { 1284 {
570 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client requested conclude twice\n"); 1285 /* client requested conclude twice */
571 GNUNET_SERVER_client_disconnect (client); 1286 GNUNET_break (0);
1287 disconnect_client (client);
572 return; 1288 return;
573 } 1289 }
574 1290
575 session->conclude_requested = GNUNET_YES; 1291 session->conclude_requested = GNUNET_YES;
576 1292
577 conclude_do_round (session); 1293 /* FIXME: write to already connected sockets */
578 1294
579 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1295 for (i = 0; i < session->num_peers; i++)
1296 {
1297 if ( (GNUNET_YES == session->info[i].is_outgoing) &&
1298 (GNUNET_YES == session->info[i].hello) )
1299 {
1300 /* kick off transmitting strata by calling the write continuation */
1301 write_strata (&session->info[i], GNUNET_STREAM_OK, 0);
1302 }
1303 }
1304
580 1305
1306 GNUNET_SERVER_receive_done (client, GNUNET_OK);
581 send_next (session); 1307 send_next (session);
582} 1308}
583 1309
584 1310
585/** 1311/**
586 * Called when a client sends an ack 1312 * Called when a client sends an ack
1313 *
1314 * @param cls (unused)
1315 * @param client client handle
1316 * @param message message sent by the client
587 */ 1317 */
588void 1318void
589client_ack (void *cls, 1319client_ack (void *cls,
@@ -614,71 +1344,23 @@ core_startup (void *cls,
614 struct GNUNET_CORE_Handle *core, 1344 struct GNUNET_CORE_Handle *core,
615 const struct GNUNET_PeerIdentity *peer) 1345 const struct GNUNET_PeerIdentity *peer)
616{ 1346{
617 static const struct GNUNET_SERVER_MessageHandler handlers[] = { 1347 struct ConsensusSession *session;
618 {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
619 {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
620 {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
621 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
622 {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK,
623 sizeof (struct GNUNET_CONSENSUS_AckMessage)},
624 {NULL, NULL, 0, 0}
625 };
626 1348
627 GNUNET_SERVER_add_handlers (srv, handlers);
628 my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity)); 1349 my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity));
629 /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */ 1350 /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */
630 GNUNET_SCHEDULER_add_now (&disconnect_core, core); 1351 GNUNET_SCHEDULER_add_now (&disconnect_core, core);
631 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n"); 1352 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n");
632}
633
634
635
636/**
637 * Method called whenever another peer has added us to a tunnel
638 * the other peer initiated.
639 * Only called (once) upon reception of data with a message type which was
640 * subscribed to in GNUNET_MESH_connect. A call to GNUNET_MESH_tunnel_destroy
641 * causes te tunnel to be ignored and no further notifications are sent about
642 * the same tunnel.
643 *
644 * @param cls closure
645 * @param tunnel new handle to the tunnel
646 * @param initiator peer that started the tunnel
647 * @param atsi performance information for the tunnel
648 * @return initial tunnel context for the tunnel
649 * (can be NULL -- that's not an error)
650 */
651static void *
652new_tunnel (void *cls,
653 struct GNUNET_MESH_Tunnel *tunnel,
654 const struct GNUNET_PeerIdentity *initiator,
655 const struct GNUNET_ATS_Information *atsi)
656{
657 /* there's nothing we can do here, as we don't have the global consensus id yet */
658 return NULL;
659}
660 1353
661 1354 session = sessions_head;
662/** 1355 while (NULL != session)
663 * Function called whenever an inbound tunnel is destroyed. Should clean up 1356 {
664 * any associated state. This function is NOT called if the client has 1357 if (NULL != session->join_msg)
665 * explicitly asked for the tunnel to be destroyed using 1358 initialize_session (session);
666 * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on 1359 session = session->next;
667 * the tunnel. 1360 }
668 *
669 * @param cls closure (set from GNUNET_MESH_connect)
670 * @param tunnel connection to the other end (henceforth invalid)
671 * @param tunnel_ctx place where local state associated
672 * with the tunnel is stored
673 */
674static void
675cleaner (void *cls, const struct GNUNET_MESH_Tunnel *tunnel, void *tunnel_ctx)
676{
677 /* FIXME: what to do here? */
678} 1361}
679 1362
680 1363
681
682/** 1364/**
683 * Called to clean up, after a shutdown has been requested. 1365 * Called to clean up, after a shutdown has been requested.
684 * 1366 *
@@ -689,105 +1371,27 @@ static void
689shutdown_task (void *cls, 1371shutdown_task (void *cls,
690 const struct GNUNET_SCHEDULER_TaskContext *tc) 1372 const struct GNUNET_SCHEDULER_TaskContext *tc)
691{ 1373{
692 /* mesh requires all the tunnels to be destroyed manually */
693 while (NULL != sessions_head) 1374 while (NULL != sessions_head)
694 { 1375 {
695 struct ConsensusSession *session; 1376 struct ConsensusSession *session;
696 session = sessions_head; 1377 session = sessions_head;
697 GNUNET_MESH_tunnel_destroy (sessions_head->broadcast_tunnel);
698 sessions_head = sessions_head->next; 1378 sessions_head = sessions_head->next;
699 GNUNET_free (session); 1379 GNUNET_free (session);
700 } 1380 }
701 1381
702 if (NULL != mesh)
703 {
704 GNUNET_MESH_disconnect (mesh);
705 mesh = NULL;
706 }
707 if (NULL != core) 1382 if (NULL != core)
708 { 1383 {
709 GNUNET_CORE_disconnect (core); 1384 GNUNET_CORE_disconnect (core);
710 core = NULL; 1385 core = NULL;
711 } 1386 }
712}
713
714
715
716/**
717 * Functions with this signature are called whenever a message is
718 * received.
719 *
720 * @param cls closure (set from GNUNET_MESH_connect)
721 * @param tunnel connection to the other end
722 * @param tunnel_ctx place to store local state associated with the tunnel
723 * @param sender who sent the message
724 * @param message the actual message
725 * @param atsi performance data for the connection
726 * @return GNUNET_OK to keep the connection open,
727 * GNUNET_SYSERR to close it (signal serious error)
728 */
729static int
730p2p_delta_estimate (void *cls,
731 struct GNUNET_MESH_Tunnel * tunnel,
732 void **tunnel_ctx,
733 const struct GNUNET_PeerIdentity *sender,
734 const struct GNUNET_MessageHeader *message,
735 const struct GNUNET_ATS_Information *atsi)
736{
737 /* FIXME */
738 return GNUNET_OK;
739}
740
741
742/**
743 * Functions with this signature are called whenever a message is
744 * received.
745 *
746 * @param cls closure (set from GNUNET_MESH_connect)
747 * @param tunnel connection to the other end
748 * @param tunnel_ctx place to store local state associated with the tunnel
749 * @param sender who sent the message
750 * @param message the actual message
751 * @param atsi performance data for the connection
752 * @return GNUNET_OK to keep the connection open,
753 * GNUNET_SYSERR to close it (signal serious error)
754 */
755static int
756p2p_difference_digest (void *cls,
757 struct GNUNET_MESH_Tunnel * tunnel,
758 void **tunnel_ctx,
759 const struct GNUNET_PeerIdentity *sender,
760 const struct GNUNET_MessageHeader *message,
761 const struct GNUNET_ATS_Information *atsi)
762{
763 /* FIXME */
764 return GNUNET_OK;
765}
766 1387
1388 if (NULL != listener)
1389 {
1390 GNUNET_STREAM_listen_close (listener);
1391 listener = NULL;
1392 }
767 1393
768/** 1394 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
769 * Functions with this signature are called whenever a message is
770 * received.
771 *
772 * @param cls closure (set from GNUNET_MESH_connect)
773 * @param tunnel connection to the other end
774 * @param tunnel_ctx place to store local state associated with the tunnel
775 * @param sender who sent the message
776 * @param message the actual message
777 * @param atsi performance data for the connection
778 * @return GNUNET_OK to keep the connection open,
779 * GNUNET_SYSERR to close it (signal serious error)
780 */
781static int
782p2p_elements_and_requests (void *cls,
783 struct GNUNET_MESH_Tunnel * tunnel,
784 void **tunnel_ctx,
785 const struct GNUNET_PeerIdentity *sender,
786 const struct GNUNET_MessageHeader *message,
787 const struct GNUNET_ATS_Information *atsi)
788{
789 /* FIXME */
790 return GNUNET_OK;
791} 1395}
792 1396
793 1397
@@ -801,33 +1405,38 @@ p2p_elements_and_requests (void *cls,
801static void 1405static void
802run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) 1406run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c)
803{ 1407{
804 static const struct GNUNET_CORE_MessageHandler handlers[] = { 1408 static const struct GNUNET_CORE_MessageHandler core_handlers[] = {
805 {NULL, 0, 0} 1409 {NULL, 0, 0}
806 }; 1410 };
807 static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = { 1411 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
808 {p2p_delta_estimate, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE, 0}, 1412 {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
809 {p2p_difference_digest, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST, 0}, 1413 {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
810 {p2p_elements_and_requests, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_AND_REQUESTS, 0}, 1414 {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
811 {NULL, 0, 0} 1415 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
812 }; 1416 {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK,
813 static const GNUNET_MESH_ApplicationType app_types[] = { 1417 sizeof (struct GNUNET_CONSENSUS_AckMessage)},
814 GNUNET_APPLICATION_TYPE_CONSENSUS, 1418 {NULL, NULL, 0, 0}
815 GNUNET_APPLICATION_TYPE_END
816 }; 1419 };
817 1420
818 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n");
819
820 cfg = c; 1421 cfg = c;
821 srv = server; 1422 srv = server;
822 1423
1424 GNUNET_SERVER_add_handlers (server, server_handlers);
1425
823 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); 1426 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
824 1427
825 mesh = GNUNET_MESH_connect (cfg, NULL, new_tunnel, cleaner, mesh_handlers, app_types); 1428
826 GNUNET_assert (NULL != mesh); 1429 listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS,
1430 listen_cb, NULL,
1431 GNUNET_STREAM_OPTION_END);
1432
827 1433
828 /* we have to wait for the core_startup callback before proceeding with the consensus service startup */ 1434 /* we have to wait for the core_startup callback before proceeding with the consensus service startup */
829 core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, handlers); 1435 core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, core_handlers);
830 GNUNET_assert (NULL != core); 1436 GNUNET_assert (NULL != core);
1437
1438 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n");
1439 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata per msg: %d\n", STRATA_PER_MESSAGE);
831} 1440}
832 1441
833 1442
@@ -843,6 +1452,7 @@ main (int argc, char *const *argv)
843{ 1452{
844 int ret; 1453 int ret;
845 ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL); 1454 ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
1455 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n");
846 return (GNUNET_OK == ret) ? 0 : 1; 1456 return (GNUNET_OK == ret) ? 0 : 1;
847} 1457}
848 1458
diff --git a/src/consensus/ibf.c b/src/consensus/ibf.c
index 4f1aca939..2d06fc29b 100644
--- a/src/consensus/ibf.c
+++ b/src/consensus/ibf.c
@@ -25,50 +25,8 @@
25 * @author Florian Dold 25 * @author Florian Dold
26 */ 26 */
27 27
28#include "ibf.h"
29
30
31/**
32 * Opaque handle to an invertible bloom filter (IBF).
33 *
34 * An IBF is a counting bloom filter that has the ability to restore
35 * the hashes of its stored elements with high probability.
36 */
37struct InvertibleBloomFilter
38{
39 /**
40 * How many cells does this IBF have?
41 */
42 unsigned int size;
43 28
44 /** 29#include "ibf.h"
45 * In how many cells do we hash one element?
46 * Usually 4 or 3.
47 */
48 unsigned int hash_num;
49
50 /**
51 * Salt for mingling hashes
52 */
53 uint32_t salt;
54
55 /**
56 * How many times has a bucket been hit?
57 * Can be negative, as a result of IBF subtraction.
58 */
59 int8_t *count;
60
61 /**
62 * xor sums of the elements' hash codes, used to identify the elements.
63 */
64 struct GNUNET_HashCode *id_sum;
65
66 /**
67 * xor sums of the "hash of the hash".
68 */
69 struct GNUNET_HashCode *hash_sum;
70
71};
72 30
73 31
74/** 32/**
@@ -152,6 +110,8 @@ ibf_insert_on_side (struct InvertibleBloomFilter *ibf,
152 used_buckets[i] = bucket; 110 used_buckets[i] = bucket;
153 111
154 ibf->count[bucket] += side; 112 ibf->count[bucket] += side;
113
114 GNUNET_log_from(GNUNET_ERROR_TYPE_INFO, "ibf", "inserting in bucket %d \n", bucket);
155 115
156 GNUNET_CRYPTO_hash_xor (&key_copy, &ibf->id_sum[bucket], 116 GNUNET_CRYPTO_hash_xor (&key_copy, &ibf->id_sum[bucket],
157 &ibf->id_sum[bucket]); 117 &ibf->id_sum[bucket]);
@@ -214,8 +174,6 @@ ibf_decode (struct InvertibleBloomFilter *ibf,
214 int i; 174 int i;
215 175
216 GNUNET_assert (NULL != ibf); 176 GNUNET_assert (NULL != ibf);
217 GNUNET_assert (NULL != ret_id);
218 GNUNET_assert (NULL != ret_side);
219 177
220 for (i = 0; i < ibf->size; i++) 178 for (i = 0; i < ibf->size; i++)
221 { 179 {
@@ -227,8 +185,10 @@ ibf_decode (struct InvertibleBloomFilter *ibf,
227 if (0 != memcmp (&hash, &ibf->hash_sum[i], sizeof (struct GNUNET_HashCode))) 185 if (0 != memcmp (&hash, &ibf->hash_sum[i], sizeof (struct GNUNET_HashCode)))
228 continue; 186 continue;
229 187
230 *ret_side = ibf->count[i]; 188 if (NULL != ret_side)
231 *ret_id = ibf->id_sum[i]; 189 *ret_side = ibf->count[i];
190 if (NULL != ret_id)
191 *ret_id = ibf->id_sum[i];
232 192
233 /* insert on the opposite side, effectively removing the element */ 193 /* insert on the opposite side, effectively removing the element */
234 ibf_insert_on_side (ibf, &ibf->id_sum[i], -ibf->count[i]); 194 ibf_insert_on_side (ibf, &ibf->id_sum[i], -ibf->count[i]);
@@ -269,3 +229,36 @@ ibf_subtract (struct InvertibleBloomFilter *ibf1, struct InvertibleBloomFilter *
269 } 229 }
270} 230}
271 231
232/**
233 * Create a copy of an IBF, the copy has to be destroyed properly.
234 *
235 * @param ibf the IBF to copy
236 */
237struct InvertibleBloomFilter *
238ibf_dup (struct InvertibleBloomFilter *ibf)
239{
240 struct InvertibleBloomFilter *copy;
241 copy = GNUNET_malloc (sizeof *copy);
242 copy->hash_num = ibf->hash_num;
243 copy->salt = ibf->salt;
244 copy->size = ibf->size;
245 copy->hash_sum = GNUNET_memdup (ibf->hash_sum, ibf->size * sizeof (struct GNUNET_HashCode));
246 copy->id_sum = GNUNET_memdup (ibf->id_sum, ibf->size * sizeof (struct GNUNET_HashCode));
247 copy->count = GNUNET_memdup (ibf->count, ibf->size * sizeof (uint8_t));
248 return copy;
249}
250
251/**
252 * Destroy all resources associated with the invertible bloom filter.
253 * No more ibf_*-functions may be called on ibf after calling destroy.
254 *
255 * @param ibf the intertible bloom filter to destroy
256 */
257void
258ibf_destroy (struct InvertibleBloomFilter *ibf)
259{
260 GNUNET_free (ibf->hash_sum);
261 GNUNET_free (ibf->id_sum);
262 GNUNET_free (ibf->count);
263 GNUNET_free (ibf);
264}
diff --git a/src/consensus/ibf.h b/src/consensus/ibf.h
index e43d7c5fe..26f101905 100644
--- a/src/consensus/ibf.h
+++ b/src/consensus/ibf.h
@@ -39,14 +39,52 @@ extern "C"
39#endif 39#endif
40#endif 40#endif
41 41
42/**
43 * Size of one ibf bucket in bytes
44 */
45#define IBF_BUCKET_SIZE (64+64+1)
46
42 47
43/** 48/**
44 * Opaque handle to an invertible bloom filter (IBF). 49 * Invertible bloom filter (IBF).
45 * 50 *
46 * An IBF is a counting bloom filter that has the ability to restore 51 * An IBF is a counting bloom filter that has the ability to restore
47 * the hashes of its stored elements with high probability. 52 * the hashes of its stored elements with high probability.
48 */ 53 */
49struct InvertibleBloomFilter; 54struct InvertibleBloomFilter
55{
56 /**
57 * How many cells does this IBF have?
58 */
59 unsigned int size;
60
61 /**
62 * In how many cells do we hash one element?
63 * Usually 4 or 3.
64 */
65 unsigned int hash_num;
66
67 /**
68 * Salt for mingling hashes
69 */
70 uint32_t salt;
71
72 /**
73 * xor sums of the elements' hash codes, used to identify the elements.
74 */
75 struct GNUNET_HashCode *id_sum;
76
77 /**
78 * xor sums of the "hash of the hash".
79 */
80 struct GNUNET_HashCode *hash_sum;
81
82 /**
83 * How many times has a bucket been hit?
84 * Can be negative, as a result of IBF subtraction.
85 */
86 int8_t *count;
87};
50 88
51 89
52/** 90/**
@@ -106,15 +144,6 @@ ibf_decode (struct InvertibleBloomFilter *ibf, int *side, struct GNUNET_HashCode
106struct InvertibleBloomFilter * 144struct InvertibleBloomFilter *
107ibf_dup (struct InvertibleBloomFilter *ibf); 145ibf_dup (struct InvertibleBloomFilter *ibf);
108 146
109
110/*
111ibf_hton ();
112
113ibf_ntoh ();
114
115ibf_get_nbo_size ();
116*/
117
118/** 147/**
119 * Destroy all resources associated with the invertible bloom filter. 148 * Destroy all resources associated with the invertible bloom filter.
120 * No more ibf_*-functions may be called on ibf after calling destroy. 149 * No more ibf_*-functions may be called on ibf after calling destroy.
diff --git a/src/consensus/test_consensus.conf b/src/consensus/test_consensus.conf
index 86dfadb9b..01266c2a9 100644
--- a/src/consensus/test_consensus.conf
+++ b/src/consensus/test_consensus.conf
@@ -1,6 +1,6 @@
1[consensus] 1[consensus]
2AUTOSTART = YES 2AUTOSTART = YES
3PORT = 2103 3PORT = 2110
4HOSTNAME = localhost 4HOSTNAME = localhost
5HOME = $SERVICEHOME 5HOME = $SERVICEHOME
6BINARY = gnunet-service-consensus 6BINARY = gnunet-service-consensus
diff --git a/src/consensus/test_consensus_api.c b/src/consensus/test_consensus_api.c
index e752983c2..c79a6221c 100644
--- a/src/consensus/test_consensus_api.c
+++ b/src/consensus/test_consensus_api.c
@@ -17,6 +17,7 @@
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330, 17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA. 18 Boston, MA 02111-1307, USA.
19*/ 19*/
20
20/** 21/**
21 * @file consensus/test_consensus_api.c 22 * @file consensus/test_consensus_api.c
22 * @brief testcase for consensus_api.c 23 * @brief testcase for consensus_api.c
@@ -29,18 +30,20 @@
29 30
30static struct GNUNET_CONSENSUS_Handle *consensus; 31static struct GNUNET_CONSENSUS_Handle *consensus;
31 32
32static int insert;
33
34static struct GNUNET_HashCode session_id; 33static struct GNUNET_HashCode session_id;
35 34
36 35
37static void conclude_done (void *cls, 36static int
38 unsigned int num_peers_in_consensus, 37conclude_done (void *cls, const struct GNUNET_CONSENSUS_Group *group)
39 const struct GNUNET_PeerIdentity *peers_in_consensus)
40{ 38{
41 struct GNUNET_CONSENSUS_Handle *consensus; 39 if (NULL == group)
42 consensus = (struct GNUNET_CONSENSUS_Handle *) cls; 40 {
43 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "concluded\n"); 41 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude over\n");
42 GNUNET_SCHEDULER_shutdown ();
43 return GNUNET_NO;
44 }
45 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "concluded\n");
46 return GNUNET_YES;
44} 47}
45 48
46static int 49static int
@@ -54,7 +57,30 @@ on_new_element (void *cls,
54static void 57static void
55insert_done (void *cls, int success) 58insert_done (void *cls, int success)
56{ 59{
60 /* make sure cb is only called once */
61 static int called = GNUNET_NO;
62 GNUNET_assert (GNUNET_NO == called);
63 called = GNUNET_YES;
57 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "insert done\n"); 64 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "insert done\n");
65 GNUNET_CONSENSUS_conclude (consensus, GNUNET_TIME_UNIT_SECONDS, 0, &conclude_done, NULL);
66}
67
68
69/**
70 * Signature of the main function of a task.
71 *
72 * @param cls closure
73 * @param tc context information (why was this task triggered now)
74 */
75static void
76on_shutdown (void *cls,
77 const struct GNUNET_SCHEDULER_TaskContext *tc)
78{
79 if (NULL != consensus)
80 {
81 GNUNET_CONSENSUS_destroy (consensus);
82 consensus = NULL;
83 }
58} 84}
59 85
60 86
@@ -69,18 +95,19 @@ run (void *cls,
69 struct GNUNET_CONSENSUS_Element el2 = {"bar", 4, 0}; 95 struct GNUNET_CONSENSUS_Element el2 = {"bar", 4, 0};
70 96
71 GNUNET_log_setup ("test_consensus_api", 97 GNUNET_log_setup ("test_consensus_api",
72 "DEBUG", 98 "INFO",
73 NULL); 99 NULL);
74 100
75 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "testing consensus api\n"); 101 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "testing consensus api\n");
76 102
103 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &on_shutdown, NULL);
104
77 GNUNET_CRYPTO_hash (str, strlen (str), &session_id); 105 GNUNET_CRYPTO_hash (str, strlen (str), &session_id);
78 consensus = GNUNET_CONSENSUS_create (cfg, 0, NULL, &session_id, on_new_element, &consensus); 106 consensus = GNUNET_CONSENSUS_create (cfg, 0, NULL, &session_id, on_new_element, &consensus);
79 GNUNET_assert (consensus != NULL); 107 GNUNET_assert (consensus != NULL);
80 /* 108
81 GNUNET_CONSENSUS_insert (consensus1, &el1, &insert_done, &consensus1); 109 GNUNET_CONSENSUS_insert (consensus, &el1, NULL, &consensus);
82 GNUNET_CONSENSUS_insert (consensus2, &el2, &insert_done, &consensus2); 110 GNUNET_CONSENSUS_insert (consensus, &el2, &insert_done, &consensus);
83 */
84} 111}
85 112
86 113