diff options
-rw-r--r-- | src/consensus/Makefile.am | 1 | ||||
-rw-r--r-- | src/consensus/consensus.h | 9 | ||||
-rw-r--r-- | src/consensus/consensus_api.c | 481 | ||||
-rw-r--r-- | src/consensus/gnunet-consensus-start-peers.c | 3 | ||||
-rw-r--r-- | src/consensus/gnunet-consensus.c | 21 | ||||
-rw-r--r-- | src/consensus/gnunet-service-consensus.c | 424 | ||||
-rw-r--r-- | src/consensus/test_consensus_api.c | 30 | ||||
-rw-r--r-- | src/include/gnunet_applications.h | 5 | ||||
-rw-r--r-- | src/include/gnunet_consensus_service.h | 50 | ||||
-rw-r--r-- | src/include/gnunet_protocols.h | 25 |
10 files changed, 590 insertions, 459 deletions
diff --git a/src/consensus/Makefile.am b/src/consensus/Makefile.am index d6901245f..1beaa0c62 100644 --- a/src/consensus/Makefile.am +++ b/src/consensus/Makefile.am | |||
@@ -57,6 +57,7 @@ gnunet_service_consensus_SOURCES = \ | |||
57 | gnunet_service_consensus_LDADD = \ | 57 | gnunet_service_consensus_LDADD = \ |
58 | $(top_builddir)/src/util/libgnunetutil.la \ | 58 | $(top_builddir)/src/util/libgnunetutil.la \ |
59 | $(top_builddir)/src/core/libgnunetcore.la \ | 59 | $(top_builddir)/src/core/libgnunetcore.la \ |
60 | $(top_builddir)/src/mesh/libgnunetmesh.la \ | ||
60 | $(GN_LIBINTL) | 61 | $(GN_LIBINTL) |
61 | 62 | ||
62 | libgnunetconsensus_la_SOURCES = \ | 63 | libgnunetconsensus_la_SOURCES = \ |
diff --git a/src/consensus/consensus.h b/src/consensus/consensus.h index d76c6b769..75b90b0f9 100644 --- a/src/consensus/consensus.h +++ b/src/consensus/consensus.h | |||
@@ -52,7 +52,15 @@ struct GNUNET_CONSENSUS_ConcludeMessage | |||
52 | */ | 52 | */ |
53 | struct GNUNET_MessageHeader header; | 53 | struct GNUNET_MessageHeader header; |
54 | 54 | ||
55 | /** | ||
56 | * Timeout for conclude | ||
57 | */ | ||
55 | struct GNUNET_TIME_RelativeNBO timeout; | 58 | struct GNUNET_TIME_RelativeNBO timeout; |
59 | |||
60 | /** | ||
61 | * Minimum group size required for a consensus group. | ||
62 | */ | ||
63 | uint32_t min_group_size; | ||
56 | }; | 64 | }; |
57 | 65 | ||
58 | 66 | ||
@@ -102,6 +110,7 @@ struct GNUNET_CONSENSUS_AckMessage | |||
102 | */ | 110 | */ |
103 | uint8_t keep; | 111 | uint8_t keep; |
104 | 112 | ||
113 | /* FIXME: add message hash? */ | ||
105 | }; | 114 | }; |
106 | 115 | ||
107 | GNUNET_NETWORK_STRUCT_END | 116 | GNUNET_NETWORK_STRUCT_END |
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index 2479c019c..25c76b358 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c | |||
@@ -33,14 +33,43 @@ | |||
33 | 33 | ||
34 | #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__) | 34 | #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__) |
35 | 35 | ||
36 | struct ElementAck | 36 | /** |
37 | * Actions that can be queued. | ||
38 | */ | ||
39 | struct QueuedMessage | ||
37 | { | 40 | { |
38 | struct ElementAck *next; | 41 | /** |
39 | struct ElementAck *prev; | 42 | * Queued messages are stored in a doubly linked list. |
40 | int keep; | 43 | */ |
41 | struct GNUNET_CONSENSUS_Element *element; | 44 | struct QueuedMessage *next; |
45 | |||
46 | /** | ||
47 | * Queued messages are stored in a doubly linked list. | ||
48 | */ | ||
49 | struct QueuedMessage *prev; | ||
50 | |||
51 | /** | ||
52 | * The actual queued message. | ||
53 | */ | ||
54 | struct GNUNET_MessageHeader *msg; | ||
55 | |||
56 | /** | ||
57 | * Size of the message in msg. | ||
58 | */ | ||
59 | size_t size; | ||
60 | |||
61 | /** | ||
62 | * Will be called after transmit, if not NULL | ||
63 | */ | ||
64 | GNUNET_CONSENSUS_InsertDoneCallback idc; | ||
65 | |||
66 | /** | ||
67 | * Closure for idc | ||
68 | */ | ||
69 | void *idc_cls; | ||
42 | }; | 70 | }; |
43 | 71 | ||
72 | |||
44 | /** | 73 | /** |
45 | * Handle for the service. | 74 | * Handle for the service. |
46 | */ | 75 | */ |
@@ -52,14 +81,14 @@ struct GNUNET_CONSENSUS_Handle | |||
52 | const struct GNUNET_CONFIGURATION_Handle *cfg; | 81 | const struct GNUNET_CONFIGURATION_Handle *cfg; |
53 | 82 | ||
54 | /** | 83 | /** |
55 | * Socket (if available). | 84 | * Client connected to the consensus service, may be NULL if not connected. |
56 | */ | 85 | */ |
57 | struct GNUNET_CLIENT_Connection *client; | 86 | struct GNUNET_CLIENT_Connection *client; |
58 | 87 | ||
59 | /** | 88 | /** |
60 | * Callback for new elements. Not called for elements added locally. | 89 | * Callback for new elements. Not called for elements added locally. |
61 | */ | 90 | */ |
62 | GNUNET_CONSENSUS_NewElementCallback new_element_cb; | 91 | GNUNET_CONSENSUS_ElementCallback new_element_cb; |
63 | 92 | ||
64 | /** | 93 | /** |
65 | * Closure for new_element_cb | 94 | * Closure for new_element_cb |
@@ -67,7 +96,7 @@ struct GNUNET_CONSENSUS_Handle | |||
67 | void *new_element_cls; | 96 | void *new_element_cls; |
68 | 97 | ||
69 | /** | 98 | /** |
70 | * Session identifier for the consensus session. | 99 | * The (local) session identifier for the consensus session. |
71 | */ | 100 | */ |
72 | struct GNUNET_HashCode session_id; | 101 | struct GNUNET_HashCode session_id; |
73 | 102 | ||
@@ -77,9 +106,9 @@ struct GNUNET_CONSENSUS_Handle | |||
77 | int num_peers; | 106 | int num_peers; |
78 | 107 | ||
79 | /** | 108 | /** |
80 | * Peer identities of peers in the consensus. Optionally includes the local peer. | 109 | * Peer identities of peers participating in the consensus, includes the local peer. |
81 | */ | 110 | */ |
82 | struct GNUNET_PeerIdentity *peers; | 111 | struct GNUNET_PeerIdentity **peers; |
83 | 112 | ||
84 | /** | 113 | /** |
85 | * Currently active transmit request. | 114 | * Currently active transmit request. |
@@ -92,22 +121,11 @@ struct GNUNET_CONSENSUS_Handle | |||
92 | int joined; | 121 | int joined; |
93 | 122 | ||
94 | /** | 123 | /** |
95 | * Called when the current insertion operation finishes. | ||
96 | * NULL if there is no insert operation active. | ||
97 | */ | ||
98 | GNUNET_CONSENSUS_InsertDoneCallback idc; | ||
99 | |||
100 | /** | ||
101 | * Closure for the insert done callback. | 124 | * Closure for the insert done callback. |
102 | */ | 125 | */ |
103 | void *idc_cls; | 126 | void *idc_cls; |
104 | 127 | ||
105 | /** | 128 | /** |
106 | * An element that was requested to be inserted. | ||
107 | */ | ||
108 | struct GNUNET_CONSENSUS_Element *insert_element; | ||
109 | |||
110 | /** | ||
111 | * Called when the conclude operation finishes or fails. | 129 | * Called when the conclude operation finishes or fails. |
112 | */ | 130 | */ |
113 | GNUNET_CONSENSUS_ConcludeCallback conclude_cb; | 131 | GNUNET_CONSENSUS_ConcludeCallback conclude_cb; |
@@ -122,103 +140,92 @@ struct GNUNET_CONSENSUS_Handle | |||
122 | */ | 140 | */ |
123 | struct GNUNET_TIME_Absolute conclude_deadline; | 141 | struct GNUNET_TIME_Absolute conclude_deadline; |
124 | 142 | ||
125 | struct ElementAck *ack_head; | 143 | unsigned int conclude_min_size; |
126 | struct ElementAck *ack_tail; | ||
127 | |||
128 | /** | ||
129 | * Set to GNUNET_YES if the begin message has been transmitted to the service | ||
130 | */ | ||
131 | int begin_sent; | ||
132 | 144 | ||
133 | /** | 145 | struct QueuedMessage *messages_head; |
134 | * Set to GNUNET_YES it the begin message should be transmitted to the service | 146 | struct QueuedMessage *messages_tail; |
135 | */ | ||
136 | int begin_requested; | ||
137 | }; | 147 | }; |
138 | 148 | ||
139 | 149 | ||
140 | static size_t | ||
141 | transmit_ack (void *cls, size_t size, void *buf); | ||
142 | |||
143 | static size_t | ||
144 | transmit_insert (void *cls, size_t size, void *buf); | ||
145 | |||
146 | static size_t | ||
147 | transmit_conclude (void *cls, size_t size, void *buf); | ||
148 | |||
149 | static size_t | ||
150 | transmit_begin (void *cls, size_t size, void *buf); | ||
151 | |||
152 | 150 | ||
153 | /** | 151 | /** |
154 | * Call notify_transmit_ready for ack if necessary and possible. | 152 | * Schedule transmitting the next message. |
153 | * | ||
154 | * @param consensus consensus handle | ||
155 | */ | 155 | */ |
156 | static void | 156 | static void |
157 | ntr_ack (struct GNUNET_CONSENSUS_Handle *consensus) | 157 | schedule_transmit (struct GNUNET_CONSENSUS_Handle *consensus); |
158 | { | ||
159 | if ((NULL == consensus->th) && (NULL != consensus->ack_head)) | ||
160 | { | ||
161 | consensus->th = | ||
162 | GNUNET_CLIENT_notify_transmit_ready (consensus->client, | ||
163 | sizeof (struct GNUNET_CONSENSUS_AckMessage), | ||
164 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
165 | GNUNET_NO, &transmit_ack, consensus); | ||
166 | } | ||
167 | } | ||
168 | 158 | ||
169 | 159 | ||
170 | /** | 160 | /** |
171 | * Call notify_transmit_ready for ack if necessary and possible. | 161 | * Function called to notify a client about the connection |
162 | * begin ready to queue more data. "buf" will be | ||
163 | * NULL and "size" zero if the connection was closed for | ||
164 | * writing in the meantime. | ||
165 | * | ||
166 | * @param cls closure | ||
167 | * @param size number of bytes available in buf | ||
168 | * @param buf where the callee should write the message | ||
169 | * @return number of bytes written to buf | ||
172 | */ | 170 | */ |
173 | static void | 171 | static size_t transmit_queued (void *cls, size_t size, |
174 | ntr_insert (struct GNUNET_CONSENSUS_Handle *consensus) | 172 | void *buf) |
175 | { | 173 | { |
176 | if ((NULL == consensus->th) && (NULL != consensus->insert_element)) | 174 | struct GNUNET_CONSENSUS_Handle *consensus; |
175 | struct QueuedMessage *qmsg; | ||
176 | size_t ret_size; | ||
177 | |||
178 | printf("transmitting queued\n"); | ||
179 | |||
180 | consensus = (struct GNUNET_CONSENSUS_Handle *) cls; | ||
181 | qmsg = consensus->messages_head; | ||
182 | GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg); | ||
183 | GNUNET_assert (qmsg); | ||
184 | |||
185 | if (NULL == buf) | ||
177 | { | 186 | { |
178 | consensus->th = | 187 | if (NULL != qmsg->idc) |
179 | GNUNET_CLIENT_notify_transmit_ready (consensus->client, | 188 | { |
180 | sizeof (struct GNUNET_CONSENSUS_ElementMessage) + | 189 | qmsg->idc (qmsg->idc_cls, GNUNET_YES); |
181 | consensus->insert_element->size, | 190 | } |
182 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
183 | GNUNET_NO, &transmit_insert, consensus); | ||
184 | } | 191 | } |
185 | } | ||
186 | |||
187 | 192 | ||
188 | /** | 193 | memcpy (buf, qmsg->msg, qmsg->size); |
189 | * Call notify_transmit_ready for ack if necessary and possible. | 194 | ret_size = qmsg->size; |
190 | */ | 195 | if (NULL != qmsg->idc) |
191 | static void | ||
192 | ntr_conclude (struct GNUNET_CONSENSUS_Handle *consensus) | ||
193 | { | ||
194 | if ((NULL == consensus->th) && (NULL != consensus->conclude_cb)) | ||
195 | { | 196 | { |
196 | consensus->th = | 197 | qmsg->idc (qmsg->idc_cls, GNUNET_YES); |
197 | GNUNET_CLIENT_notify_transmit_ready (consensus->client, | ||
198 | sizeof (struct GNUNET_CONSENSUS_ConcludeMessage), | ||
199 | GNUNET_TIME_absolute_get_remaining (consensus->conclude_deadline), | ||
200 | GNUNET_NO, &transmit_conclude, consensus); | ||
201 | } | 198 | } |
199 | GNUNET_free (qmsg->msg); | ||
200 | GNUNET_free (qmsg); | ||
201 | |||
202 | schedule_transmit (consensus); | ||
203 | |||
204 | return ret_size; | ||
202 | } | 205 | } |
203 | 206 | ||
204 | 207 | ||
205 | /** | 208 | /** |
206 | * Call notify_transmit_ready for ack if necessary and possible. | 209 | * Schedule transmitting the next message. |
210 | * | ||
211 | * @param consensus consensus handle | ||
207 | */ | 212 | */ |
208 | static void | 213 | static void |
209 | ntr_begin (struct GNUNET_CONSENSUS_Handle *consensus) | 214 | schedule_transmit (struct GNUNET_CONSENSUS_Handle *consensus) |
210 | { | 215 | { |
211 | if ((NULL == consensus->th) && (GNUNET_YES == consensus->begin_requested) && | 216 | if (NULL != consensus->th) |
212 | (GNUNET_NO == consensus->begin_sent)) | 217 | return; |
218 | |||
219 | if (NULL != consensus->messages_head) | ||
213 | { | 220 | { |
214 | consensus->th = | 221 | LOG (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n"); |
215 | GNUNET_CLIENT_notify_transmit_ready (consensus->client, | 222 | GNUNET_CLIENT_notify_transmit_ready (consensus->client, consensus->messages_head->size, |
216 | sizeof (struct GNUNET_MessageHeader), | 223 | GNUNET_TIME_UNIT_FOREVER_REL, |
217 | GNUNET_TIME_UNIT_FOREVER_REL, | 224 | GNUNET_NO, &transmit_queued, consensus); |
218 | GNUNET_NO, &transmit_begin, consensus); | ||
219 | } | 225 | } |
220 | } | 226 | } |
221 | 227 | ||
228 | |||
222 | /** | 229 | /** |
223 | * Called when the server has sent is a new element | 230 | * Called when the server has sent is a new element |
224 | * | 231 | * |
@@ -226,11 +233,12 @@ ntr_begin (struct GNUNET_CONSENSUS_Handle *consensus) | |||
226 | * @param msg element message | 233 | * @param msg element message |
227 | */ | 234 | */ |
228 | static void | 235 | static void |
229 | handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus, | 236 | handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, |
230 | struct GNUNET_CONSENSUS_ElementMessage *msg) | 237 | struct GNUNET_CONSENSUS_ElementMessage *msg) |
231 | { | 238 | { |
232 | struct GNUNET_CONSENSUS_Element element; | 239 | struct GNUNET_CONSENSUS_Element element; |
233 | struct ElementAck *ack; | 240 | struct GNUNET_CONSENSUS_AckMessage *ack_msg; |
241 | struct QueuedMessage *queued_msg; | ||
234 | int ret; | 242 | int ret; |
235 | 243 | ||
236 | element.type = msg->element_type; | 244 | element.type = msg->element_type; |
@@ -238,11 +246,15 @@ handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus, | |||
238 | element.data = &msg[1]; | 246 | element.data = &msg[1]; |
239 | 247 | ||
240 | ret = consensus->new_element_cb (consensus->new_element_cls, &element); | 248 | ret = consensus->new_element_cb (consensus->new_element_cls, &element); |
241 | ack = GNUNET_malloc (sizeof (struct ElementAck)); | ||
242 | ack->keep = ret; | ||
243 | GNUNET_CONTAINER_DLL_insert_tail (consensus->ack_head, consensus->ack_tail,ack); | ||
244 | 249 | ||
245 | ntr_ack (consensus); | 250 | queued_msg = GNUNET_malloc (sizeof (struct QueuedMessage) + sizeof (struct GNUNET_CONSENSUS_AckMessage)); |
251 | queued_msg->msg = (struct GNUNET_MessageHeader *) &queued_msg[1]; | ||
252 | |||
253 | ack_msg = (struct GNUNET_CONSENSUS_AckMessage *) queued_msg->msg; | ||
254 | ack_msg->keep = ret; | ||
255 | |||
256 | GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, | ||
257 | queued_msg); | ||
246 | } | 258 | } |
247 | 259 | ||
248 | 260 | ||
@@ -254,13 +266,12 @@ handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus, | |||
254 | * @param msg conclude done message | 266 | * @param msg conclude done message |
255 | */ | 267 | */ |
256 | static void | 268 | static void |
257 | handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus, | 269 | handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus, |
258 | struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg) | 270 | struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg) |
259 | { | 271 | { |
260 | GNUNET_assert (NULL != consensus->conclude_cb); | 272 | GNUNET_assert (NULL != consensus->conclude_cb); |
261 | consensus->conclude_cb(consensus->conclude_cls, | 273 | consensus->conclude_cb (consensus->conclude_cls, |
262 | msg->num_peers, | 274 | 0, NULL); |
263 | (struct GNUNET_PeerIdentity *) &msg[1]); | ||
264 | consensus->conclude_cb = NULL; | 275 | consensus->conclude_cb = NULL; |
265 | } | 276 | } |
266 | 277 | ||
@@ -287,12 +298,6 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) | |||
287 | GNUNET_CLIENT_disconnect (consensus->client); | 298 | GNUNET_CLIENT_disconnect (consensus->client); |
288 | consensus->client = NULL; | 299 | consensus->client = NULL; |
289 | consensus->new_element_cb (NULL, NULL); | 300 | consensus->new_element_cb (NULL, NULL); |
290 | if (NULL != consensus->idc) | ||
291 | { | ||
292 | consensus->idc(consensus->idc_cls, GNUNET_NO); | ||
293 | consensus->idc = NULL; | ||
294 | consensus->idc_cls = NULL; | ||
295 | } | ||
296 | return; | 301 | return; |
297 | } | 302 | } |
298 | 303 | ||
@@ -305,108 +310,12 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) | |||
305 | handle_conclude_done (consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg); | 310 | handle_conclude_done (consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg); |
306 | break; | 311 | break; |
307 | default: | 312 | default: |
308 | LOG(GNUNET_ERROR_TYPE_WARNING, "did not understand message type sent by service, ignoring"); | 313 | GNUNET_break (0); |
309 | } | 314 | } |
310 | GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, | 315 | GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, |
311 | GNUNET_TIME_UNIT_FOREVER_REL); | 316 | GNUNET_TIME_UNIT_FOREVER_REL); |
312 | } | 317 | } |
313 | 318 | ||
314 | |||
315 | |||
316 | |||
317 | /** | ||
318 | * Function called to notify a client about the connection | ||
319 | * begin ready to queue more data. "buf" will be | ||
320 | * NULL and "size" zero if the connection was closed for | ||
321 | * writing in the meantime. | ||
322 | * | ||
323 | * @param cls closure | ||
324 | * @param size number of bytes available in buf | ||
325 | * @param buf where the callee should write the message | ||
326 | * @return number of bytes written to buf | ||
327 | */ | ||
328 | static size_t | ||
329 | transmit_ack (void *cls, size_t size, void *buf) | ||
330 | { | ||
331 | struct GNUNET_CONSENSUS_AckMessage *msg; | ||
332 | struct GNUNET_CONSENSUS_Handle *consensus; | ||
333 | |||
334 | consensus = (struct GNUNET_CONSENSUS_Handle *) cls; | ||
335 | |||
336 | GNUNET_assert (NULL != consensus->ack_head); | ||
337 | |||
338 | msg = (struct GNUNET_CONSENSUS_AckMessage *) buf; | ||
339 | msg->keep = consensus->ack_head->keep; | ||
340 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK); | ||
341 | msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_AckMessage)); | ||
342 | |||
343 | consensus->ack_head = consensus->ack_head->next; | ||
344 | |||
345 | consensus->th = NULL; | ||
346 | |||
347 | ntr_insert (consensus); | ||
348 | ntr_ack (consensus); | ||
349 | ntr_conclude (consensus); | ||
350 | |||
351 | return sizeof (struct GNUNET_CONSENSUS_AckMessage); | ||
352 | } | ||
353 | |||
354 | /** | ||
355 | * Function called to notify a client about the connection | ||
356 | * begin ready to queue more data. "buf" will be | ||
357 | * NULL and "size" zero if the connection was closed for | ||
358 | * writing in the meantime. | ||
359 | * | ||
360 | * @param cls closure | ||
361 | * @param size number of bytes available in buf | ||
362 | * @param buf where the callee should write the message | ||
363 | * @return number of bytes written to buf | ||
364 | */ | ||
365 | static size_t | ||
366 | transmit_insert (void *cls, size_t size, void *buf) | ||
367 | { | ||
368 | struct GNUNET_CONSENSUS_ElementMessage *msg; | ||
369 | struct GNUNET_CONSENSUS_Handle *consensus; | ||
370 | GNUNET_CONSENSUS_InsertDoneCallback idc; | ||
371 | int msize; | ||
372 | void *idc_cls; | ||
373 | |||
374 | GNUNET_assert (NULL != buf); | ||
375 | |||
376 | consensus = cls; | ||
377 | |||
378 | GNUNET_assert (NULL != consensus->insert_element); | ||
379 | |||
380 | consensus->th = NULL; | ||
381 | |||
382 | msg = buf; | ||
383 | |||
384 | msize = sizeof (struct GNUNET_CONSENSUS_ElementMessage) + | ||
385 | consensus->insert_element->size; | ||
386 | |||
387 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT); | ||
388 | msg->header.size = htons (msize); | ||
389 | memcpy (&msg[1], | ||
390 | consensus->insert_element->data, | ||
391 | consensus->insert_element->size); | ||
392 | |||
393 | consensus->insert_element = NULL; | ||
394 | |||
395 | idc = consensus->idc; | ||
396 | consensus->idc = NULL; | ||
397 | idc_cls = consensus->idc_cls; | ||
398 | consensus->idc_cls = NULL; | ||
399 | idc (idc_cls, GNUNET_YES); | ||
400 | |||
401 | |||
402 | ntr_ack (consensus); | ||
403 | ntr_insert (consensus); | ||
404 | ntr_conclude (consensus); | ||
405 | |||
406 | return msize; | ||
407 | } | ||
408 | |||
409 | |||
410 | /** | 319 | /** |
411 | * Function called to notify a client about the connection | 320 | * Function called to notify a client about the connection |
412 | * begin ready to queue more data. "buf" will be | 321 | * begin ready to queue more data. "buf" will be |
@@ -427,7 +336,7 @@ transmit_join (void *cls, size_t size, void *buf) | |||
427 | 336 | ||
428 | GNUNET_assert (NULL != buf); | 337 | GNUNET_assert (NULL != buf); |
429 | 338 | ||
430 | LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n"); | 339 | LOG (GNUNET_ERROR_TYPE_INFO, "transmitting join message\n"); |
431 | 340 | ||
432 | consensus = cls; | 341 | consensus = cls; |
433 | consensus->th = NULL; | 342 | consensus->th = NULL; |
@@ -447,9 +356,7 @@ transmit_join (void *cls, size_t size, void *buf) | |||
447 | consensus->peers, | 356 | consensus->peers, |
448 | consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); | 357 | consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); |
449 | 358 | ||
450 | ntr_insert (consensus); | 359 | schedule_transmit (consensus); |
451 | ntr_begin (consensus); | ||
452 | ntr_conclude (consensus); | ||
453 | 360 | ||
454 | GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, | 361 | GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, |
455 | GNUNET_TIME_UNIT_FOREVER_REL); | 362 | GNUNET_TIME_UNIT_FOREVER_REL); |
@@ -457,88 +364,11 @@ transmit_join (void *cls, size_t size, void *buf) | |||
457 | return msize; | 364 | return msize; |
458 | } | 365 | } |
459 | 366 | ||
460 | |||
461 | /** | ||
462 | * Function called to notify a client about the connection | ||
463 | * begin ready to queue more data. "buf" will be | ||
464 | * NULL and "size" zero if the connection was closed for | ||
465 | * writing in the meantime. | ||
466 | * | ||
467 | * @param cls closure | ||
468 | * @param size number of bytes available in buf | ||
469 | * @param buf where the callee should write the message | ||
470 | * @return number of bytes written to buf | ||
471 | */ | ||
472 | static size_t | ||
473 | transmit_conclude (void *cls, size_t size, void *buf) | ||
474 | { | ||
475 | struct GNUNET_CONSENSUS_ConcludeMessage *msg; | ||
476 | struct GNUNET_CONSENSUS_Handle *consensus; | ||
477 | int msize; | ||
478 | |||
479 | GNUNET_assert (NULL != buf); | ||
480 | |||
481 | consensus = cls; | ||
482 | consensus->th = NULL; | ||
483 | |||
484 | msg = buf; | ||
485 | |||
486 | msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage); | ||
487 | |||
488 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); | ||
489 | msg->header.size = htons (msize); | ||
490 | msg->timeout = | ||
491 | GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline)); | ||
492 | |||
493 | ntr_ack (consensus); | ||
494 | |||
495 | return msize; | ||
496 | } | ||
497 | |||
498 | |||
499 | /** | ||
500 | * Function called to notify a client about the connection | ||
501 | * begin ready to queue more data. "buf" will be | ||
502 | * NULL and "size" zero if the connection was closed for | ||
503 | * writing in the meantime. | ||
504 | * | ||
505 | * @param cls the consensus handle | ||
506 | * @param size number of bytes available in buf | ||
507 | * @param buf where the callee should write the message | ||
508 | * @return number of bytes written to buf | ||
509 | */ | ||
510 | static size_t | ||
511 | transmit_begin (void *cls, size_t size, void *buf) | ||
512 | { | ||
513 | struct GNUNET_MessageHeader *msg; | ||
514 | struct GNUNET_CONSENSUS_Handle *consensus; | ||
515 | int msize; | ||
516 | |||
517 | GNUNET_assert (NULL != buf); | ||
518 | |||
519 | consensus = cls; | ||
520 | consensus->th = NULL; | ||
521 | |||
522 | msg = buf; | ||
523 | |||
524 | msize = sizeof (struct GNUNET_MessageHeader); | ||
525 | |||
526 | msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN); | ||
527 | msg->size = htons (msize); | ||
528 | |||
529 | ntr_ack (consensus); | ||
530 | ntr_insert (consensus); | ||
531 | ntr_conclude (consensus); | ||
532 | |||
533 | return msize; | ||
534 | } | ||
535 | |||
536 | |||
537 | /** | 367 | /** |
538 | * Create a consensus session. | 368 | * Create a consensus session. |
539 | * | 369 | * |
540 | * @param cfg | 370 | * @param cfg configuration to use for connecting to the consensus service |
541 | * @param num_peers | 371 | * @param num_peers number of peers in the peers array |
542 | * @param peers array of peers participating in this consensus session | 372 | * @param peers array of peers participating in this consensus session |
543 | * Inclusion of the local peer is optional. | 373 | * Inclusion of the local peer is optional. |
544 | * @param session_id session identifier | 374 | * @param session_id session identifier |
@@ -553,7 +383,7 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
553 | unsigned int num_peers, | 383 | unsigned int num_peers, |
554 | const struct GNUNET_PeerIdentity *peers, | 384 | const struct GNUNET_PeerIdentity *peers, |
555 | const struct GNUNET_HashCode *session_id, | 385 | const struct GNUNET_HashCode *session_id, |
556 | GNUNET_CONSENSUS_NewElementCallback new_element_cb, | 386 | GNUNET_CONSENSUS_ElementCallback new_element_cb, |
557 | void *new_element_cls) | 387 | void *new_element_cls) |
558 | { | 388 | { |
559 | struct GNUNET_CONSENSUS_Handle *consensus; | 389 | struct GNUNET_CONSENSUS_Handle *consensus; |
@@ -567,17 +397,10 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
567 | consensus->session_id = *session_id; | 397 | consensus->session_id = *session_id; |
568 | 398 | ||
569 | if (0 == num_peers) | 399 | if (0 == num_peers) |
570 | { | ||
571 | consensus->peers = NULL; | 400 | consensus->peers = NULL; |
572 | } | ||
573 | else if (num_peers > 0) | 401 | else if (num_peers > 0) |
574 | { | 402 | consensus->peers = |
575 | consensus->peers = GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity)); | 403 | GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity)); |
576 | } | ||
577 | else | ||
578 | { | ||
579 | GNUNET_break (0); | ||
580 | } | ||
581 | 404 | ||
582 | consensus->client = GNUNET_CLIENT_connect ("consensus", cfg); | 405 | consensus->client = GNUNET_CLIENT_connect ("consensus", cfg); |
583 | 406 | ||
@@ -615,45 +438,37 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, | |||
615 | GNUNET_CONSENSUS_InsertDoneCallback idc, | 438 | GNUNET_CONSENSUS_InsertDoneCallback idc, |
616 | void *idc_cls) | 439 | void *idc_cls) |
617 | { | 440 | { |
618 | GNUNET_assert (NULL == consensus->idc); | 441 | struct QueuedMessage *qmsg; |
619 | GNUNET_assert (NULL == consensus->insert_element); | 442 | struct GNUNET_CONSENSUS_ElementMessage *element_msg; |
620 | GNUNET_assert (NULL == consensus->conclude_cb); | 443 | size_t element_msg_size; |
621 | 444 | ||
622 | consensus->idc = idc; | 445 | LOG (GNUNET_ERROR_TYPE_INFO, "inserting, size=%llu\n", element->size); |
623 | consensus->idc_cls = idc_cls; | ||
624 | consensus->insert_element = GNUNET_memdup(element, sizeof (struct GNUNET_CONSENSUS_Element) + element->size); | ||
625 | 446 | ||
626 | if (consensus->joined == 0) | 447 | element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) + |
627 | { | 448 | element->size); |
628 | return; | ||
629 | } | ||
630 | 449 | ||
631 | ntr_insert (consensus); | 450 | element_msg = GNUNET_malloc (element_msg_size); |
632 | } | 451 | element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT); |
452 | element_msg->header.size = htons (element_msg_size); | ||
453 | memcpy (&element_msg[1], element->data, element->size); | ||
633 | 454 | ||
455 | qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); | ||
456 | qmsg->msg = (struct GNUNET_MessageHeader *) element_msg; | ||
457 | qmsg->size = element_msg_size; | ||
458 | qmsg->idc = idc; | ||
459 | qmsg->idc_cls = idc_cls; | ||
634 | 460 | ||
635 | /** | 461 | GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg); |
636 | * Begin reconciling elements with other peers. | ||
637 | * | ||
638 | * @param consensus handle for the consensus session | ||
639 | */ | ||
640 | void | ||
641 | GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus) | ||
642 | { | ||
643 | GNUNET_assert (NULL == consensus->idc); | ||
644 | GNUNET_assert (NULL == consensus->insert_element); | ||
645 | GNUNET_assert (GNUNET_NO == consensus->begin_requested); | ||
646 | GNUNET_assert (GNUNET_NO == consensus->begin_sent); | ||
647 | |||
648 | consensus->begin_requested = GNUNET_YES; | ||
649 | 462 | ||
650 | ntr_begin (consensus); | 463 | schedule_transmit (consensus); |
651 | } | 464 | } |
652 | 465 | ||
653 | 466 | ||
654 | /** | 467 | /** |
655 | * We are finished inserting new elements into the consensus; | 468 | * We are done with inserting new elements into the consensus; |
656 | * try to conclude the consensus within a given time window. | 469 | * try to conclude the consensus within a given time window. |
470 | * After conclude has been called, no further elements may be | ||
471 | * inserted by the client. | ||
657 | * | 472 | * |
658 | * @param consensus consensus session | 473 | * @param consensus consensus session |
659 | * @param timeout timeout after which the conculde callback | 474 | * @param timeout timeout after which the conculde callback |
@@ -664,20 +479,32 @@ GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus) | |||
664 | void | 479 | void |
665 | GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, | 480 | GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, |
666 | struct GNUNET_TIME_Relative timeout, | 481 | struct GNUNET_TIME_Relative timeout, |
482 | unsigned int min_group_size_in_consensus, | ||
667 | GNUNET_CONSENSUS_ConcludeCallback conclude, | 483 | GNUNET_CONSENSUS_ConcludeCallback conclude, |
668 | void *conclude_cls) | 484 | void *conclude_cls) |
669 | { | 485 | { |
486 | struct QueuedMessage *qmsg; | ||
487 | struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg; | ||
488 | |||
670 | GNUNET_assert (NULL != conclude); | 489 | GNUNET_assert (NULL != conclude); |
671 | GNUNET_assert (NULL == consensus->conclude_cb); | 490 | GNUNET_assert (NULL == consensus->conclude_cb); |
672 | 491 | ||
673 | consensus->conclude_cls = conclude_cls; | 492 | consensus->conclude_cls = conclude_cls; |
674 | consensus->conclude_cb = conclude; | 493 | consensus->conclude_cb = conclude; |
675 | consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout); | ||
676 | 494 | ||
495 | conclude_msg = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)); | ||
496 | conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); | ||
497 | conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)); | ||
498 | conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout); | ||
499 | conclude_msg->min_group_size = min_group_size_in_consensus; | ||
500 | |||
501 | qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); | ||
502 | qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg; | ||
503 | qmsg->size = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage); | ||
504 | |||
505 | GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg); | ||
677 | 506 | ||
678 | /* if transmitting the conclude message is not possible right now, transmit_join | 507 | schedule_transmit (consensus); |
679 | * or transmit_ack will handle it */ | ||
680 | ntr_conclude (consensus); | ||
681 | } | 508 | } |
682 | 509 | ||
683 | 510 | ||
diff --git a/src/consensus/gnunet-consensus-start-peers.c b/src/consensus/gnunet-consensus-start-peers.c index fb7f047ae..8b516393f 100644 --- a/src/consensus/gnunet-consensus-start-peers.c +++ b/src/consensus/gnunet-consensus-start-peers.c | |||
@@ -147,6 +147,9 @@ run (void *cls, char *const *args, const char *cfgfile, | |||
147 | NULL, | 147 | NULL, |
148 | test_master, | 148 | test_master, |
149 | NULL); | 149 | NULL); |
150 | |||
151 | |||
152 | printf("hello there!\n"); | ||
150 | } | 153 | } |
151 | 154 | ||
152 | 155 | ||
diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c index bc518657e..cd267f5ec 100644 --- a/src/consensus/gnunet-consensus.c +++ b/src/consensus/gnunet-consensus.c | |||
@@ -62,10 +62,10 @@ stdin_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); | |||
62 | */ | 62 | */ |
63 | static void | 63 | static void |
64 | conclude_cb (void *cls, | 64 | conclude_cb (void *cls, |
65 | unsigned int num_peers_in_consensus, | 65 | unsigned int consensus_group_count, |
66 | const struct GNUNET_PeerIdentity *peers_in_consensus) | 66 | const struct GNUNET_CONSENSUS_Group *groups) |
67 | { | 67 | { |
68 | printf("reached conclusion with %d peers\n", num_peers_in_consensus); | 68 | printf("reached conclusion\n"); |
69 | GNUNET_SCHEDULER_shutdown (); | 69 | GNUNET_SCHEDULER_shutdown (); |
70 | } | 70 | } |
71 | 71 | ||
@@ -111,7 +111,7 @@ stdin_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
111 | if (feof (stdin)) | 111 | if (feof (stdin)) |
112 | { | 112 | { |
113 | printf ("concluding ...\n"); | 113 | printf ("concluding ...\n"); |
114 | GNUNET_CONSENSUS_conclude (consensus, GNUNET_TIME_UNIT_FOREVER_REL, conclude_cb, NULL); | 114 | GNUNET_CONSENSUS_conclude (consensus, GNUNET_TIME_UNIT_FOREVER_REL, 0, conclude_cb, NULL); |
115 | } | 115 | } |
116 | return; | 116 | return; |
117 | } | 117 | } |
@@ -144,6 +144,12 @@ static int | |||
144 | cb (void *cls, | 144 | cb (void *cls, |
145 | struct GNUNET_CONSENSUS_Element *element) | 145 | struct GNUNET_CONSENSUS_Element *element) |
146 | { | 146 | { |
147 | if (NULL == element) | ||
148 | { | ||
149 | printf("error receiving from consensus\n"); | ||
150 | GNUNET_SCHEDULER_shutdown (); | ||
151 | return GNUNET_NO; | ||
152 | } | ||
147 | printf("got element\n"); | 153 | printf("got element\n"); |
148 | return GNUNET_YES; | 154 | return GNUNET_YES; |
149 | } | 155 | } |
@@ -178,10 +184,12 @@ run (void *cls, char *const *args, const char *cfgfile, | |||
178 | 184 | ||
179 | if (NULL == session_id_str) | 185 | if (NULL == session_id_str) |
180 | { | 186 | { |
181 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "no session id given\n"); | 187 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "no session id given (missing -s/--session-id)\n"); |
182 | return; | 188 | return; |
183 | } | 189 | } |
184 | 190 | ||
191 | GNUNET_CRYPTO_hash (session_id_str, strlen (session_id_str), &sid); | ||
192 | |||
185 | for (count = 0; NULL != args[count]; count++); | 193 | for (count = 0; NULL != args[count]; count++); |
186 | 194 | ||
187 | if (0 != count) | 195 | if (0 != count) |
@@ -213,9 +221,6 @@ run (void *cls, char *const *args, const char *cfgfile, | |||
213 | &sid, | 221 | &sid, |
214 | &cb, NULL); | 222 | &cb, NULL); |
215 | 223 | ||
216 | GNUNET_CONSENSUS_begin (consensus); | ||
217 | |||
218 | |||
219 | stdin_fh = GNUNET_DISK_get_handle_from_native (stdin); | 224 | stdin_fh = GNUNET_DISK_get_handle_from_native (stdin); |
220 | stdin_tid = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL, stdin_fh, | 225 | stdin_tid = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL, stdin_fh, |
221 | &stdin_cb, NULL); | 226 | &stdin_cb, NULL); |
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index 195efc681..1b394db19 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c | |||
@@ -16,12 +16,19 @@ | |||
16 | along with GNUnet; see the file COPYING. If not, write to the | 16 | along with GNUnet; see the file COPYING. If not, write to the |
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | 17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, |
18 | Boston, MA 02111-1307, USA. | 18 | Boston, MA 02111-1307, USA. |
19 | */ | 19 | */ |
20 | |||
20 | 21 | ||
22 | /** | ||
23 | * @file consensus/gnunet-service-consensus.c | ||
24 | * @brief | ||
25 | * @author Florian Dold | ||
26 | */ | ||
21 | 27 | ||
22 | #include "platform.h" | 28 | #include "platform.h" |
23 | #include "gnunet_common.h" | 29 | #include "gnunet_common.h" |
24 | #include "gnunet_protocols.h" | 30 | #include "gnunet_protocols.h" |
31 | #include "gnunet_applications.h" | ||
25 | #include "gnunet_util_lib.h" | 32 | #include "gnunet_util_lib.h" |
26 | #include "gnunet_consensus_service.h" | 33 | #include "gnunet_consensus_service.h" |
27 | #include "gnunet_core_service.h" | 34 | #include "gnunet_core_service.h" |
@@ -57,6 +64,24 @@ struct PendingElement | |||
57 | }; | 64 | }; |
58 | 65 | ||
59 | 66 | ||
67 | /* | ||
68 | * A peer that is also in a consensus session. | ||
69 | * Note that 'this' peer is not in the list. | ||
70 | */ | ||
71 | struct ConsensusPeer | ||
72 | { | ||
73 | struct GNUNET_PeerIdentity *peer_id; | ||
74 | |||
75 | /** | ||
76 | * Incoming tunnel from the peer. | ||
77 | */ | ||
78 | struct GNUNET_MESH_Tunnel *incoming_tunnel; | ||
79 | |||
80 | struct InvertibleBloomFilter *last_ibf; | ||
81 | |||
82 | }; | ||
83 | |||
84 | |||
60 | /** | 85 | /** |
61 | * A consensus session consists of one local client and the remote authorities. | 86 | * A consensus session consists of one local client and the remote authorities. |
62 | */ | 87 | */ |
@@ -84,18 +109,14 @@ struct ConsensusSession | |||
84 | struct GNUNET_HashCode *global_id; | 109 | struct GNUNET_HashCode *global_id; |
85 | 110 | ||
86 | /** | 111 | /** |
87 | * Corresponding server handle. | 112 | * Local client in this consensus session. |
113 | * There is only one client per consensus session. | ||
88 | */ | 114 | */ |
89 | struct GNUNET_SERVER_Client *client; | 115 | struct GNUNET_SERVER_Client *client; |
90 | 116 | ||
91 | /** | 117 | /** |
92 | * Client wants to receive and send updates. | ||
93 | */ | ||
94 | int begin; | ||
95 | |||
96 | /** | ||
97 | * Values in the consensus set of this session, | 118 | * Values in the consensus set of this session, |
98 | * all of them either have been sent or approved by the client. | 119 | * all of them either have been sent by or approved by the client. |
99 | */ | 120 | */ |
100 | struct GNUNET_CONTAINER_MultiHashMap *values; | 121 | struct GNUNET_CONTAINER_MultiHashMap *values; |
101 | 122 | ||
@@ -110,12 +131,12 @@ struct ConsensusSession | |||
110 | struct PendingElement *transmit_pending_tail; | 131 | struct PendingElement *transmit_pending_tail; |
111 | 132 | ||
112 | /** | 133 | /** |
113 | * Elements that have not been sent to the client yet. | 134 | * Elements that have not been approved (or rejected) by the client yet. |
114 | */ | 135 | */ |
115 | struct PendingElement *approval_pending_head; | 136 | struct PendingElement *approval_pending_head; |
116 | 137 | ||
117 | /** | 138 | /** |
118 | * Elements that have not been sent to the client yet. | 139 | * Elements that have not been approved (or rejected) by the client yet. |
119 | */ | 140 | */ |
120 | struct PendingElement *approval_pending_tail; | 141 | struct PendingElement *approval_pending_tail; |
121 | 142 | ||
@@ -136,9 +157,47 @@ struct ConsensusSession | |||
136 | int conclude_sent; | 157 | int conclude_sent; |
137 | 158 | ||
138 | /** | 159 | /** |
160 | * Minimum number of peers to form a consensus group | ||
161 | */ | ||
162 | int conclude_group_min; | ||
163 | |||
164 | /** | ||
165 | * Current round of the conclusion | ||
166 | */ | ||
167 | int current_round; | ||
168 | |||
169 | /** | ||
170 | * Soft deadline for conclude. | ||
171 | * Speed up the speed of the consensus at the cost of consensus quality, as | ||
172 | * the time approached or crosses the deadline. | ||
173 | */ | ||
174 | struct GNUNET_TIME_Absolute conclude_deadline; | ||
175 | |||
176 | /** | ||
139 | * Number of other peers in the consensus | 177 | * Number of other peers in the consensus |
140 | */ | 178 | */ |
141 | int num_peers; | 179 | unsigned int num_peers; |
180 | |||
181 | /** | ||
182 | * Other peers in the consensus, array of ConsensusPeer | ||
183 | */ | ||
184 | struct ConsensusPeer *peers; | ||
185 | |||
186 | /** | ||
187 | * Tunnel for broadcasting to all other authorities | ||
188 | */ | ||
189 | struct GNUNET_MESH_Tunnel *broadcast_tunnel; | ||
190 | |||
191 | /** | ||
192 | * Time limit for one round of pairwise exchange. | ||
193 | * FIXME: should not actually be a constant | ||
194 | */ | ||
195 | struct GNUNET_TIME_Relative round_time; | ||
196 | |||
197 | /** | ||
198 | * Task identifier for the round timeout task | ||
199 | */ | ||
200 | GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid; | ||
142 | }; | 201 | }; |
143 | 202 | ||
144 | 203 | ||
@@ -167,10 +226,21 @@ static struct GNUNET_SERVER_Handle *srv; | |||
167 | */ | 226 | */ |
168 | static struct GNUNET_PeerIdentity *my_peer; | 227 | static struct GNUNET_PeerIdentity *my_peer; |
169 | 228 | ||
229 | /** | ||
230 | * Handle to the mesh service. | ||
231 | */ | ||
232 | static struct GNUNET_MESH_Handle *mesh; | ||
233 | |||
234 | /** | ||
235 | * Handle to the core service. Only used during service startup, will be NULL after that. | ||
236 | */ | ||
237 | static struct GNUNET_CORE_Handle *core; | ||
238 | |||
170 | static void | 239 | static void |
171 | disconnect_client (struct GNUNET_SERVER_Client *client) | 240 | disconnect_client (struct GNUNET_SERVER_Client *client) |
172 | { | 241 | { |
173 | /* FIXME */ | 242 | GNUNET_SERVER_client_disconnect (client); |
243 | /* FIXME: free data structures that this client owns */ | ||
174 | } | 244 | } |
175 | 245 | ||
176 | static void | 246 | static void |
@@ -185,7 +255,6 @@ compute_global_id (struct GNUNET_HashCode *dst, | |||
185 | *dst = *local_id; | 255 | *dst = *local_id; |
186 | for (i = 0; i < num_peers; ++i) | 256 | for (i = 0; i < num_peers; ++i) |
187 | { | 257 | { |
188 | /* FIXME: maybe hash_xor/hash allow aliased source/target, and we can get by without tmp */ | ||
189 | GNUNET_CRYPTO_hash_xor (dst, &peers[0].hashPubKey, &tmp); | 258 | GNUNET_CRYPTO_hash_xor (dst, &peers[0].hashPubKey, &tmp); |
190 | *dst = tmp; | 259 | *dst = tmp; |
191 | GNUNET_CRYPTO_hash (dst, sizeof (struct GNUNET_PeerIdentity), &tmp); | 260 | GNUNET_CRYPTO_hash (dst, sizeof (struct GNUNET_PeerIdentity), &tmp); |
@@ -255,7 +324,7 @@ send_next (struct ConsensusSession *session) | |||
255 | 324 | ||
256 | if ((session->conclude_requested == GNUNET_YES) && (session->conclude_sent == GNUNET_NO)) | 325 | if ((session->conclude_requested == GNUNET_YES) && (session->conclude_sent == GNUNET_NO)) |
257 | { | 326 | { |
258 | /* just the conclude message with no other authorities in the dummy */ | 327 | /* FIXME */ |
259 | msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage); | 328 | msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage); |
260 | session->th = | 329 | session->th = |
261 | GNUNET_SERVER_notify_transmit_ready (session->client, msize, | 330 | GNUNET_SERVER_notify_transmit_ready (session->client, msize, |
@@ -274,6 +343,39 @@ send_next (struct ConsensusSession *session) | |||
274 | 343 | ||
275 | 344 | ||
276 | /** | 345 | /** |
346 | * Method called whenever a peer has disconnected from the tunnel. | ||
347 | * Implementations of this callback must NOT call | ||
348 | * GNUNET_MESH_tunnel_destroy immediately, but instead schedule those | ||
349 | * to run in some other task later. However, calling | ||
350 | * "GNUNET_MESH_notify_transmit_ready_cancel" is allowed. | ||
351 | * | ||
352 | * @param cls closure | ||
353 | * @param peer peer identity the tunnel stopped working with | ||
354 | */ | ||
355 | static void | ||
356 | disconnect_handler (void *cls, const struct GNUNET_PeerIdentity *peer) | ||
357 | { | ||
358 | /* FIXME: how do we handle this */ | ||
359 | } | ||
360 | |||
361 | |||
362 | /** | ||
363 | * Method called whenever a peer has connected to the tunnel. | ||
364 | * | ||
365 | * @param cls closure | ||
366 | * @param peer peer identity the tunnel was created to, NULL on timeout | ||
367 | * @param atsi performance data for the connection | ||
368 | */ | ||
369 | static void | ||
370 | connect_handler (void *cls, | ||
371 | const struct GNUNET_PeerIdentity *peer, | ||
372 | const struct GNUNET_ATS_Information *atsi) | ||
373 | { | ||
374 | /* not much we can do here, now we know the other peer has been added to our broadcast tunnel */ | ||
375 | } | ||
376 | |||
377 | |||
378 | /** | ||
277 | * Called when a client wants to join a consensus session. | 379 | * Called when a client wants to join a consensus session. |
278 | * | 380 | * |
279 | * @param cls unused | 381 | * @param cls unused |
@@ -288,18 +390,24 @@ client_join (void *cls, | |||
288 | struct GNUNET_HashCode global_id; | 390 | struct GNUNET_HashCode global_id; |
289 | const struct GNUNET_CONSENSUS_JoinMessage *msg; | 391 | const struct GNUNET_CONSENSUS_JoinMessage *msg; |
290 | struct ConsensusSession *session; | 392 | struct ConsensusSession *session; |
393 | unsigned int i; | ||
291 | 394 | ||
292 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client joining\n"); | 395 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client joining\n"); |
293 | 396 | ||
294 | msg = (struct GNUNET_CONSENSUS_JoinMessage *) m; | 397 | msg = (struct GNUNET_CONSENSUS_JoinMessage *) m; |
295 | 398 | ||
399 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session id is %s\n", GNUNET_h2s (&msg->session_id)); | ||
400 | |||
296 | compute_global_id (&global_id, &msg->session_id, (struct GNUNET_PeerIdentity *) &m[1], msg->num_peers); | 401 | compute_global_id (&global_id, &msg->session_id, (struct GNUNET_PeerIdentity *) &m[1], msg->num_peers); |
297 | 402 | ||
403 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "computed global id is %s\n", GNUNET_h2s (&global_id)); | ||
404 | |||
298 | session = sessions_head; | 405 | session = sessions_head; |
299 | while (NULL != session) | 406 | while (NULL != session) |
300 | { | 407 | { |
301 | if (client == session->client) | 408 | if (client == session->client) |
302 | { | 409 | { |
410 | |||
303 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client already in session\n"); | 411 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client already in session\n"); |
304 | disconnect_client (client); | 412 | disconnect_client (client); |
305 | return; | 413 | return; |
@@ -310,6 +418,7 @@ client_join (void *cls, | |||
310 | disconnect_client (client); | 418 | disconnect_client (client); |
311 | return; | 419 | return; |
312 | } | 420 | } |
421 | session = session->next; | ||
313 | } | 422 | } |
314 | 423 | ||
315 | GNUNET_SERVER_client_keep (client); | 424 | GNUNET_SERVER_client_keep (client); |
@@ -320,11 +429,40 @@ client_join (void *cls, | |||
320 | session->global_id = GNUNET_memdup (&global_id, sizeof (struct GNUNET_HashCode)); | 429 | session->global_id = GNUNET_memdup (&global_id, sizeof (struct GNUNET_HashCode)); |
321 | session->values = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO); | 430 | session->values = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO); |
322 | session->client = client; | 431 | session->client = client; |
432 | /* FIXME: should not be a constant, but chosen adaptively */ | ||
433 | session->round_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5); | ||
323 | 434 | ||
324 | GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); | 435 | session->broadcast_tunnel = GNUNET_MESH_tunnel_create (mesh, session, connect_handler, disconnect_handler, session); |
436 | |||
437 | session->num_peers = 0; | ||
438 | |||
439 | /* count the peers that are not the local peer */ | ||
440 | for (i = 0; i < msg->num_peers; i++) | ||
441 | { | ||
442 | struct GNUNET_PeerIdentity *peers; | ||
443 | peers = (struct GNUNET_PeerIdentity *) &msg[1]; | ||
444 | if (0 != memcmp (&peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity))) | ||
445 | session->num_peers++; | ||
446 | } | ||
447 | |||
448 | session->peers = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeer)); | ||
449 | |||
450 | /* copy the peer identities and add peers to broadcast tunnel */ | ||
451 | for (i = 0; i < msg->num_peers; i++) | ||
452 | { | ||
453 | struct GNUNET_PeerIdentity *peers; | ||
454 | peers = (struct GNUNET_PeerIdentity *) &msg[1]; | ||
455 | if (0 != memcmp (&peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity))) | ||
456 | { | ||
457 | *session->peers->peer_id = peers[i]; | ||
458 | GNUNET_MESH_peer_request_connect_add (session->broadcast_tunnel, &peers[i]); | ||
459 | } | ||
460 | } | ||
325 | 461 | ||
326 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "created new session\n"); | 462 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "created new session\n"); |
327 | 463 | ||
464 | GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); | ||
465 | |||
328 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 466 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
329 | } | 467 | } |
330 | 468 | ||
@@ -381,40 +519,28 @@ client_insert (void *cls, | |||
381 | 519 | ||
382 | 520 | ||
383 | /** | 521 | /** |
384 | * Called when a client wants to begin | 522 | * Do one round of the conclusion. |
523 | * Start by broadcasting the set difference estimator (IBF strata). | ||
524 | * | ||
385 | */ | 525 | */ |
386 | void | 526 | void |
387 | client_begin (void *cls, | 527 | conclude_do_round (struct ConsensusSession *session) |
388 | struct GNUNET_SERVER_Client *client, | ||
389 | const struct GNUNET_MessageHeader *message) | ||
390 | { | 528 | { |
391 | struct ConsensusSession *session; | 529 | /* FIXME */ |
392 | 530 | } | |
393 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client requested begin\n"); | ||
394 | |||
395 | session = sessions_head; | ||
396 | while (NULL != session) | ||
397 | { | ||
398 | if (session->client == client) | ||
399 | break; | ||
400 | } | ||
401 | |||
402 | if (NULL == session) | ||
403 | { | ||
404 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to 'begin', but client is not in any session\n"); | ||
405 | GNUNET_SERVER_client_disconnect (client); | ||
406 | return; | ||
407 | } | ||
408 | |||
409 | session->begin = GNUNET_YES; | ||
410 | 531 | ||
411 | send_next (session); | ||
412 | 532 | ||
413 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 533 | /** |
534 | * Cancel the current round if necessary, decide to run another round or | ||
535 | * terminate. | ||
536 | */ | ||
537 | void | ||
538 | conclude_round_done (struct ConsensusSession *session) | ||
539 | { | ||
540 | /* FIXME */ | ||
414 | } | 541 | } |
415 | 542 | ||
416 | 543 | ||
417 | |||
418 | /** | 544 | /** |
419 | * Called when a client performs the conclude operation. | 545 | * Called when a client performs the conclude operation. |
420 | */ | 546 | */ |
@@ -425,6 +551,8 @@ client_conclude (void *cls, | |||
425 | { | 551 | { |
426 | struct ConsensusSession *session; | 552 | struct ConsensusSession *session; |
427 | 553 | ||
554 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n"); | ||
555 | |||
428 | session = sessions_head; | 556 | session = sessions_head; |
429 | while ((session != NULL) && (session->client != client)) | 557 | while ((session != NULL) && (session->client != client)) |
430 | { | 558 | { |
@@ -432,12 +560,25 @@ client_conclude (void *cls, | |||
432 | } | 560 | } |
433 | if (NULL == session) | 561 | if (NULL == session) |
434 | { | 562 | { |
563 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client not found\n"); | ||
564 | GNUNET_SERVER_client_disconnect (client); | ||
565 | return; | ||
566 | } | ||
567 | |||
568 | if (GNUNET_YES == session->conclude_requested) | ||
569 | { | ||
570 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client requested conclude twice\n"); | ||
435 | GNUNET_SERVER_client_disconnect (client); | 571 | GNUNET_SERVER_client_disconnect (client); |
436 | return; | 572 | return; |
437 | } | 573 | } |
574 | |||
438 | session->conclude_requested = GNUNET_YES; | 575 | session->conclude_requested = GNUNET_YES; |
439 | send_next (session); | 576 | |
577 | conclude_do_round (session); | ||
578 | |||
440 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 579 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
580 | |||
581 | send_next (session); | ||
441 | } | 582 | } |
442 | 583 | ||
443 | 584 | ||
@@ -462,10 +603,8 @@ static void | |||
462 | disconnect_core (void *cls, | 603 | disconnect_core (void *cls, |
463 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 604 | const struct GNUNET_SCHEDULER_TaskContext *tc) |
464 | { | 605 | { |
465 | struct GNUNET_CORE_Handle *core; | ||
466 | core = (struct GNUNET_CORE_Handle *) cls; | ||
467 | GNUNET_CORE_disconnect (core); | 606 | GNUNET_CORE_disconnect (core); |
468 | 607 | core = NULL; | |
469 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n"); | 608 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n"); |
470 | } | 609 | } |
471 | 610 | ||
@@ -478,8 +617,6 @@ core_startup (void *cls, | |||
478 | static const struct GNUNET_SERVER_MessageHandler handlers[] = { | 617 | static const struct GNUNET_SERVER_MessageHandler handlers[] = { |
479 | {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0}, | 618 | {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0}, |
480 | {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, | 619 | {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, |
481 | {&client_begin, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN, | ||
482 | sizeof (struct GNUNET_MessageHeader)}, | ||
483 | {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, | 620 | {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, |
484 | sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)}, | 621 | sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)}, |
485 | {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK, | 622 | {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK, |
@@ -489,13 +626,173 @@ core_startup (void *cls, | |||
489 | 626 | ||
490 | GNUNET_SERVER_add_handlers (srv, handlers); | 627 | GNUNET_SERVER_add_handlers (srv, handlers); |
491 | my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity)); | 628 | my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity)); |
629 | /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */ | ||
492 | GNUNET_SCHEDULER_add_now (&disconnect_core, core); | 630 | GNUNET_SCHEDULER_add_now (&disconnect_core, core); |
493 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n"); | 631 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n"); |
494 | } | 632 | } |
495 | 633 | ||
496 | 634 | ||
635 | |||
636 | /** | ||
637 | * Method called whenever another peer has added us to a tunnel | ||
638 | * the other peer initiated. | ||
639 | * Only called (once) upon reception of data with a message type which was | ||
640 | * subscribed to in GNUNET_MESH_connect. A call to GNUNET_MESH_tunnel_destroy | ||
641 | * causes te tunnel to be ignored and no further notifications are sent about | ||
642 | * the same tunnel. | ||
643 | * | ||
644 | * @param cls closure | ||
645 | * @param tunnel new handle to the tunnel | ||
646 | * @param initiator peer that started the tunnel | ||
647 | * @param atsi performance information for the tunnel | ||
648 | * @return initial tunnel context for the tunnel | ||
649 | * (can be NULL -- that's not an error) | ||
650 | */ | ||
651 | static void * | ||
652 | new_tunnel (void *cls, | ||
653 | struct GNUNET_MESH_Tunnel *tunnel, | ||
654 | const struct GNUNET_PeerIdentity *initiator, | ||
655 | const struct GNUNET_ATS_Information *atsi) | ||
656 | { | ||
657 | /* there's nothing we can do here, as we don't have the global consensus id yet */ | ||
658 | return NULL; | ||
659 | } | ||
660 | |||
661 | |||
662 | /** | ||
663 | * Function called whenever an inbound tunnel is destroyed. Should clean up | ||
664 | * any associated state. This function is NOT called if the client has | ||
665 | * explicitly asked for the tunnel to be destroyed using | ||
666 | * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on | ||
667 | * the tunnel. | ||
668 | * | ||
669 | * @param cls closure (set from GNUNET_MESH_connect) | ||
670 | * @param tunnel connection to the other end (henceforth invalid) | ||
671 | * @param tunnel_ctx place where local state associated | ||
672 | * with the tunnel is stored | ||
673 | */ | ||
674 | static void | ||
675 | cleaner (void *cls, const struct GNUNET_MESH_Tunnel *tunnel, void *tunnel_ctx) | ||
676 | { | ||
677 | /* FIXME: what to do here? */ | ||
678 | } | ||
679 | |||
680 | |||
681 | |||
497 | /** | 682 | /** |
498 | * Process consensus requests. | 683 | * Called to clean up, after a shutdown has been requested. |
684 | * | ||
685 | * @param cls closure | ||
686 | * @param tc context information (why was this task triggered now) | ||
687 | */ | ||
688 | static void | ||
689 | shutdown_task (void *cls, | ||
690 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
691 | { | ||
692 | /* mesh requires all the tunnels to be destroyed manually */ | ||
693 | while (NULL != sessions_head) | ||
694 | { | ||
695 | struct ConsensusSession *session; | ||
696 | session = sessions_head; | ||
697 | GNUNET_MESH_tunnel_destroy (sessions_head->broadcast_tunnel); | ||
698 | sessions_head = sessions_head->next; | ||
699 | GNUNET_free (session); | ||
700 | } | ||
701 | |||
702 | if (NULL != mesh) | ||
703 | { | ||
704 | GNUNET_MESH_disconnect (mesh); | ||
705 | mesh = NULL; | ||
706 | } | ||
707 | if (NULL != core) | ||
708 | { | ||
709 | GNUNET_CORE_disconnect (core); | ||
710 | core = NULL; | ||
711 | } | ||
712 | } | ||
713 | |||
714 | |||
715 | |||
716 | /** | ||
717 | * Functions with this signature are called whenever a message is | ||
718 | * received. | ||
719 | * | ||
720 | * @param cls closure (set from GNUNET_MESH_connect) | ||
721 | * @param tunnel connection to the other end | ||
722 | * @param tunnel_ctx place to store local state associated with the tunnel | ||
723 | * @param sender who sent the message | ||
724 | * @param message the actual message | ||
725 | * @param atsi performance data for the connection | ||
726 | * @return GNUNET_OK to keep the connection open, | ||
727 | * GNUNET_SYSERR to close it (signal serious error) | ||
728 | */ | ||
729 | static int | ||
730 | p2p_delta_estimate (void *cls, | ||
731 | struct GNUNET_MESH_Tunnel * tunnel, | ||
732 | void **tunnel_ctx, | ||
733 | const struct GNUNET_PeerIdentity *sender, | ||
734 | const struct GNUNET_MessageHeader *message, | ||
735 | const struct GNUNET_ATS_Information *atsi) | ||
736 | { | ||
737 | /* FIXME */ | ||
738 | return GNUNET_OK; | ||
739 | } | ||
740 | |||
741 | |||
742 | /** | ||
743 | * Functions with this signature are called whenever a message is | ||
744 | * received. | ||
745 | * | ||
746 | * @param cls closure (set from GNUNET_MESH_connect) | ||
747 | * @param tunnel connection to the other end | ||
748 | * @param tunnel_ctx place to store local state associated with the tunnel | ||
749 | * @param sender who sent the message | ||
750 | * @param message the actual message | ||
751 | * @param atsi performance data for the connection | ||
752 | * @return GNUNET_OK to keep the connection open, | ||
753 | * GNUNET_SYSERR to close it (signal serious error) | ||
754 | */ | ||
755 | static int | ||
756 | p2p_difference_digest (void *cls, | ||
757 | struct GNUNET_MESH_Tunnel * tunnel, | ||
758 | void **tunnel_ctx, | ||
759 | const struct GNUNET_PeerIdentity *sender, | ||
760 | const struct GNUNET_MessageHeader *message, | ||
761 | const struct GNUNET_ATS_Information *atsi) | ||
762 | { | ||
763 | /* FIXME */ | ||
764 | return GNUNET_OK; | ||
765 | } | ||
766 | |||
767 | |||
768 | /** | ||
769 | * Functions with this signature are called whenever a message is | ||
770 | * received. | ||
771 | * | ||
772 | * @param cls closure (set from GNUNET_MESH_connect) | ||
773 | * @param tunnel connection to the other end | ||
774 | * @param tunnel_ctx place to store local state associated with the tunnel | ||
775 | * @param sender who sent the message | ||
776 | * @param message the actual message | ||
777 | * @param atsi performance data for the connection | ||
778 | * @return GNUNET_OK to keep the connection open, | ||
779 | * GNUNET_SYSERR to close it (signal serious error) | ||
780 | */ | ||
781 | static int | ||
782 | p2p_elements_and_requests (void *cls, | ||
783 | struct GNUNET_MESH_Tunnel * tunnel, | ||
784 | void **tunnel_ctx, | ||
785 | const struct GNUNET_PeerIdentity *sender, | ||
786 | const struct GNUNET_MessageHeader *message, | ||
787 | const struct GNUNET_ATS_Information *atsi) | ||
788 | { | ||
789 | /* FIXME */ | ||
790 | return GNUNET_OK; | ||
791 | } | ||
792 | |||
793 | |||
794 | /** | ||
795 | * Start processing consensus requests. | ||
499 | * | 796 | * |
500 | * @param cls closure | 797 | * @param cls closure |
501 | * @param server the initialized server | 798 | * @param server the initialized server |
@@ -504,22 +801,38 @@ core_startup (void *cls, | |||
504 | static void | 801 | static void |
505 | run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) | 802 | run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) |
506 | { | 803 | { |
507 | struct GNUNET_CORE_Handle *my_core; | ||
508 | static const struct GNUNET_CORE_MessageHandler handlers[] = { | 804 | static const struct GNUNET_CORE_MessageHandler handlers[] = { |
509 | {NULL, 0, 0} | 805 | {NULL, 0, 0} |
510 | }; | 806 | }; |
807 | static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = { | ||
808 | {p2p_delta_estimate, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE, 0}, | ||
809 | {p2p_difference_digest, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST, 0}, | ||
810 | {p2p_elements_and_requests, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_AND_REQUESTS, 0}, | ||
811 | {NULL, 0, 0} | ||
812 | }; | ||
813 | static const GNUNET_MESH_ApplicationType app_types[] = { | ||
814 | GNUNET_APPLICATION_TYPE_CONSENSUS, | ||
815 | GNUNET_APPLICATION_TYPE_END | ||
816 | }; | ||
511 | 817 | ||
512 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n"); | 818 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n"); |
513 | 819 | ||
514 | cfg = c; | 820 | cfg = c; |
515 | srv = server; | 821 | srv = server; |
516 | my_core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, handlers); | 822 | |
517 | GNUNET_assert (NULL != my_core); | 823 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); |
824 | |||
825 | mesh = GNUNET_MESH_connect (cfg, NULL, new_tunnel, cleaner, mesh_handlers, app_types); | ||
826 | GNUNET_assert (NULL != mesh); | ||
827 | |||
828 | /* we have to wait for the core_startup callback before proceeding with the consensus service startup */ | ||
829 | core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, handlers); | ||
830 | GNUNET_assert (NULL != core); | ||
518 | } | 831 | } |
519 | 832 | ||
520 | 833 | ||
521 | /** | 834 | /** |
522 | * The main function for the statistics service. | 835 | * The main function for the consensus service. |
523 | * | 836 | * |
524 | * @param argc number of arguments from the command line | 837 | * @param argc number of arguments from the command line |
525 | * @param argv command line arguments | 838 | * @param argv command line arguments |
@@ -528,7 +841,8 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU | |||
528 | int | 841 | int |
529 | main (int argc, char *const *argv) | 842 | main (int argc, char *const *argv) |
530 | { | 843 | { |
531 | return (GNUNET_OK == | 844 | int ret; |
532 | GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL)) ? 0 : 1; | 845 | ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL); |
846 | return (GNUNET_OK == ret) ? 0 : 1; | ||
533 | } | 847 | } |
534 | 848 | ||
diff --git a/src/consensus/test_consensus_api.c b/src/consensus/test_consensus_api.c index 8a263171f..e752983c2 100644 --- a/src/consensus/test_consensus_api.c +++ b/src/consensus/test_consensus_api.c | |||
@@ -27,14 +27,9 @@ | |||
27 | #include "gnunet_testing_lib.h" | 27 | #include "gnunet_testing_lib.h" |
28 | 28 | ||
29 | 29 | ||
30 | static struct GNUNET_CONSENSUS_Handle *consensus1; | 30 | static struct GNUNET_CONSENSUS_Handle *consensus; |
31 | static struct GNUNET_CONSENSUS_Handle *consensus2; | ||
32 | 31 | ||
33 | static int concluded1; | 32 | static int insert; |
34 | static int concluded2; | ||
35 | |||
36 | static int insert1; | ||
37 | static int insert2; | ||
38 | 33 | ||
39 | static struct GNUNET_HashCode session_id; | 34 | static struct GNUNET_HashCode session_id; |
40 | 35 | ||
@@ -48,20 +43,12 @@ static void conclude_done (void *cls, | |||
48 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "concluded\n"); | 43 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "concluded\n"); |
49 | } | 44 | } |
50 | 45 | ||
51 | static void | 46 | static int |
52 | on_new_element (void *cls, | 47 | on_new_element (void *cls, |
53 | struct GNUNET_CONSENSUS_Element *element) | 48 | struct GNUNET_CONSENSUS_Element *element) |
54 | { | 49 | { |
55 | struct GNUNET_CONSENSUS_Handle *consensus; | 50 | GNUNET_assert (0); |
56 | 51 | return GNUNET_YES; | |
57 | GNUNET_assert (NULL != element); | ||
58 | |||
59 | consensus = *(struct GNUNET_CONSENSUS_Handle **) cls; | ||
60 | |||
61 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received new element\n"); | ||
62 | |||
63 | GNUNET_CONSENSUS_conclude (consensus, GNUNET_TIME_UNIT_FOREVER_REL, &conclude_done, consensus); | ||
64 | |||
65 | } | 52 | } |
66 | 53 | ||
67 | static void | 54 | static void |
@@ -71,7 +58,6 @@ insert_done (void *cls, int success) | |||
71 | } | 58 | } |
72 | 59 | ||
73 | 60 | ||
74 | |||
75 | static void | 61 | static void |
76 | run (void *cls, | 62 | run (void *cls, |
77 | const struct GNUNET_CONFIGURATION_Handle *cfg, | 63 | const struct GNUNET_CONFIGURATION_Handle *cfg, |
@@ -89,11 +75,9 @@ run (void *cls, | |||
89 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "testing consensus api\n"); | 75 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "testing consensus api\n"); |
90 | 76 | ||
91 | GNUNET_CRYPTO_hash (str, strlen (str), &session_id); | 77 | GNUNET_CRYPTO_hash (str, strlen (str), &session_id); |
92 | consensus1 = GNUNET_CONSENSUS_create (cfg, 0, NULL, &session_id, on_new_element, &consensus1); | 78 | consensus = GNUNET_CONSENSUS_create (cfg, 0, NULL, &session_id, on_new_element, &consensus); |
79 | GNUNET_assert (consensus != NULL); | ||
93 | /* | 80 | /* |
94 | consensus2 = GNUNET_CONSENSUS_create (cfg, 0, NULL, &session_id, on_new_element, &consensus2); | ||
95 | GNUNET_assert (consensus1 != NULL); | ||
96 | GNUNET_assert (consensus2 != NULL); | ||
97 | GNUNET_CONSENSUS_insert (consensus1, &el1, &insert_done, &consensus1); | 81 | GNUNET_CONSENSUS_insert (consensus1, &el1, &insert_done, &consensus1); |
98 | GNUNET_CONSENSUS_insert (consensus2, &el2, &insert_done, &consensus2); | 82 | GNUNET_CONSENSUS_insert (consensus2, &el2, &insert_done, &consensus2); |
99 | */ | 83 | */ |
diff --git a/src/include/gnunet_applications.h b/src/include/gnunet_applications.h index 6702feb5f..5710a8838 100644 --- a/src/include/gnunet_applications.h +++ b/src/include/gnunet_applications.h | |||
@@ -71,6 +71,11 @@ extern "C" | |||
71 | */ | 71 | */ |
72 | #define GNUNET_APPLICATION_TYPE_EXIT_REGEX_PREFIX "GNUNET-VPN-VER-0001-" | 72 | #define GNUNET_APPLICATION_TYPE_EXIT_REGEX_PREFIX "GNUNET-VPN-VER-0001-" |
73 | 73 | ||
74 | /** | ||
75 | * Consensus. | ||
76 | */ | ||
77 | #define GNUNET_APPLICATION_TYPE_CONSENSUS 18 | ||
78 | |||
74 | 79 | ||
75 | #if 0 /* keep Emacsens' auto-indent happy */ | 80 | #if 0 /* keep Emacsens' auto-indent happy */ |
76 | { | 81 | { |
diff --git a/src/include/gnunet_consensus_service.h b/src/include/gnunet_consensus_service.h index 232033598..951f9a031 100644 --- a/src/include/gnunet_consensus_service.h +++ b/src/include/gnunet_consensus_service.h | |||
@@ -76,8 +76,8 @@ struct GNUNET_CONSENSUS_Element | |||
76 | * @return GNUNET_OK if the valid is well-formed and should be added to the consensus, | 76 | * @return GNUNET_OK if the valid is well-formed and should be added to the consensus, |
77 | * GNUNET_SYSERR if the element should be ignored and not be propagated | 77 | * GNUNET_SYSERR if the element should be ignored and not be propagated |
78 | */ | 78 | */ |
79 | typedef int (*GNUNET_CONSENSUS_NewElementCallback) (void *cls, | 79 | typedef int (*GNUNET_CONSENSUS_ElementCallback) (void *cls, |
80 | struct GNUNET_CONSENSUS_Element *element); | 80 | struct GNUNET_CONSENSUS_Element *element); |
81 | 81 | ||
82 | 82 | ||
83 | 83 | ||
@@ -105,10 +105,10 @@ struct GNUNET_CONSENSUS_Handle; | |||
105 | */ | 105 | */ |
106 | struct GNUNET_CONSENSUS_Handle * | 106 | struct GNUNET_CONSENSUS_Handle * |
107 | GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, | 107 | GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, |
108 | unsigned int num_peers, | 108 | unsigned int num_peers, |
109 | const struct GNUNET_PeerIdentity *peers, | 109 | const struct GNUNET_PeerIdentity *peers, |
110 | const struct GNUNET_HashCode *session_id, | 110 | const struct GNUNET_HashCode *session_id, |
111 | GNUNET_CONSENSUS_NewElementCallback new_element_cb, | 111 | GNUNET_CONSENSUS_ElementCallback new_element_cb, |
112 | void *new_element_cls); | 112 | void *new_element_cls); |
113 | 113 | ||
114 | 114 | ||
@@ -122,7 +122,7 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
122 | * the insertion and thus the consensus failed for good | 122 | * the insertion and thus the consensus failed for good |
123 | */ | 123 | */ |
124 | typedef void (*GNUNET_CONSENSUS_InsertDoneCallback) (void *cls, | 124 | typedef void (*GNUNET_CONSENSUS_InsertDoneCallback) (void *cls, |
125 | int success); | 125 | int success); |
126 | 126 | ||
127 | 127 | ||
128 | /** | 128 | /** |
@@ -138,9 +138,9 @@ typedef void (*GNUNET_CONSENSUS_InsertDoneCallback) (void *cls, | |||
138 | */ | 138 | */ |
139 | void | 139 | void |
140 | GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, | 140 | GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, |
141 | const struct GNUNET_CONSENSUS_Element *element, | 141 | const struct GNUNET_CONSENSUS_Element *element, |
142 | GNUNET_CONSENSUS_InsertDoneCallback idc, | 142 | GNUNET_CONSENSUS_InsertDoneCallback idc, |
143 | void *idc_cls); | 143 | void *idc_cls); |
144 | 144 | ||
145 | 145 | ||
146 | /** | 146 | /** |
@@ -168,9 +168,9 @@ struct GNUNET_CONSENSUS_DeltaRequest; | |||
168 | */ | 168 | */ |
169 | struct GNUNET_CONSENSUS_DeltaRequest * | 169 | struct GNUNET_CONSENSUS_DeltaRequest * |
170 | GNUNET_CONSENSUS_get_delta (struct GNUNET_CONSENSUS_Handle *consensus, | 170 | GNUNET_CONSENSUS_get_delta (struct GNUNET_CONSENSUS_Handle *consensus, |
171 | uint32_t group_id, | 171 | uint32_t group_id, |
172 | GNUNET_CONSENSUS_NewElementCallback remove_element_cb, | 172 | GNUNET_CONSENSUS_ElementCallback remove_element_cb, |
173 | void *remove_element_cb_cls); | 173 | void *remove_element_cb_cls); |
174 | 174 | ||
175 | 175 | ||
176 | void | 176 | void |
@@ -184,19 +184,7 @@ struct GNUNET_CONSENSUS_Group | |||
184 | uint64_t total_elements_in_group; | 184 | uint64_t total_elements_in_group; |
185 | const struct GNUNET_PeerIdentity **members; | 185 | const struct GNUNET_PeerIdentity **members; |
186 | }; | 186 | }; |
187 | 187 | ||
188 | |||
189 | /** | ||
190 | * Called when a conclusion was successful. | ||
191 | * | ||
192 | * @param cls | ||
193 | * @param num_peers_in_consensus | ||
194 | * @param peers_in_consensus | ||
195 | */ | ||
196 | typedef void (*GNUNET_CONSENSUS_NewConcludeCallback) (void *cls, | ||
197 | unsigned int consensus_group_count, | ||
198 | const struct GNUNET_CONSENSUS_Group *groups); | ||
199 | |||
200 | 188 | ||
201 | /** | 189 | /** |
202 | * Called when a conclusion was successful. | 190 | * Called when a conclusion was successful. |
@@ -206,8 +194,8 @@ typedef void (*GNUNET_CONSENSUS_NewConcludeCallback) (void *cls, | |||
206 | * @param peers_in_consensus | 194 | * @param peers_in_consensus |
207 | */ | 195 | */ |
208 | typedef void (*GNUNET_CONSENSUS_ConcludeCallback) (void *cls, | 196 | typedef void (*GNUNET_CONSENSUS_ConcludeCallback) (void *cls, |
209 | unsigned int num_peers_in_consensus, | 197 | unsigned int consensus_group_count, |
210 | const struct GNUNET_PeerIdentity *peers_in_consensus); | 198 | const struct GNUNET_CONSENSUS_Group *groups); |
211 | 199 | ||
212 | 200 | ||
213 | /** | 201 | /** |
@@ -222,10 +210,10 @@ typedef void (*GNUNET_CONSENSUS_ConcludeCallback) (void *cls, | |||
222 | */ | 210 | */ |
223 | void | 211 | void |
224 | GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, | 212 | GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, |
225 | struct GNUNET_TIME_Relative timeout, | 213 | struct GNUNET_TIME_Relative timeout, |
226 | // unsigned int min_group_size_in_consensus, | 214 | unsigned int min_group_size_in_consensus, |
227 | GNUNET_CONSENSUS_ConcludeCallback conclude, | 215 | GNUNET_CONSENSUS_ConcludeCallback conclude, |
228 | void *conclude_cls); | 216 | void *conclude_cls); |
229 | 217 | ||
230 | 218 | ||
231 | /** | 219 | /** |
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index db9a85577..03d510c20 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h | |||
@@ -1657,36 +1657,31 @@ extern "C" | |||
1657 | */ | 1657 | */ |
1658 | #define GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE 525 | 1658 | #define GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE 525 |
1659 | 1659 | ||
1660 | /** | ||
1661 | * Sent by client to service, telling whether a received element should | ||
1662 | * be accepted and propagated further or not. | ||
1663 | */ | ||
1664 | #define GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK 527 | ||
1665 | 1660 | ||
1666 | /** | 1661 | /* message types 526-539 reserved for consensus client/service messages */ |
1667 | * Update another peer's consensus set with new elements. | 1662 | |
1668 | */ | 1663 | |
1669 | #define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS 528 | ||
1670 | 1664 | ||
1671 | /** | 1665 | /** |
1672 | * Request elements (by their hash) from another peer. | 1666 | * Sent by client to service, telling whether a received element should |
1667 | * be accepted and propagated further or not. | ||
1673 | */ | 1668 | */ |
1674 | #define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_REQUEST_ELEMENTS 529 | 1669 | #define GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK 540 |
1675 | 1670 | ||
1676 | /** | 1671 | /** |
1677 | * Strata estimator. | 1672 | * Strata estimator. |
1678 | */ | 1673 | */ |
1679 | #define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_STRATA 530 | 1674 | #define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE 541 |
1680 | 1675 | ||
1681 | /** | 1676 | /** |
1682 | * IBF containing all elements of a peer. | 1677 | * IBF containing all elements of a peer. |
1683 | */ | 1678 | */ |
1684 | #define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_IBF 531 | 1679 | #define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST 542 |
1685 | 1680 | ||
1686 | /** | 1681 | /** |
1687 | * Request reconcilliation with another peer. | 1682 | * Elements, and requests for further elements |
1688 | */ | 1683 | */ |
1689 | #define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_RECONCILE 532 | 1684 | #define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_AND_REQUESTS 543 |
1690 | 1685 | ||
1691 | 1686 | ||
1692 | /** | 1687 | /** |