aboutsummaryrefslogtreecommitdiff
path: root/src/util
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-06-19 10:48:54 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-06-19 10:48:54 +0000
commita900b29ddaa9ea46c731b054b5e3ef3e725b95a8 (patch)
tree52e1a9697b0abf4618cd5684359ec5f0a040898a /src/util
parent17353bc0a47c89bda205f23e7995377c9bfe7769 (diff)
downloadgnunet-a900b29ddaa9ea46c731b054b5e3ef3e725b95a8.tar.gz
gnunet-a900b29ddaa9ea46c731b054b5e3ef3e725b95a8.zip
- opaque mq structs
- mq for mesh - faster hashing for IBFs - mesh replaces stream in set - new set profiler (work in progress)
Diffstat (limited to 'src/util')
-rw-r--r--src/util/mq.c464
-rw-r--r--src/util/test_mq.c4
-rw-r--r--src/util/test_mq_client.c9
3 files changed, 352 insertions, 125 deletions
diff --git a/src/util/mq.c b/src/util/mq.c
index dc87b9711..d0253c40f 100644
--- a/src/util/mq.c
+++ b/src/util/mq.c
@@ -31,6 +31,118 @@
31#define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__) 31#define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
32 32
33 33
34struct GNUNET_MQ_Envelope
35{
36 /**
37 * Messages are stored in a linked list.
38 * Each queue has its own list of envelopes.
39 */
40 struct GNUNET_MQ_Envelope *next;
41
42 /**
43 * Messages are stored in a linked list
44 * Each queue has its own list of envelopes.
45 */
46 struct GNUNET_MQ_Envelope *prev;
47
48 /**
49 * Actual allocated message header,
50 * usually points to the end of the containing GNUNET_MQ_Envelope
51 */
52 struct GNUNET_MessageHeader *mh;
53
54 /**
55 * Queue the message is queued in, NULL if message is not queued.
56 */
57 struct GNUNET_MQ_Handle *parent_queue;
58
59 /**
60 * Called after the message was sent irrevocably.
61 */
62 GNUNET_MQ_NotifyCallback sent_cb;
63
64 /**
65 * Closure for send_cb
66 */
67 void *sent_cls;
68};
69
70
71/**
72 * Handle to a message queue.
73 */
74struct GNUNET_MQ_Handle
75{
76 /**
77 * Handlers array, or NULL if the queue should not receive messages
78 */
79 const struct GNUNET_MQ_MessageHandler *handlers;
80
81 /**
82 * Closure for the handler callbacks,
83 * as well as for the error handler.
84 */
85 void *handlers_cls;
86
87 /**
88 * Actual implementation of message sending,
89 * called when a message is added
90 */
91 GNUNET_MQ_SendImpl send_impl;
92
93 /**
94 * Implementation-dependent queue destruction function
95 */
96 GNUNET_MQ_DestroyImpl destroy_impl;
97
98 /**
99 * Implementation-specific state
100 */
101 void *impl_state;
102
103 /**
104 * Callback will be called when an error occurs.
105 */
106 GNUNET_MQ_ErrorHandler error_handler;
107
108 /**
109 * Linked list of messages pending to be sent
110 */
111 struct GNUNET_MQ_Envelope *envelope_head;
112
113 /**
114 * Linked list of messages pending to be sent
115 */
116 struct GNUNET_MQ_Envelope *envelope_tail;
117
118 /**
119 * Message that is currently scheduled to be
120 * sent. Not the head of the message queue, as the implementation
121 * needs to know if sending has been already scheduled or not.
122 */
123 struct GNUNET_MQ_Envelope *current_envelope;
124
125 /**
126 * Has the current envelope been commited?
127 * Either GNUNET_YES or GNUNET_NO.
128 */
129 int commited;
130
131 /**
132 * Map of associations, lazily allocated
133 */
134 struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
135
136 /**
137 * Next id that should be used for the assoc_map,
138 * initialized lazily to a random value together with
139 * assoc_map
140 */
141 uint32_t assoc_id;
142};
143
144
145
34 146
35struct ServerClientSocketState 147struct ServerClientSocketState
36{ 148{
@@ -42,9 +154,14 @@ struct ServerClientSocketState
42struct ClientConnectionState 154struct ClientConnectionState
43{ 155{
44 /** 156 /**
45 * Did we call receive? 157 * Did we call receive alread alreadyy?
46 */ 158 */
47 int receive_active; 159 int receive_active;
160
161 /**
162 * Do we also want to receive?
163 */
164 int receive_requested;
48 struct GNUNET_CLIENT_Connection *connection; 165 struct GNUNET_CLIENT_Connection *connection;
49 struct GNUNET_CLIENT_TransmitHandle *th; 166 struct GNUNET_CLIENT_TransmitHandle *th;
50}; 167};
@@ -59,9 +176,9 @@ struct ClientConnectionState
59 * @param mh message to dispatch 176 * @param mh message to dispatch
60 */ 177 */
61void 178void
62GNUNET_MQ_dispatch (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh) 179GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *mh)
63{ 180{
64 const struct GNUNET_MQ_Handler *handler; 181 const struct GNUNET_MQ_MessageHandler *handler;
65 int handled = GNUNET_NO; 182 int handled = GNUNET_NO;
66 183
67 handler = mq->handlers; 184 handler = mq->handlers;
@@ -81,8 +198,27 @@ GNUNET_MQ_dispatch (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_Messa
81} 198}
82 199
83 200
201/**
202 * Call the right callback for an error condition.
203 *
204 * @param mq message queue
205 */
84void 206void
85GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm) 207GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
208 enum GNUNET_MQ_Error error)
209{
210 if (NULL == mq->error_handler)
211 {
212 /* FIXME: log what kind of error occured */
213 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "mq: got error, but no handler installed\n");
214 return;
215 }
216 mq->error_handler (mq->handlers_cls, error);
217}
218
219
220void
221GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm)
86{ 222{
87 GNUNET_assert (NULL == mqm->parent_queue); 223 GNUNET_assert (NULL == mqm->parent_queue);
88 GNUNET_free (mqm); 224 GNUNET_free (mqm);
@@ -94,20 +230,156 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm)
94 * May only be called once per message. 230 * May only be called once per message.
95 * 231 *
96 * @param mq message queue 232 * @param mq message queue
97 * @param mqm the message to send. 233 * @param ev the message to send.
234 */
235void
236GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
237{
238 GNUNET_assert (NULL != mq);
239 GNUNET_assert (NULL == ev->parent_queue);
240
241 /* is the implementation busy? queue it! */
242 if (NULL != mq->current_envelope)
243 {
244 GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, mq->envelope_tail, ev);
245 return;
246 }
247 mq->current_envelope = ev;
248 mq->send_impl (mq, ev->mh, mq->impl_state);
249}
250
251
252/**
253 * Call the send implementation for the next queued message,
254 * if any.
255 * Only useful for implementing message queues,
256 * results in undefined behavior if not used carefully.
257 *
258 * @param mq message queue to send the next message with
98 */ 259 */
99void 260void
100GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) 261GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
101{ 262{
263 /* call is only valid if we're actually currently sending
264 * a message */
102 GNUNET_assert (NULL != mq); 265 GNUNET_assert (NULL != mq);
103 mq->send_impl (mq, mqm); 266 GNUNET_assert (NULL != mq->current_envelope);
267 GNUNET_assert (GNUNET_YES == mq->commited);
268 mq->commited = GNUNET_NO;
269 GNUNET_free (mq->current_envelope);
270 if (NULL == mq->envelope_head)
271 {
272 mq->current_envelope = NULL;
273 return;
274 }
275
276
277 GNUNET_assert (NULL != mq->envelope_tail);
278 GNUNET_assert (NULL != mq->envelope_head);
279 mq->current_envelope = mq->envelope_head;
280 GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail,
281 mq->current_envelope);
282 mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
104} 283}
105 284
106 285
107struct GNUNET_MQ_Message * 286/**
287 * Create a message queue for the specified handlers.
288 *
289 * @param send function the implements sending messages
290 * @param destroy function that implements destroying the queue
291 * @param destroy function that implements canceling a message
292 * @param state for the queue, passed to 'send' and 'destroy'
293 * @param handlers array of message handlers
294 * @param error_handler handler for read and write errors
295 * @return a new message queue
296 */
297struct GNUNET_MQ_Handle *
298GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
299 GNUNET_MQ_DestroyImpl destroy,
300 GNUNET_MQ_CancelImpl cancel,
301 void *impl_state,
302 const struct GNUNET_MQ_MessageHandler *handlers,
303 GNUNET_MQ_ErrorHandler error_handler,
304 void *cls)
305{
306 struct GNUNET_MQ_Handle *mq;
307
308 mq = GNUNET_new (struct GNUNET_MQ_Handle);
309 mq->send_impl = send;
310 mq->destroy_impl = destroy;
311 mq->handlers = handlers;
312 mq->handlers_cls = cls;
313 mq->impl_state = impl_state;
314
315 return mq;
316}
317
318
319/**
320 * Get the message that should currently be sent.
321 * Fails if there is no current message.
322 * Only useful for implementing message queues,
323 * results in undefined behavior if not used carefully.
324 *
325 * @param mq message queue with the current message
326 * @return message to send, never NULL
327 */
328const struct GNUNET_MessageHeader *
329GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
330{
331 if (NULL == mq->current_envelope)
332 GNUNET_abort ();
333 if (NULL == mq->current_envelope->mh)
334 GNUNET_abort ();
335 return mq->current_envelope->mh;
336}
337
338
339/**
340 * Get the implementation state associated with the
341 * message queue.
342 *
343 * While the GNUNET_MQ_Impl* callbacks receive the
344 * implementation state, continuations that are scheduled
345 * by the implementation function often only have one closure
346 * argument, with this function it is possible to get at the
347 * implementation state when only passing the GNUNET_MQ_Handle
348 * as closure.
349 *
350 * @param mq message queue with the current message
351 * @return message to send, never NULL
352 */
353void *
354GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
355{
356 return mq->impl_state;
357}
358
359
360
361/**
362 * Mark the current message as irrevocably sent, but do not
363 * proceed with sending the next message.
364 * Will call the appropriate GNUNET_MQ_NotifyCallback, if any.
365 *
366 * @param mq message queue
367 */
368void
369GNUNET_MQ_impl_send_commit (struct GNUNET_MQ_Handle *mq)
370{
371 GNUNET_assert (NULL != mq->current_envelope);
372 GNUNET_assert (GNUNET_NO == mq->commited);
373 mq->commited = GNUNET_YES;
374 if (NULL != mq->current_envelope->sent_cb)
375 mq->current_envelope->sent_cb (mq->current_envelope->sent_cls);
376}
377
378
379struct GNUNET_MQ_Envelope *
108GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) 380GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
109{ 381{
110 struct GNUNET_MQ_Message *mqm; 382 struct GNUNET_MQ_Envelope *mqm;
111 383
112 mqm = GNUNET_malloc (sizeof *mqm + size); 384 mqm = GNUNET_malloc (sizeof *mqm + size);
113 mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1]; 385 mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
@@ -119,11 +391,11 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
119} 391}
120 392
121 393
122struct GNUNET_MQ_Message * 394struct GNUNET_MQ_Envelope *
123GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type, 395GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type,
124 const struct GNUNET_MessageHeader *nested_mh) 396 const struct GNUNET_MessageHeader *nested_mh)
125{ 397{
126 struct GNUNET_MQ_Message *mqm; 398 struct GNUNET_MQ_Envelope *mqm;
127 uint16_t size; 399 uint16_t size;
128 400
129 if (NULL == nested_mh) 401 if (NULL == nested_mh)
@@ -154,85 +426,62 @@ static size_t
154transmit_queued (void *cls, size_t size, 426transmit_queued (void *cls, size_t size,
155 void *buf) 427 void *buf)
156{ 428{
157 struct GNUNET_MQ_MessageQueue *mq = cls; 429 struct GNUNET_MQ_Handle *mq = cls;
158 struct GNUNET_MQ_Message *mqm = mq->current_msg; 430 struct ServerClientSocketState *state = GNUNET_MQ_impl_state (mq);
159 struct ServerClientSocketState *state = mq->impl_state; 431 const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
160 size_t msg_size; 432 size_t msg_size;
161 433
162 GNUNET_assert (NULL != buf); 434 GNUNET_assert (NULL != buf);
163 435
164 if (NULL != mqm->sent_cb) 436 msg_size = ntohs (msg->size);
165 {
166 mqm->sent_cb (mqm->sent_cls);
167 }
168
169 mq->current_msg = NULL;
170 GNUNET_assert (NULL != mqm);
171 msg_size = ntohs (mqm->mh->size);
172 GNUNET_assert (size >= msg_size); 437 GNUNET_assert (size >= msg_size);
173 memcpy (buf, mqm->mh, msg_size); 438 memcpy (buf, msg, msg_size);
174 GNUNET_free (mqm);
175 state->th = NULL; 439 state->th = NULL;
176 440
177 if (NULL != mq->msg_head) 441 GNUNET_MQ_impl_send_continue (mq);
178 { 442
179 mq->current_msg = mq->msg_head;
180 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
181 state->th =
182 GNUNET_SERVER_notify_transmit_ready (state->client, msg_size,
183 GNUNET_TIME_UNIT_FOREVER_REL,
184 &transmit_queued, mq);
185 }
186 return msg_size; 443 return msg_size;
187} 444}
188 445
189 446
190 447
191static void 448static void
192server_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) 449server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
450 void *impl_state)
193{ 451{
194 struct ServerClientSocketState *state; 452 struct ServerClientSocketState *state = impl_state;
195 453
196 GNUNET_assert (NULL != mq); 454 GNUNET_assert (NULL != mq);
197 state = mq->impl_state;
198 GNUNET_assert (NULL != state); 455 GNUNET_assert (NULL != state);
199 GNUNET_SERVER_client_drop (state->client); 456 GNUNET_SERVER_client_drop (state->client);
200 GNUNET_free (state); 457 GNUNET_free (state);
201} 458}
202 459
203static void 460static void
204server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) 461server_client_send_impl (struct GNUNET_MQ_Handle *mq,
462 const struct GNUNET_MessageHeader *msg, void *impl_state)
205{ 463{
206 struct ServerClientSocketState *state; 464 struct ServerClientSocketState *state = impl_state;
207 int msize;
208 465
209 GNUNET_assert (NULL != mq); 466 GNUNET_assert (NULL != mq);
210 state = mq->impl_state;
211 GNUNET_assert (NULL != state); 467 GNUNET_assert (NULL != state);
212 468
213 if (NULL != state->th) 469 GNUNET_MQ_impl_send_commit (mq);
214 { 470
215 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
216 return;
217 }
218 GNUNET_assert (NULL == mq->msg_head);
219 GNUNET_assert (NULL == mq->current_msg);
220 msize = ntohs (mqm->mh->size);
221 mq->current_msg = mqm;
222 state->th = 471 state->th =
223 GNUNET_SERVER_notify_transmit_ready (state->client, msize, 472 GNUNET_SERVER_notify_transmit_ready (state->client, ntohs (msg->size),
224 GNUNET_TIME_UNIT_FOREVER_REL, 473 GNUNET_TIME_UNIT_FOREVER_REL,
225 &transmit_queued, mq); 474 &transmit_queued, mq);
226} 475}
227 476
228 477
229struct GNUNET_MQ_MessageQueue * 478struct GNUNET_MQ_Handle *
230GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client) 479GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
231{ 480{
232 struct GNUNET_MQ_MessageQueue *mq; 481 struct GNUNET_MQ_Handle *mq;
233 struct ServerClientSocketState *scss; 482 struct ServerClientSocketState *scss;
234 483
235 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); 484 mq = GNUNET_new (struct GNUNET_MQ_Handle);
236 scss = GNUNET_new (struct ServerClientSocketState); 485 scss = GNUNET_new (struct ServerClientSocketState);
237 mq->impl_state = scss; 486 mq->impl_state = scss;
238 scss->client = client; 487 scss->client = client;
@@ -254,24 +503,21 @@ static void
254handle_client_message (void *cls, 503handle_client_message (void *cls,
255 const struct GNUNET_MessageHeader *msg) 504 const struct GNUNET_MessageHeader *msg)
256{ 505{
257 struct GNUNET_MQ_MessageQueue *mq = cls; 506 struct GNUNET_MQ_Handle *mq = cls;
258 struct ClientConnectionState *state; 507 struct ClientConnectionState *state;
259 508
260 state = mq->impl_state; 509 state = mq->impl_state;
261 510
262 if (NULL == msg) 511 if (NULL == msg)
263 { 512 {
264 if (NULL == mq->error_handler) 513 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
265 LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler installed)\n");
266 else
267 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
268 return; 514 return;
269 } 515 }
270 516
271 GNUNET_CLIENT_receive (state->connection, handle_client_message, mq, 517 GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
272 GNUNET_TIME_UNIT_FOREVER_REL); 518 GNUNET_TIME_UNIT_FOREVER_REL);
273 519
274 GNUNET_MQ_dispatch (mq, msg); 520 GNUNET_MQ_inject_message (mq, msg);
275} 521}
276 522
277 523
@@ -287,23 +533,22 @@ static size_t
287connection_client_transmit_queued (void *cls, size_t size, 533connection_client_transmit_queued (void *cls, size_t size,
288 void *buf) 534 void *buf)
289{ 535{
290 struct GNUNET_MQ_MessageQueue *mq = cls; 536 struct GNUNET_MQ_Handle *mq = cls;
291 struct GNUNET_MQ_Message *mqm = mq->current_msg; 537 const struct GNUNET_MessageHeader *msg;
292 struct ClientConnectionState *state = mq->impl_state; 538 struct ClientConnectionState *state = mq->impl_state;
293 size_t msg_size; 539 size_t msg_size;
294 540
541 GNUNET_assert (NULL != mq);
542 msg = GNUNET_MQ_impl_current (mq);
543
295 if (NULL == buf) 544 if (NULL == buf)
296 { 545 {
297 if (NULL == mq->error_handler) 546 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
298 {
299 LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler installed\n");
300 return 0;
301 }
302 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
303 return 0; 547 return 0;
304 } 548 }
305 549
306 if ((NULL != mq->handlers) && (GNUNET_NO == state->receive_active)) 550 if ( (GNUNET_YES == state->receive_requested) &&
551 (GNUNET_NO == state->receive_active) )
307 { 552 {
308 state->receive_active = GNUNET_YES; 553 state->receive_active = GNUNET_YES;
309 GNUNET_CLIENT_receive (state->connection, handle_client_message, mq, 554 GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
@@ -311,78 +556,53 @@ connection_client_transmit_queued (void *cls, size_t size,
311 } 556 }
312 557
313 558
314 GNUNET_assert (NULL != mqm); 559 msg_size = ntohs (msg->size);
315
316 if (NULL != mqm->sent_cb)
317 {
318 mqm->sent_cb (mqm->sent_cls);
319 }
320
321 mq->current_msg = NULL;
322 GNUNET_assert (NULL != buf);
323 msg_size = ntohs (mqm->mh->size);
324 GNUNET_assert (size >= msg_size); 560 GNUNET_assert (size >= msg_size);
325 memcpy (buf, mqm->mh, msg_size); 561 memcpy (buf, msg, msg_size);
326 GNUNET_free (mqm);
327 state->th = NULL; 562 state->th = NULL;
328 if (NULL != mq->msg_head) 563
329 { 564 GNUNET_MQ_impl_send_continue (mq);
330 mq->current_msg = mq->msg_head; 565
331 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
332 state->th =
333 GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (mq->current_msg->mh->size),
334 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
335 &connection_client_transmit_queued, mq);
336 }
337 return msg_size; 566 return msg_size;
338} 567}
339 568
340 569
341 570
342static void 571static void
343connection_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) 572connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
344{ 573{
345 GNUNET_free (mq->impl_state); 574 GNUNET_free (impl_state);
346} 575}
347 576
348static void 577static void
349connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, 578connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
350 struct GNUNET_MQ_Message *mqm) 579 const struct GNUNET_MessageHeader *msg, void *impl_state)
351{ 580{
352 struct ClientConnectionState *state = mq->impl_state; 581 struct ClientConnectionState *state = impl_state;
353 int msize;
354 582
355 GNUNET_assert (NULL != state); 583 GNUNET_assert (NULL != state);
584 GNUNET_assert (NULL == state->th);
585
586 GNUNET_MQ_impl_send_commit (mq);
356 587
357 if (NULL != state->th)
358 {
359 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
360 return;
361 }
362 GNUNET_assert (NULL == mq->current_msg);
363 mq->current_msg = mqm;
364 msize = ntohs (mqm->mh->size);
365 state->th = 588 state->th =
366 GNUNET_CLIENT_notify_transmit_ready (state->connection, msize, 589 GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size),
367 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, 590 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
368 &connection_client_transmit_queued, mq); 591 &connection_client_transmit_queued, mq);
369} 592}
370 593
371 594
372 595struct GNUNET_MQ_Handle *
373
374
375struct GNUNET_MQ_MessageQueue *
376GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection, 596GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
377 const struct GNUNET_MQ_Handler *handlers, 597 const struct GNUNET_MQ_MessageHandler *handlers,
378 void *cls) 598 void *cls)
379{ 599{
380 struct GNUNET_MQ_MessageQueue *mq; 600 struct GNUNET_MQ_Handle *mq;
381 struct ClientConnectionState *state; 601 struct ClientConnectionState *state;
382 602
383 GNUNET_assert (NULL != connection); 603 GNUNET_assert (NULL != connection);
384 604
385 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); 605 mq = GNUNET_new (struct GNUNET_MQ_Handle);
386 mq->handlers = handlers; 606 mq->handlers = handlers;
387 mq->handlers_cls = cls; 607 mq->handlers_cls = cls;
388 state = GNUNET_new (struct ClientConnectionState); 608 state = GNUNET_new (struct ClientConnectionState);
@@ -390,16 +610,20 @@ GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connecti
390 mq->impl_state = state; 610 mq->impl_state = state;
391 mq->send_impl = connection_client_send_impl; 611 mq->send_impl = connection_client_send_impl;
392 mq->destroy_impl = connection_client_destroy_impl; 612 mq->destroy_impl = connection_client_destroy_impl;
613 if (NULL != handlers)
614 state->receive_requested = GNUNET_YES;
393 615
394 return mq; 616 return mq;
395} 617}
396 618
397 619
398void 620void
399GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq, 621GNUNET_MQ_replace_handlers (struct GNUNET_MQ_Handle *mq,
400 const struct GNUNET_MQ_Handler *new_handlers, 622 const struct GNUNET_MQ_MessageHandler *new_handlers,
401 void *cls) 623 void *cls)
402{ 624{
625 /* FIXME: notify implementation? */
626 /* FIXME: what about NULL handlers? abort receive? */
403 mq->handlers = new_handlers; 627 mq->handlers = new_handlers;
404 mq->handlers_cls = cls; 628 mq->handlers_cls = cls;
405} 629}
@@ -413,8 +637,7 @@ GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq,
413 * @param assoc_data to associate 637 * @param assoc_data to associate
414 */ 638 */
415uint32_t 639uint32_t
416GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq, 640GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
417 struct GNUNET_MQ_Message *mqm,
418 void *assoc_data) 641 void *assoc_data)
419{ 642{
420 uint32_t id; 643 uint32_t id;
@@ -433,7 +656,7 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
433 656
434 657
435void * 658void *
436GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id) 659GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
437{ 660{
438 if (NULL == mq->assoc_map) 661 if (NULL == mq->assoc_map)
439 return NULL; 662 return NULL;
@@ -442,7 +665,7 @@ GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
442 665
443 666
444void * 667void *
445GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id) 668GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
446{ 669{
447 void *val; 670 void *val;
448 671
@@ -456,7 +679,7 @@ GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
456 679
457 680
458void 681void
459GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm, 682GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *mqm,
460 GNUNET_MQ_NotifyCallback cb, 683 GNUNET_MQ_NotifyCallback cb,
461 void *cls) 684 void *cls)
462{ 685{
@@ -466,13 +689,13 @@ GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm,
466 689
467 690
468void 691void
469GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq) 692GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
470{ 693{
471 /* FIXME: destroy all pending messages in the queue */ 694 /* FIXME: destroy all pending messages in the queue */
472 695
473 if (NULL != mq->destroy_impl) 696 if (NULL != mq->destroy_impl)
474 { 697 {
475 mq->destroy_impl (mq); 698 mq->destroy_impl (mq, mq->impl_state);
476 } 699 }
477 700
478 GNUNET_free (mq); 701 GNUNET_free (mq);
@@ -480,7 +703,6 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq)
480 703
481 704
482 705
483
484struct GNUNET_MessageHeader * 706struct GNUNET_MessageHeader *
485GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size) 707GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size)
486{ 708{
diff --git a/src/util/test_mq.c b/src/util/test_mq.c
index 55cd80ef1..45bba0a6b 100644
--- a/src/util/test_mq.c
+++ b/src/util/test_mq.c
@@ -40,7 +40,7 @@ GNUNET_NETWORK_STRUCT_END
40void 40void
41test1 (void) 41test1 (void)
42{ 42{
43 struct GNUNET_MQ_Message *mqm; 43 struct GNUNET_MQ_Envelope *mqm;
44 struct MyMessage *mm; 44 struct MyMessage *mm;
45 45
46 mm = NULL; 46 mm = NULL;
@@ -57,7 +57,7 @@ test1 (void)
57void 57void
58test2 (void) 58test2 (void)
59{ 59{
60 struct GNUNET_MQ_Message *mqm; 60 struct GNUNET_MQ_Envelope *mqm;
61 struct GNUNET_MessageHeader *mh; 61 struct GNUNET_MessageHeader *mh;
62 62
63 mqm = GNUNET_MQ_msg_header (42); 63 mqm = GNUNET_MQ_msg_header (42);
diff --git a/src/util/test_mq_client.c b/src/util/test_mq_client.c
index b7eb1516a..30e498fcc 100644
--- a/src/util/test_mq_client.c
+++ b/src/util/test_mq_client.c
@@ -60,6 +60,9 @@ recv_cb (void *cls, struct GNUNET_SERVER_Client *argclient,
60 return; 60 return;
61 } 61 }
62 62
63 /* can happen if notify does not work */
64 GNUNET_assert (received < 2);
65
63 GNUNET_SERVER_receive_done (argclient, GNUNET_YES); 66 GNUNET_SERVER_receive_done (argclient, GNUNET_YES);
64} 67}
65 68
@@ -98,14 +101,16 @@ static struct GNUNET_SERVER_MessageHandler handlers[] = {
98 101
99void send_cb (void *cls) 102void send_cb (void *cls)
100{ 103{
104 /* the notify should only be called once */
105 GNUNET_assert (GNUNET_NO == notify);
101 printf ("notify sent\n"); 106 printf ("notify sent\n");
102 notify = GNUNET_YES; 107 notify = GNUNET_YES;
103} 108}
104 109
105void test_mq (struct GNUNET_CLIENT_Connection *client) 110void test_mq (struct GNUNET_CLIENT_Connection *client)
106{ 111{
107 struct GNUNET_MQ_MessageQueue *mq; 112 struct GNUNET_MQ_Handle *mq;
108 struct GNUNET_MQ_Message *mqm; 113 struct GNUNET_MQ_Envelope *mqm;
109 114
110 /* FIXME: test handling responses */ 115 /* FIXME: test handling responses */
111 mq = GNUNET_MQ_queue_for_connection_client (client, NULL, NULL); 116 mq = GNUNET_MQ_queue_for_connection_client (client, NULL, NULL);