diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-06-19 10:48:54 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-06-19 10:48:54 +0000 |
commit | a900b29ddaa9ea46c731b054b5e3ef3e725b95a8 (patch) | |
tree | 52e1a9697b0abf4618cd5684359ec5f0a040898a /src/consensus/consensus_api.c | |
parent | 17353bc0a47c89bda205f23e7995377c9bfe7769 (diff) | |
download | gnunet-a900b29ddaa9ea46c731b054b5e3ef3e725b95a8.tar.gz gnunet-a900b29ddaa9ea46c731b054b5e3ef3e725b95a8.zip |
- opaque mq structs
- mq for mesh
- faster hashing for IBFs
- mesh replaces stream in set
- new set profiler (work in progress)
Diffstat (limited to 'src/consensus/consensus_api.c')
-rw-r--r-- | src/consensus/consensus_api.c | 319 |
1 files changed, 64 insertions, 255 deletions
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index e3ddb4913..684580755 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c | |||
@@ -33,37 +33,6 @@ | |||
33 | 33 | ||
34 | #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__) | 34 | #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__) |
35 | 35 | ||
36 | /** | ||
37 | * Actions that can be queued. | ||
38 | */ | ||
39 | struct QueuedMessage | ||
40 | { | ||
41 | /** | ||
42 | * Queued messages are stored in a doubly linked list. | ||
43 | */ | ||
44 | struct QueuedMessage *next; | ||
45 | |||
46 | /** | ||
47 | * Queued messages are stored in a doubly linked list. | ||
48 | */ | ||
49 | struct QueuedMessage *prev; | ||
50 | |||
51 | /** | ||
52 | * The actual queued message. | ||
53 | */ | ||
54 | struct GNUNET_MessageHeader *msg; | ||
55 | |||
56 | /** | ||
57 | * Will be called after transmit, if not NULL | ||
58 | */ | ||
59 | GNUNET_CONSENSUS_InsertDoneCallback idc; | ||
60 | |||
61 | /** | ||
62 | * Closure for idc | ||
63 | */ | ||
64 | void *idc_cls; | ||
65 | }; | ||
66 | |||
67 | 36 | ||
68 | /** | 37 | /** |
69 | * Handle for the service. | 38 | * Handle for the service. |
@@ -106,21 +75,11 @@ struct GNUNET_CONSENSUS_Handle | |||
106 | struct GNUNET_PeerIdentity **peers; | 75 | struct GNUNET_PeerIdentity **peers; |
107 | 76 | ||
108 | /** | 77 | /** |
109 | * Currently active transmit request. | ||
110 | */ | ||
111 | struct GNUNET_CLIENT_TransmitHandle *th; | ||
112 | |||
113 | /** | ||
114 | * GNUNES_YES iff the join message has been sent to the service. | 78 | * GNUNES_YES iff the join message has been sent to the service. |
115 | */ | 79 | */ |
116 | int joined; | 80 | int joined; |
117 | 81 | ||
118 | /** | 82 | /** |
119 | * Closure for the insert done callback. | ||
120 | */ | ||
121 | void *idc_cls; | ||
122 | |||
123 | /** | ||
124 | * Called when the conclude operation finishes or fails. | 83 | * Called when the conclude operation finishes or fails. |
125 | */ | 84 | */ |
126 | GNUNET_CONSENSUS_ConcludeCallback conclude_cb; | 85 | GNUNET_CONSENSUS_ConcludeCallback conclude_cb; |
@@ -135,109 +94,36 @@ struct GNUNET_CONSENSUS_Handle | |||
135 | */ | 94 | */ |
136 | struct GNUNET_TIME_Absolute conclude_deadline; | 95 | struct GNUNET_TIME_Absolute conclude_deadline; |
137 | 96 | ||
138 | unsigned int conclude_min_size; | 97 | /** |
139 | 98 | * Message queue for the client. | |
140 | struct QueuedMessage *messages_head; | 99 | */ |
141 | 100 | struct GNUNET_MQ_Handle *mq; | |
142 | struct QueuedMessage *messages_tail; | ||
143 | |||
144 | }; | 101 | }; |
145 | 102 | ||
146 | |||
147 | |||
148 | /** | 103 | /** |
149 | * Schedule transmitting the next message. | 104 | * FIXME: this should not bee necessary when the API |
150 | * | 105 | * issue has been fixed |
151 | * @param consensus consensus handle | ||
152 | */ | 106 | */ |
153 | static void | 107 | struct InsertDoneInfo |
154 | send_next (struct GNUNET_CONSENSUS_Handle *consensus); | ||
155 | |||
156 | |||
157 | /** | ||
158 | * Function called to notify a client about the connection | ||
159 | * begin ready to queue more data. "buf" will be | ||
160 | * NULL and "size" zero if the connection was closed for | ||
161 | * writing in the meantime. | ||
162 | * | ||
163 | * @param cls closure | ||
164 | * @param size number of bytes available in buf | ||
165 | * @param buf where the callee should write the message | ||
166 | * @return number of bytes written to buf | ||
167 | */ | ||
168 | static size_t | ||
169 | transmit_queued (void *cls, size_t size, | ||
170 | void *buf) | ||
171 | { | 108 | { |
172 | struct GNUNET_CONSENSUS_Handle *consensus; | 109 | GNUNET_CONSENSUS_InsertDoneCallback idc; |
173 | struct QueuedMessage *qmsg; | 110 | void *cls; |
174 | size_t msg_size; | 111 | }; |
175 | |||
176 | consensus = (struct GNUNET_CONSENSUS_Handle *) cls; | ||
177 | consensus->th = NULL; | ||
178 | |||
179 | qmsg = consensus->messages_head; | ||
180 | GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg); | ||
181 | |||
182 | if (NULL == buf) | ||
183 | { | ||
184 | if (NULL != qmsg->idc) | ||
185 | { | ||
186 | qmsg->idc (qmsg->idc_cls, GNUNET_YES); | ||
187 | } | ||
188 | return 0; | ||
189 | } | ||
190 | |||
191 | msg_size = ntohs (qmsg->msg->size); | ||
192 | |||
193 | GNUNET_assert (size >= msg_size); | ||
194 | |||
195 | memcpy (buf, qmsg->msg, msg_size); | ||
196 | if (NULL != qmsg->idc) | ||
197 | { | ||
198 | qmsg->idc (qmsg->idc_cls, GNUNET_YES); | ||
199 | } | ||
200 | GNUNET_free (qmsg->msg); | ||
201 | GNUNET_free (qmsg); | ||
202 | /* FIXME: free the messages */ | ||
203 | |||
204 | send_next (consensus); | ||
205 | |||
206 | return msg_size; | ||
207 | } | ||
208 | |||
209 | |||
210 | /** | ||
211 | * Schedule transmitting the next message. | ||
212 | * | ||
213 | * @param consensus consensus handle | ||
214 | */ | ||
215 | static void | ||
216 | send_next (struct GNUNET_CONSENSUS_Handle *consensus) | ||
217 | { | ||
218 | if (NULL != consensus->th) | ||
219 | return; | ||
220 | |||
221 | if (NULL != consensus->messages_head) | ||
222 | { | ||
223 | consensus->th = | ||
224 | GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs (consensus->messages_head->msg->size), | ||
225 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
226 | GNUNET_NO, &transmit_queued, consensus); | ||
227 | } | ||
228 | } | ||
229 | 112 | ||
230 | 113 | ||
231 | /** | 114 | /** |
232 | * Called when the server has sent is a new element | 115 | * Called when the server has sent is a new element |
233 | * | 116 | * |
234 | * @param consensus consensus handle | 117 | * @param cls consensus handle |
235 | * @param msg element message | 118 | * @param mh element message |
236 | */ | 119 | */ |
237 | static void | 120 | static void |
238 | handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, | 121 | handle_new_element (void *cls, |
239 | struct GNUNET_CONSENSUS_ElementMessage *msg) | 122 | const struct GNUNET_MessageHeader *mh) |
240 | { | 123 | { |
124 | struct GNUNET_CONSENSUS_Handle *consensus = cls; | ||
125 | const struct GNUNET_CONSENSUS_ElementMessage *msg | ||
126 | = (const struct GNUNET_CONSENSUS_ElementMessage *) mh; | ||
241 | struct GNUNET_SET_Element element; | 127 | struct GNUNET_SET_Element element; |
242 | 128 | ||
243 | LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n"); | 129 | LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n"); |
@@ -247,8 +133,6 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, | |||
247 | element.data = &msg[1]; | 133 | element.data = &msg[1]; |
248 | 134 | ||
249 | consensus->new_element_cb (consensus->new_element_cls, &element); | 135 | consensus->new_element_cb (consensus->new_element_cls, &element); |
250 | |||
251 | send_next (consensus); | ||
252 | } | 136 | } |
253 | 137 | ||
254 | 138 | ||
@@ -256,13 +140,15 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, | |||
256 | * Called when the server has announced | 140 | * Called when the server has announced |
257 | * that the conclusion is over. | 141 | * that the conclusion is over. |
258 | * | 142 | * |
259 | * @param consensus consensus handle | 143 | * @param cls consensus handle |
260 | * @param msg conclude done message | 144 | * @param mh conclude done message |
261 | */ | 145 | */ |
262 | static void | 146 | static void |
263 | handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus, | 147 | handle_conclude_done (void *cls, |
264 | const struct GNUNET_MessageHeader *msg) | 148 | const struct GNUNET_MessageHeader *msg) |
265 | { | 149 | { |
150 | struct GNUNET_CONSENSUS_Handle *consensus = cls; | ||
151 | |||
266 | GNUNET_CONSENSUS_ConcludeCallback cc; | 152 | GNUNET_CONSENSUS_ConcludeCallback cc; |
267 | 153 | ||
268 | GNUNET_assert (NULL != (cc = consensus->conclude_cb)); | 154 | GNUNET_assert (NULL != (cc = consensus->conclude_cb)); |
@@ -272,89 +158,6 @@ handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus, | |||
272 | 158 | ||
273 | 159 | ||
274 | /** | 160 | /** |
275 | * Type of a function to call when we receive a message | ||
276 | * from the service. | ||
277 | * | ||
278 | * @param cls closure | ||
279 | * @param msg message received, NULL on timeout or fatal error | ||
280 | */ | ||
281 | static void | ||
282 | message_handler (void *cls, const struct GNUNET_MessageHeader *msg) | ||
283 | { | ||
284 | struct GNUNET_CONSENSUS_Handle *consensus = cls; | ||
285 | |||
286 | LOG (GNUNET_ERROR_TYPE_DEBUG, "received message from consensus service\n"); | ||
287 | |||
288 | if (NULL == msg) | ||
289 | { | ||
290 | /* Error, timeout, death */ | ||
291 | LOG (GNUNET_ERROR_TYPE_ERROR, "error receiving\n"); | ||
292 | GNUNET_CLIENT_disconnect (consensus->client); | ||
293 | consensus->client = NULL; | ||
294 | consensus->new_element_cb (consensus->new_element_cls, NULL); | ||
295 | return; | ||
296 | } | ||
297 | GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, | ||
298 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
299 | switch (ntohs (msg->type)) | ||
300 | { | ||
301 | case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT: | ||
302 | handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg); | ||
303 | break; | ||
304 | case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE: | ||
305 | handle_conclude_done (consensus, msg); | ||
306 | break; | ||
307 | default: | ||
308 | GNUNET_break (0); | ||
309 | } | ||
310 | } | ||
311 | |||
312 | /** | ||
313 | * Function called to notify a client about the connection | ||
314 | * begin ready to queue more data. "buf" will be | ||
315 | * NULL and "size" zero if the connection was closed for | ||
316 | * writing in the meantime. | ||
317 | * | ||
318 | * @param cls closure | ||
319 | * @param size number of bytes available in buf | ||
320 | * @param buf where the callee should write the message | ||
321 | * @return number of bytes written to buf | ||
322 | */ | ||
323 | static size_t | ||
324 | transmit_join (void *cls, size_t size, void *buf) | ||
325 | { | ||
326 | struct GNUNET_CONSENSUS_JoinMessage *msg; | ||
327 | struct GNUNET_CONSENSUS_Handle *consensus; | ||
328 | int msize; | ||
329 | |||
330 | GNUNET_assert (NULL != buf); | ||
331 | |||
332 | LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n"); | ||
333 | |||
334 | consensus = cls; | ||
335 | consensus->th = NULL; | ||
336 | consensus->joined = 1; | ||
337 | |||
338 | msg = buf; | ||
339 | |||
340 | msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) + | ||
341 | consensus->num_peers * sizeof (struct GNUNET_PeerIdentity); | ||
342 | |||
343 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN); | ||
344 | msg->header.size = htons (msize); | ||
345 | msg->session_id = consensus->session_id; | ||
346 | msg->num_peers = htonl (consensus->num_peers); | ||
347 | memcpy(&msg[1], | ||
348 | consensus->peers, | ||
349 | consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); | ||
350 | send_next (consensus); | ||
351 | GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, | ||
352 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
353 | |||
354 | return msize; | ||
355 | } | ||
356 | |||
357 | /** | ||
358 | * Create a consensus session. | 161 | * Create a consensus session. |
359 | * | 162 | * |
360 | * @param cfg configuration to use for connecting to the consensus service | 163 | * @param cfg configuration to use for connecting to the consensus service |
@@ -377,7 +180,15 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
377 | void *new_element_cls) | 180 | void *new_element_cls) |
378 | { | 181 | { |
379 | struct GNUNET_CONSENSUS_Handle *consensus; | 182 | struct GNUNET_CONSENSUS_Handle *consensus; |
380 | size_t join_message_size; | 183 | struct GNUNET_CONSENSUS_JoinMessage *join_msg; |
184 | struct GNUNET_MQ_Envelope *ev; | ||
185 | const static struct GNUNET_MQ_MessageHandler mq_handlers[] = { | ||
186 | {handle_new_element, | ||
187 | GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT, 0}, | ||
188 | {handle_conclude_done, | ||
189 | GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE, 0}, | ||
190 | GNUNET_MQ_HANDLERS_END | ||
191 | }; | ||
381 | 192 | ||
382 | consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle)); | 193 | consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle)); |
383 | consensus->cfg = cfg; | 194 | consensus->cfg = cfg; |
@@ -393,24 +204,33 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
393 | GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity)); | 204 | GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity)); |
394 | 205 | ||
395 | consensus->client = GNUNET_CLIENT_connect ("consensus", cfg); | 206 | consensus->client = GNUNET_CLIENT_connect ("consensus", cfg); |
207 | consensus->mq = GNUNET_MQ_queue_for_connection_client (consensus->client, | ||
208 | mq_handlers, consensus); | ||
396 | 209 | ||
397 | GNUNET_assert (consensus->client != NULL); | 210 | GNUNET_assert (consensus->client != NULL); |
398 | 211 | ||
399 | join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) + | 212 | ev = GNUNET_MQ_msg_extra (join_msg, |
400 | (num_peers * sizeof (struct GNUNET_PeerIdentity)); | 213 | (num_peers * sizeof (struct GNUNET_PeerIdentity)), |
401 | 214 | GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN); | |
402 | consensus->th = | ||
403 | GNUNET_CLIENT_notify_transmit_ready (consensus->client, | ||
404 | join_message_size, | ||
405 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
406 | GNUNET_NO, &transmit_join, consensus); | ||
407 | 215 | ||
216 | join_msg->session_id = consensus->session_id; | ||
217 | join_msg->num_peers = htonl (consensus->num_peers); | ||
218 | memcpy(&join_msg[1], | ||
219 | consensus->peers, | ||
220 | consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); | ||
408 | 221 | ||
409 | GNUNET_assert (consensus->th != NULL); | 222 | GNUNET_MQ_send (consensus->mq, ev); |
410 | return consensus; | 223 | return consensus; |
411 | } | 224 | } |
412 | 225 | ||
413 | 226 | ||
227 | static void | ||
228 | idc_adapter (void *cls) | ||
229 | { | ||
230 | struct InsertDoneInfo *i = cls; | ||
231 | i->idc (i->cls, GNUNET_OK); | ||
232 | GNUNET_free (i); | ||
233 | } | ||
414 | 234 | ||
415 | /** | 235 | /** |
416 | * Insert an element in the set being reconsiled. Must not be called after | 236 | * Insert an element in the set being reconsiled. Must not be called after |
@@ -428,28 +248,24 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, | |||
428 | GNUNET_CONSENSUS_InsertDoneCallback idc, | 248 | GNUNET_CONSENSUS_InsertDoneCallback idc, |
429 | void *idc_cls) | 249 | void *idc_cls) |
430 | { | 250 | { |
431 | struct QueuedMessage *qmsg; | ||
432 | struct GNUNET_CONSENSUS_ElementMessage *element_msg; | 251 | struct GNUNET_CONSENSUS_ElementMessage *element_msg; |
433 | size_t element_msg_size; | 252 | struct GNUNET_MQ_Envelope *ev; |
253 | struct InsertDoneInfo *i; | ||
434 | 254 | ||
435 | LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size); | 255 | LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size); |
436 | 256 | ||
437 | element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) + | 257 | ev = GNUNET_MQ_msg_extra (element_msg, element->size, |
438 | element->size); | 258 | GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT); |
439 | 259 | ||
440 | element_msg = GNUNET_malloc (element_msg_size); | ||
441 | element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT); | ||
442 | element_msg->header.size = htons (element_msg_size); | ||
443 | memcpy (&element_msg[1], element->data, element->size); | 260 | memcpy (&element_msg[1], element->data, element->size); |
444 | 261 | ||
445 | qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); | 262 | if (NULL != idc) |
446 | qmsg->msg = (struct GNUNET_MessageHeader *) element_msg; | 263 | { |
447 | qmsg->idc = idc; | 264 | i = GNUNET_new (struct InsertDoneInfo); |
448 | qmsg->idc_cls = idc_cls; | 265 | i->idc = idc; |
449 | 266 | i->cls = idc_cls; | |
450 | GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg); | 267 | GNUNET_MQ_notify_sent (ev, idc_adapter, i); |
451 | 268 | } | |
452 | send_next (consensus); | ||
453 | } | 269 | } |
454 | 270 | ||
455 | 271 | ||
@@ -471,7 +287,7 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, | |||
471 | GNUNET_CONSENSUS_ConcludeCallback conclude, | 287 | GNUNET_CONSENSUS_ConcludeCallback conclude, |
472 | void *conclude_cls) | 288 | void *conclude_cls) |
473 | { | 289 | { |
474 | struct QueuedMessage *qmsg; | 290 | struct GNUNET_MQ_Envelope *ev; |
475 | struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg; | 291 | struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg; |
476 | 292 | ||
477 | GNUNET_assert (NULL != conclude); | 293 | GNUNET_assert (NULL != conclude); |
@@ -480,17 +296,10 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, | |||
480 | consensus->conclude_cls = conclude_cls; | 296 | consensus->conclude_cls = conclude_cls; |
481 | consensus->conclude_cb = conclude; | 297 | consensus->conclude_cb = conclude; |
482 | 298 | ||
483 | conclude_msg = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)); | 299 | ev = GNUNET_MQ_msg (conclude_msg, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); |
484 | conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); | ||
485 | conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)); | ||
486 | conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout); | 300 | conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout); |
487 | 301 | ||
488 | qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); | 302 | GNUNET_MQ_send (consensus->mq, ev); |
489 | qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg; | ||
490 | |||
491 | GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg); | ||
492 | |||
493 | send_next (consensus); | ||
494 | } | 303 | } |
495 | 304 | ||
496 | 305 | ||