aboutsummaryrefslogtreecommitdiff
path: root/src/consensus/consensus_api.c
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-06-19 10:48:54 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-06-19 10:48:54 +0000
commita900b29ddaa9ea46c731b054b5e3ef3e725b95a8 (patch)
tree52e1a9697b0abf4618cd5684359ec5f0a040898a /src/consensus/consensus_api.c
parent17353bc0a47c89bda205f23e7995377c9bfe7769 (diff)
downloadgnunet-a900b29ddaa9ea46c731b054b5e3ef3e725b95a8.tar.gz
gnunet-a900b29ddaa9ea46c731b054b5e3ef3e725b95a8.zip
- opaque mq structs
- mq for mesh - faster hashing for IBFs - mesh replaces stream in set - new set profiler (work in progress)
Diffstat (limited to 'src/consensus/consensus_api.c')
-rw-r--r--src/consensus/consensus_api.c319
1 files changed, 64 insertions, 255 deletions
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c
index e3ddb4913..684580755 100644
--- a/src/consensus/consensus_api.c
+++ b/src/consensus/consensus_api.c
@@ -33,37 +33,6 @@
33 33
34#define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__) 34#define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
35 35
36/**
37 * Actions that can be queued.
38 */
39struct QueuedMessage
40{
41 /**
42 * Queued messages are stored in a doubly linked list.
43 */
44 struct QueuedMessage *next;
45
46 /**
47 * Queued messages are stored in a doubly linked list.
48 */
49 struct QueuedMessage *prev;
50
51 /**
52 * The actual queued message.
53 */
54 struct GNUNET_MessageHeader *msg;
55
56 /**
57 * Will be called after transmit, if not NULL
58 */
59 GNUNET_CONSENSUS_InsertDoneCallback idc;
60
61 /**
62 * Closure for idc
63 */
64 void *idc_cls;
65};
66
67 36
68/** 37/**
69 * Handle for the service. 38 * Handle for the service.
@@ -106,21 +75,11 @@ struct GNUNET_CONSENSUS_Handle
106 struct GNUNET_PeerIdentity **peers; 75 struct GNUNET_PeerIdentity **peers;
107 76
108 /** 77 /**
109 * Currently active transmit request.
110 */
111 struct GNUNET_CLIENT_TransmitHandle *th;
112
113 /**
114 * GNUNES_YES iff the join message has been sent to the service. 78 * GNUNES_YES iff the join message has been sent to the service.
115 */ 79 */
116 int joined; 80 int joined;
117 81
118 /** 82 /**
119 * Closure for the insert done callback.
120 */
121 void *idc_cls;
122
123 /**
124 * Called when the conclude operation finishes or fails. 83 * Called when the conclude operation finishes or fails.
125 */ 84 */
126 GNUNET_CONSENSUS_ConcludeCallback conclude_cb; 85 GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
@@ -135,109 +94,36 @@ struct GNUNET_CONSENSUS_Handle
135 */ 94 */
136 struct GNUNET_TIME_Absolute conclude_deadline; 95 struct GNUNET_TIME_Absolute conclude_deadline;
137 96
138 unsigned int conclude_min_size; 97 /**
139 98 * Message queue for the client.
140 struct QueuedMessage *messages_head; 99 */
141 100 struct GNUNET_MQ_Handle *mq;
142 struct QueuedMessage *messages_tail;
143
144}; 101};
145 102
146
147
148/** 103/**
149 * Schedule transmitting the next message. 104 * FIXME: this should not bee necessary when the API
150 * 105 * issue has been fixed
151 * @param consensus consensus handle
152 */ 106 */
153static void 107struct InsertDoneInfo
154send_next (struct GNUNET_CONSENSUS_Handle *consensus);
155
156
157/**
158 * Function called to notify a client about the connection
159 * begin ready to queue more data. "buf" will be
160 * NULL and "size" zero if the connection was closed for
161 * writing in the meantime.
162 *
163 * @param cls closure
164 * @param size number of bytes available in buf
165 * @param buf where the callee should write the message
166 * @return number of bytes written to buf
167 */
168static size_t
169transmit_queued (void *cls, size_t size,
170 void *buf)
171{ 108{
172 struct GNUNET_CONSENSUS_Handle *consensus; 109 GNUNET_CONSENSUS_InsertDoneCallback idc;
173 struct QueuedMessage *qmsg; 110 void *cls;
174 size_t msg_size; 111};
175
176 consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
177 consensus->th = NULL;
178
179 qmsg = consensus->messages_head;
180 GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg);
181
182 if (NULL == buf)
183 {
184 if (NULL != qmsg->idc)
185 {
186 qmsg->idc (qmsg->idc_cls, GNUNET_YES);
187 }
188 return 0;
189 }
190
191 msg_size = ntohs (qmsg->msg->size);
192
193 GNUNET_assert (size >= msg_size);
194
195 memcpy (buf, qmsg->msg, msg_size);
196 if (NULL != qmsg->idc)
197 {
198 qmsg->idc (qmsg->idc_cls, GNUNET_YES);
199 }
200 GNUNET_free (qmsg->msg);
201 GNUNET_free (qmsg);
202 /* FIXME: free the messages */
203
204 send_next (consensus);
205
206 return msg_size;
207}
208
209
210/**
211 * Schedule transmitting the next message.
212 *
213 * @param consensus consensus handle
214 */
215static void
216send_next (struct GNUNET_CONSENSUS_Handle *consensus)
217{
218 if (NULL != consensus->th)
219 return;
220
221 if (NULL != consensus->messages_head)
222 {
223 consensus->th =
224 GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs (consensus->messages_head->msg->size),
225 GNUNET_TIME_UNIT_FOREVER_REL,
226 GNUNET_NO, &transmit_queued, consensus);
227 }
228}
229 112
230 113
231/** 114/**
232 * Called when the server has sent is a new element 115 * Called when the server has sent is a new element
233 * 116 *
234 * @param consensus consensus handle 117 * @param cls consensus handle
235 * @param msg element message 118 * @param mh element message
236 */ 119 */
237static void 120static void
238handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, 121handle_new_element (void *cls,
239 struct GNUNET_CONSENSUS_ElementMessage *msg) 122 const struct GNUNET_MessageHeader *mh)
240{ 123{
124 struct GNUNET_CONSENSUS_Handle *consensus = cls;
125 const struct GNUNET_CONSENSUS_ElementMessage *msg
126 = (const struct GNUNET_CONSENSUS_ElementMessage *) mh;
241 struct GNUNET_SET_Element element; 127 struct GNUNET_SET_Element element;
242 128
243 LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n"); 129 LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n");
@@ -247,8 +133,6 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
247 element.data = &msg[1]; 133 element.data = &msg[1];
248 134
249 consensus->new_element_cb (consensus->new_element_cls, &element); 135 consensus->new_element_cb (consensus->new_element_cls, &element);
250
251 send_next (consensus);
252} 136}
253 137
254 138
@@ -256,13 +140,15 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
256 * Called when the server has announced 140 * Called when the server has announced
257 * that the conclusion is over. 141 * that the conclusion is over.
258 * 142 *
259 * @param consensus consensus handle 143 * @param cls consensus handle
260 * @param msg conclude done message 144 * @param mh conclude done message
261 */ 145 */
262static void 146static void
263handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus, 147handle_conclude_done (void *cls,
264 const struct GNUNET_MessageHeader *msg) 148 const struct GNUNET_MessageHeader *msg)
265{ 149{
150 struct GNUNET_CONSENSUS_Handle *consensus = cls;
151
266 GNUNET_CONSENSUS_ConcludeCallback cc; 152 GNUNET_CONSENSUS_ConcludeCallback cc;
267 153
268 GNUNET_assert (NULL != (cc = consensus->conclude_cb)); 154 GNUNET_assert (NULL != (cc = consensus->conclude_cb));
@@ -272,89 +158,6 @@ handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus,
272 158
273 159
274/** 160/**
275 * Type of a function to call when we receive a message
276 * from the service.
277 *
278 * @param cls closure
279 * @param msg message received, NULL on timeout or fatal error
280 */
281static void
282message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
283{
284 struct GNUNET_CONSENSUS_Handle *consensus = cls;
285
286 LOG (GNUNET_ERROR_TYPE_DEBUG, "received message from consensus service\n");
287
288 if (NULL == msg)
289 {
290 /* Error, timeout, death */
291 LOG (GNUNET_ERROR_TYPE_ERROR, "error receiving\n");
292 GNUNET_CLIENT_disconnect (consensus->client);
293 consensus->client = NULL;
294 consensus->new_element_cb (consensus->new_element_cls, NULL);
295 return;
296 }
297 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
298 GNUNET_TIME_UNIT_FOREVER_REL);
299 switch (ntohs (msg->type))
300 {
301 case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT:
302 handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg);
303 break;
304 case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE:
305 handle_conclude_done (consensus, msg);
306 break;
307 default:
308 GNUNET_break (0);
309 }
310}
311
312/**
313 * Function called to notify a client about the connection
314 * begin ready to queue more data. "buf" will be
315 * NULL and "size" zero if the connection was closed for
316 * writing in the meantime.
317 *
318 * @param cls closure
319 * @param size number of bytes available in buf
320 * @param buf where the callee should write the message
321 * @return number of bytes written to buf
322 */
323static size_t
324transmit_join (void *cls, size_t size, void *buf)
325{
326 struct GNUNET_CONSENSUS_JoinMessage *msg;
327 struct GNUNET_CONSENSUS_Handle *consensus;
328 int msize;
329
330 GNUNET_assert (NULL != buf);
331
332 LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n");
333
334 consensus = cls;
335 consensus->th = NULL;
336 consensus->joined = 1;
337
338 msg = buf;
339
340 msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) +
341 consensus->num_peers * sizeof (struct GNUNET_PeerIdentity);
342
343 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
344 msg->header.size = htons (msize);
345 msg->session_id = consensus->session_id;
346 msg->num_peers = htonl (consensus->num_peers);
347 memcpy(&msg[1],
348 consensus->peers,
349 consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
350 send_next (consensus);
351 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
352 GNUNET_TIME_UNIT_FOREVER_REL);
353
354 return msize;
355}
356
357/**
358 * Create a consensus session. 161 * Create a consensus session.
359 * 162 *
360 * @param cfg configuration to use for connecting to the consensus service 163 * @param cfg configuration to use for connecting to the consensus service
@@ -377,7 +180,15 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
377 void *new_element_cls) 180 void *new_element_cls)
378{ 181{
379 struct GNUNET_CONSENSUS_Handle *consensus; 182 struct GNUNET_CONSENSUS_Handle *consensus;
380 size_t join_message_size; 183 struct GNUNET_CONSENSUS_JoinMessage *join_msg;
184 struct GNUNET_MQ_Envelope *ev;
185 const static struct GNUNET_MQ_MessageHandler mq_handlers[] = {
186 {handle_new_element,
187 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT, 0},
188 {handle_conclude_done,
189 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE, 0},
190 GNUNET_MQ_HANDLERS_END
191 };
381 192
382 consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle)); 193 consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle));
383 consensus->cfg = cfg; 194 consensus->cfg = cfg;
@@ -393,24 +204,33 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
393 GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity)); 204 GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
394 205
395 consensus->client = GNUNET_CLIENT_connect ("consensus", cfg); 206 consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
207 consensus->mq = GNUNET_MQ_queue_for_connection_client (consensus->client,
208 mq_handlers, consensus);
396 209
397 GNUNET_assert (consensus->client != NULL); 210 GNUNET_assert (consensus->client != NULL);
398 211
399 join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) + 212 ev = GNUNET_MQ_msg_extra (join_msg,
400 (num_peers * sizeof (struct GNUNET_PeerIdentity)); 213 (num_peers * sizeof (struct GNUNET_PeerIdentity)),
401 214 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
402 consensus->th =
403 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
404 join_message_size,
405 GNUNET_TIME_UNIT_FOREVER_REL,
406 GNUNET_NO, &transmit_join, consensus);
407 215
216 join_msg->session_id = consensus->session_id;
217 join_msg->num_peers = htonl (consensus->num_peers);
218 memcpy(&join_msg[1],
219 consensus->peers,
220 consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
408 221
409 GNUNET_assert (consensus->th != NULL); 222 GNUNET_MQ_send (consensus->mq, ev);
410 return consensus; 223 return consensus;
411} 224}
412 225
413 226
227static void
228idc_adapter (void *cls)
229{
230 struct InsertDoneInfo *i = cls;
231 i->idc (i->cls, GNUNET_OK);
232 GNUNET_free (i);
233}
414 234
415/** 235/**
416 * Insert an element in the set being reconsiled. Must not be called after 236 * Insert an element in the set being reconsiled. Must not be called after
@@ -428,28 +248,24 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
428 GNUNET_CONSENSUS_InsertDoneCallback idc, 248 GNUNET_CONSENSUS_InsertDoneCallback idc,
429 void *idc_cls) 249 void *idc_cls)
430{ 250{
431 struct QueuedMessage *qmsg;
432 struct GNUNET_CONSENSUS_ElementMessage *element_msg; 251 struct GNUNET_CONSENSUS_ElementMessage *element_msg;
433 size_t element_msg_size; 252 struct GNUNET_MQ_Envelope *ev;
253 struct InsertDoneInfo *i;
434 254
435 LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size); 255 LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size);
436 256
437 element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) + 257 ev = GNUNET_MQ_msg_extra (element_msg, element->size,
438 element->size); 258 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
439 259
440 element_msg = GNUNET_malloc (element_msg_size);
441 element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
442 element_msg->header.size = htons (element_msg_size);
443 memcpy (&element_msg[1], element->data, element->size); 260 memcpy (&element_msg[1], element->data, element->size);
444 261
445 qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); 262 if (NULL != idc)
446 qmsg->msg = (struct GNUNET_MessageHeader *) element_msg; 263 {
447 qmsg->idc = idc; 264 i = GNUNET_new (struct InsertDoneInfo);
448 qmsg->idc_cls = idc_cls; 265 i->idc = idc;
449 266 i->cls = idc_cls;
450 GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg); 267 GNUNET_MQ_notify_sent (ev, idc_adapter, i);
451 268 }
452 send_next (consensus);
453} 269}
454 270
455 271
@@ -471,7 +287,7 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
471 GNUNET_CONSENSUS_ConcludeCallback conclude, 287 GNUNET_CONSENSUS_ConcludeCallback conclude,
472 void *conclude_cls) 288 void *conclude_cls)
473{ 289{
474 struct QueuedMessage *qmsg; 290 struct GNUNET_MQ_Envelope *ev;
475 struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg; 291 struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg;
476 292
477 GNUNET_assert (NULL != conclude); 293 GNUNET_assert (NULL != conclude);
@@ -480,17 +296,10 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
480 consensus->conclude_cls = conclude_cls; 296 consensus->conclude_cls = conclude_cls;
481 consensus->conclude_cb = conclude; 297 consensus->conclude_cb = conclude;
482 298
483 conclude_msg = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)); 299 ev = GNUNET_MQ_msg (conclude_msg, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
484 conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
485 conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
486 conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout); 300 conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout);
487 301
488 qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); 302 GNUNET_MQ_send (consensus->mq, ev);
489 qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg;
490
491 GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
492
493 send_next (consensus);
494} 303}
495 304
496 305