diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-06-19 10:48:54 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-06-19 10:48:54 +0000 |
commit | a900b29ddaa9ea46c731b054b5e3ef3e725b95a8 (patch) | |
tree | 52e1a9697b0abf4618cd5684359ec5f0a040898a /src/util | |
parent | 17353bc0a47c89bda205f23e7995377c9bfe7769 (diff) | |
download | gnunet-a900b29ddaa9ea46c731b054b5e3ef3e725b95a8.tar.gz gnunet-a900b29ddaa9ea46c731b054b5e3ef3e725b95a8.zip |
- opaque mq structs
- mq for mesh
- faster hashing for IBFs
- mesh replaces stream in set
- new set profiler (work in progress)
Diffstat (limited to 'src/util')
-rw-r--r-- | src/util/mq.c | 464 | ||||
-rw-r--r-- | src/util/test_mq.c | 4 | ||||
-rw-r--r-- | src/util/test_mq_client.c | 9 |
3 files changed, 352 insertions, 125 deletions
diff --git a/src/util/mq.c b/src/util/mq.c index dc87b9711..d0253c40f 100644 --- a/src/util/mq.c +++ b/src/util/mq.c | |||
@@ -31,6 +31,118 @@ | |||
31 | #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__) | 31 | #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__) |
32 | 32 | ||
33 | 33 | ||
34 | struct GNUNET_MQ_Envelope | ||
35 | { | ||
36 | /** | ||
37 | * Messages are stored in a linked list. | ||
38 | * Each queue has its own list of envelopes. | ||
39 | */ | ||
40 | struct GNUNET_MQ_Envelope *next; | ||
41 | |||
42 | /** | ||
43 | * Messages are stored in a linked list | ||
44 | * Each queue has its own list of envelopes. | ||
45 | */ | ||
46 | struct GNUNET_MQ_Envelope *prev; | ||
47 | |||
48 | /** | ||
49 | * Actual allocated message header, | ||
50 | * usually points to the end of the containing GNUNET_MQ_Envelope | ||
51 | */ | ||
52 | struct GNUNET_MessageHeader *mh; | ||
53 | |||
54 | /** | ||
55 | * Queue the message is queued in, NULL if message is not queued. | ||
56 | */ | ||
57 | struct GNUNET_MQ_Handle *parent_queue; | ||
58 | |||
59 | /** | ||
60 | * Called after the message was sent irrevocably. | ||
61 | */ | ||
62 | GNUNET_MQ_NotifyCallback sent_cb; | ||
63 | |||
64 | /** | ||
65 | * Closure for send_cb | ||
66 | */ | ||
67 | void *sent_cls; | ||
68 | }; | ||
69 | |||
70 | |||
71 | /** | ||
72 | * Handle to a message queue. | ||
73 | */ | ||
74 | struct GNUNET_MQ_Handle | ||
75 | { | ||
76 | /** | ||
77 | * Handlers array, or NULL if the queue should not receive messages | ||
78 | */ | ||
79 | const struct GNUNET_MQ_MessageHandler *handlers; | ||
80 | |||
81 | /** | ||
82 | * Closure for the handler callbacks, | ||
83 | * as well as for the error handler. | ||
84 | */ | ||
85 | void *handlers_cls; | ||
86 | |||
87 | /** | ||
88 | * Actual implementation of message sending, | ||
89 | * called when a message is added | ||
90 | */ | ||
91 | GNUNET_MQ_SendImpl send_impl; | ||
92 | |||
93 | /** | ||
94 | * Implementation-dependent queue destruction function | ||
95 | */ | ||
96 | GNUNET_MQ_DestroyImpl destroy_impl; | ||
97 | |||
98 | /** | ||
99 | * Implementation-specific state | ||
100 | */ | ||
101 | void *impl_state; | ||
102 | |||
103 | /** | ||
104 | * Callback will be called when an error occurs. | ||
105 | */ | ||
106 | GNUNET_MQ_ErrorHandler error_handler; | ||
107 | |||
108 | /** | ||
109 | * Linked list of messages pending to be sent | ||
110 | */ | ||
111 | struct GNUNET_MQ_Envelope *envelope_head; | ||
112 | |||
113 | /** | ||
114 | * Linked list of messages pending to be sent | ||
115 | */ | ||
116 | struct GNUNET_MQ_Envelope *envelope_tail; | ||
117 | |||
118 | /** | ||
119 | * Message that is currently scheduled to be | ||
120 | * sent. Not the head of the message queue, as the implementation | ||
121 | * needs to know if sending has been already scheduled or not. | ||
122 | */ | ||
123 | struct GNUNET_MQ_Envelope *current_envelope; | ||
124 | |||
125 | /** | ||
126 | * Has the current envelope been commited? | ||
127 | * Either GNUNET_YES or GNUNET_NO. | ||
128 | */ | ||
129 | int commited; | ||
130 | |||
131 | /** | ||
132 | * Map of associations, lazily allocated | ||
133 | */ | ||
134 | struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map; | ||
135 | |||
136 | /** | ||
137 | * Next id that should be used for the assoc_map, | ||
138 | * initialized lazily to a random value together with | ||
139 | * assoc_map | ||
140 | */ | ||
141 | uint32_t assoc_id; | ||
142 | }; | ||
143 | |||
144 | |||
145 | |||
34 | 146 | ||
35 | struct ServerClientSocketState | 147 | struct ServerClientSocketState |
36 | { | 148 | { |
@@ -42,9 +154,14 @@ struct ServerClientSocketState | |||
42 | struct ClientConnectionState | 154 | struct ClientConnectionState |
43 | { | 155 | { |
44 | /** | 156 | /** |
45 | * Did we call receive? | 157 | * Did we call receive alread alreadyy? |
46 | */ | 158 | */ |
47 | int receive_active; | 159 | int receive_active; |
160 | |||
161 | /** | ||
162 | * Do we also want to receive? | ||
163 | */ | ||
164 | int receive_requested; | ||
48 | struct GNUNET_CLIENT_Connection *connection; | 165 | struct GNUNET_CLIENT_Connection *connection; |
49 | struct GNUNET_CLIENT_TransmitHandle *th; | 166 | struct GNUNET_CLIENT_TransmitHandle *th; |
50 | }; | 167 | }; |
@@ -59,9 +176,9 @@ struct ClientConnectionState | |||
59 | * @param mh message to dispatch | 176 | * @param mh message to dispatch |
60 | */ | 177 | */ |
61 | void | 178 | void |
62 | GNUNET_MQ_dispatch (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh) | 179 | GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *mh) |
63 | { | 180 | { |
64 | const struct GNUNET_MQ_Handler *handler; | 181 | const struct GNUNET_MQ_MessageHandler *handler; |
65 | int handled = GNUNET_NO; | 182 | int handled = GNUNET_NO; |
66 | 183 | ||
67 | handler = mq->handlers; | 184 | handler = mq->handlers; |
@@ -81,8 +198,27 @@ GNUNET_MQ_dispatch (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_Messa | |||
81 | } | 198 | } |
82 | 199 | ||
83 | 200 | ||
201 | /** | ||
202 | * Call the right callback for an error condition. | ||
203 | * | ||
204 | * @param mq message queue | ||
205 | */ | ||
84 | void | 206 | void |
85 | GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm) | 207 | GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq, |
208 | enum GNUNET_MQ_Error error) | ||
209 | { | ||
210 | if (NULL == mq->error_handler) | ||
211 | { | ||
212 | /* FIXME: log what kind of error occured */ | ||
213 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "mq: got error, but no handler installed\n"); | ||
214 | return; | ||
215 | } | ||
216 | mq->error_handler (mq->handlers_cls, error); | ||
217 | } | ||
218 | |||
219 | |||
220 | void | ||
221 | GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm) | ||
86 | { | 222 | { |
87 | GNUNET_assert (NULL == mqm->parent_queue); | 223 | GNUNET_assert (NULL == mqm->parent_queue); |
88 | GNUNET_free (mqm); | 224 | GNUNET_free (mqm); |
@@ -94,20 +230,156 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm) | |||
94 | * May only be called once per message. | 230 | * May only be called once per message. |
95 | * | 231 | * |
96 | * @param mq message queue | 232 | * @param mq message queue |
97 | * @param mqm the message to send. | 233 | * @param ev the message to send. |
234 | */ | ||
235 | void | ||
236 | GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev) | ||
237 | { | ||
238 | GNUNET_assert (NULL != mq); | ||
239 | GNUNET_assert (NULL == ev->parent_queue); | ||
240 | |||
241 | /* is the implementation busy? queue it! */ | ||
242 | if (NULL != mq->current_envelope) | ||
243 | { | ||
244 | GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, mq->envelope_tail, ev); | ||
245 | return; | ||
246 | } | ||
247 | mq->current_envelope = ev; | ||
248 | mq->send_impl (mq, ev->mh, mq->impl_state); | ||
249 | } | ||
250 | |||
251 | |||
252 | /** | ||
253 | * Call the send implementation for the next queued message, | ||
254 | * if any. | ||
255 | * Only useful for implementing message queues, | ||
256 | * results in undefined behavior if not used carefully. | ||
257 | * | ||
258 | * @param mq message queue to send the next message with | ||
98 | */ | 259 | */ |
99 | void | 260 | void |
100 | GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) | 261 | GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) |
101 | { | 262 | { |
263 | /* call is only valid if we're actually currently sending | ||
264 | * a message */ | ||
102 | GNUNET_assert (NULL != mq); | 265 | GNUNET_assert (NULL != mq); |
103 | mq->send_impl (mq, mqm); | 266 | GNUNET_assert (NULL != mq->current_envelope); |
267 | GNUNET_assert (GNUNET_YES == mq->commited); | ||
268 | mq->commited = GNUNET_NO; | ||
269 | GNUNET_free (mq->current_envelope); | ||
270 | if (NULL == mq->envelope_head) | ||
271 | { | ||
272 | mq->current_envelope = NULL; | ||
273 | return; | ||
274 | } | ||
275 | |||
276 | |||
277 | GNUNET_assert (NULL != mq->envelope_tail); | ||
278 | GNUNET_assert (NULL != mq->envelope_head); | ||
279 | mq->current_envelope = mq->envelope_head; | ||
280 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, | ||
281 | mq->current_envelope); | ||
282 | mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); | ||
104 | } | 283 | } |
105 | 284 | ||
106 | 285 | ||
107 | struct GNUNET_MQ_Message * | 286 | /** |
287 | * Create a message queue for the specified handlers. | ||
288 | * | ||
289 | * @param send function the implements sending messages | ||
290 | * @param destroy function that implements destroying the queue | ||
291 | * @param destroy function that implements canceling a message | ||
292 | * @param state for the queue, passed to 'send' and 'destroy' | ||
293 | * @param handlers array of message handlers | ||
294 | * @param error_handler handler for read and write errors | ||
295 | * @return a new message queue | ||
296 | */ | ||
297 | struct GNUNET_MQ_Handle * | ||
298 | GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send, | ||
299 | GNUNET_MQ_DestroyImpl destroy, | ||
300 | GNUNET_MQ_CancelImpl cancel, | ||
301 | void *impl_state, | ||
302 | const struct GNUNET_MQ_MessageHandler *handlers, | ||
303 | GNUNET_MQ_ErrorHandler error_handler, | ||
304 | void *cls) | ||
305 | { | ||
306 | struct GNUNET_MQ_Handle *mq; | ||
307 | |||
308 | mq = GNUNET_new (struct GNUNET_MQ_Handle); | ||
309 | mq->send_impl = send; | ||
310 | mq->destroy_impl = destroy; | ||
311 | mq->handlers = handlers; | ||
312 | mq->handlers_cls = cls; | ||
313 | mq->impl_state = impl_state; | ||
314 | |||
315 | return mq; | ||
316 | } | ||
317 | |||
318 | |||
319 | /** | ||
320 | * Get the message that should currently be sent. | ||
321 | * Fails if there is no current message. | ||
322 | * Only useful for implementing message queues, | ||
323 | * results in undefined behavior if not used carefully. | ||
324 | * | ||
325 | * @param mq message queue with the current message | ||
326 | * @return message to send, never NULL | ||
327 | */ | ||
328 | const struct GNUNET_MessageHeader * | ||
329 | GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq) | ||
330 | { | ||
331 | if (NULL == mq->current_envelope) | ||
332 | GNUNET_abort (); | ||
333 | if (NULL == mq->current_envelope->mh) | ||
334 | GNUNET_abort (); | ||
335 | return mq->current_envelope->mh; | ||
336 | } | ||
337 | |||
338 | |||
339 | /** | ||
340 | * Get the implementation state associated with the | ||
341 | * message queue. | ||
342 | * | ||
343 | * While the GNUNET_MQ_Impl* callbacks receive the | ||
344 | * implementation state, continuations that are scheduled | ||
345 | * by the implementation function often only have one closure | ||
346 | * argument, with this function it is possible to get at the | ||
347 | * implementation state when only passing the GNUNET_MQ_Handle | ||
348 | * as closure. | ||
349 | * | ||
350 | * @param mq message queue with the current message | ||
351 | * @return message to send, never NULL | ||
352 | */ | ||
353 | void * | ||
354 | GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq) | ||
355 | { | ||
356 | return mq->impl_state; | ||
357 | } | ||
358 | |||
359 | |||
360 | |||
361 | /** | ||
362 | * Mark the current message as irrevocably sent, but do not | ||
363 | * proceed with sending the next message. | ||
364 | * Will call the appropriate GNUNET_MQ_NotifyCallback, if any. | ||
365 | * | ||
366 | * @param mq message queue | ||
367 | */ | ||
368 | void | ||
369 | GNUNET_MQ_impl_send_commit (struct GNUNET_MQ_Handle *mq) | ||
370 | { | ||
371 | GNUNET_assert (NULL != mq->current_envelope); | ||
372 | GNUNET_assert (GNUNET_NO == mq->commited); | ||
373 | mq->commited = GNUNET_YES; | ||
374 | if (NULL != mq->current_envelope->sent_cb) | ||
375 | mq->current_envelope->sent_cb (mq->current_envelope->sent_cls); | ||
376 | } | ||
377 | |||
378 | |||
379 | struct GNUNET_MQ_Envelope * | ||
108 | GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) | 380 | GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) |
109 | { | 381 | { |
110 | struct GNUNET_MQ_Message *mqm; | 382 | struct GNUNET_MQ_Envelope *mqm; |
111 | 383 | ||
112 | mqm = GNUNET_malloc (sizeof *mqm + size); | 384 | mqm = GNUNET_malloc (sizeof *mqm + size); |
113 | mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1]; | 385 | mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1]; |
@@ -119,11 +391,11 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) | |||
119 | } | 391 | } |
120 | 392 | ||
121 | 393 | ||
122 | struct GNUNET_MQ_Message * | 394 | struct GNUNET_MQ_Envelope * |
123 | GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type, | 395 | GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type, |
124 | const struct GNUNET_MessageHeader *nested_mh) | 396 | const struct GNUNET_MessageHeader *nested_mh) |
125 | { | 397 | { |
126 | struct GNUNET_MQ_Message *mqm; | 398 | struct GNUNET_MQ_Envelope *mqm; |
127 | uint16_t size; | 399 | uint16_t size; |
128 | 400 | ||
129 | if (NULL == nested_mh) | 401 | if (NULL == nested_mh) |
@@ -154,85 +426,62 @@ static size_t | |||
154 | transmit_queued (void *cls, size_t size, | 426 | transmit_queued (void *cls, size_t size, |
155 | void *buf) | 427 | void *buf) |
156 | { | 428 | { |
157 | struct GNUNET_MQ_MessageQueue *mq = cls; | 429 | struct GNUNET_MQ_Handle *mq = cls; |
158 | struct GNUNET_MQ_Message *mqm = mq->current_msg; | 430 | struct ServerClientSocketState *state = GNUNET_MQ_impl_state (mq); |
159 | struct ServerClientSocketState *state = mq->impl_state; | 431 | const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq); |
160 | size_t msg_size; | 432 | size_t msg_size; |
161 | 433 | ||
162 | GNUNET_assert (NULL != buf); | 434 | GNUNET_assert (NULL != buf); |
163 | 435 | ||
164 | if (NULL != mqm->sent_cb) | 436 | msg_size = ntohs (msg->size); |
165 | { | ||
166 | mqm->sent_cb (mqm->sent_cls); | ||
167 | } | ||
168 | |||
169 | mq->current_msg = NULL; | ||
170 | GNUNET_assert (NULL != mqm); | ||
171 | msg_size = ntohs (mqm->mh->size); | ||
172 | GNUNET_assert (size >= msg_size); | 437 | GNUNET_assert (size >= msg_size); |
173 | memcpy (buf, mqm->mh, msg_size); | 438 | memcpy (buf, msg, msg_size); |
174 | GNUNET_free (mqm); | ||
175 | state->th = NULL; | 439 | state->th = NULL; |
176 | 440 | ||
177 | if (NULL != mq->msg_head) | 441 | GNUNET_MQ_impl_send_continue (mq); |
178 | { | 442 | |
179 | mq->current_msg = mq->msg_head; | ||
180 | GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg); | ||
181 | state->th = | ||
182 | GNUNET_SERVER_notify_transmit_ready (state->client, msg_size, | ||
183 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
184 | &transmit_queued, mq); | ||
185 | } | ||
186 | return msg_size; | 443 | return msg_size; |
187 | } | 444 | } |
188 | 445 | ||
189 | 446 | ||
190 | 447 | ||
191 | static void | 448 | static void |
192 | server_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) | 449 | server_client_destroy_impl (struct GNUNET_MQ_Handle *mq, |
450 | void *impl_state) | ||
193 | { | 451 | { |
194 | struct ServerClientSocketState *state; | 452 | struct ServerClientSocketState *state = impl_state; |
195 | 453 | ||
196 | GNUNET_assert (NULL != mq); | 454 | GNUNET_assert (NULL != mq); |
197 | state = mq->impl_state; | ||
198 | GNUNET_assert (NULL != state); | 455 | GNUNET_assert (NULL != state); |
199 | GNUNET_SERVER_client_drop (state->client); | 456 | GNUNET_SERVER_client_drop (state->client); |
200 | GNUNET_free (state); | 457 | GNUNET_free (state); |
201 | } | 458 | } |
202 | 459 | ||
203 | static void | 460 | static void |
204 | server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) | 461 | server_client_send_impl (struct GNUNET_MQ_Handle *mq, |
462 | const struct GNUNET_MessageHeader *msg, void *impl_state) | ||
205 | { | 463 | { |
206 | struct ServerClientSocketState *state; | 464 | struct ServerClientSocketState *state = impl_state; |
207 | int msize; | ||
208 | 465 | ||
209 | GNUNET_assert (NULL != mq); | 466 | GNUNET_assert (NULL != mq); |
210 | state = mq->impl_state; | ||
211 | GNUNET_assert (NULL != state); | 467 | GNUNET_assert (NULL != state); |
212 | 468 | ||
213 | if (NULL != state->th) | 469 | GNUNET_MQ_impl_send_commit (mq); |
214 | { | 470 | |
215 | GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); | ||
216 | return; | ||
217 | } | ||
218 | GNUNET_assert (NULL == mq->msg_head); | ||
219 | GNUNET_assert (NULL == mq->current_msg); | ||
220 | msize = ntohs (mqm->mh->size); | ||
221 | mq->current_msg = mqm; | ||
222 | state->th = | 471 | state->th = |
223 | GNUNET_SERVER_notify_transmit_ready (state->client, msize, | 472 | GNUNET_SERVER_notify_transmit_ready (state->client, ntohs (msg->size), |
224 | GNUNET_TIME_UNIT_FOREVER_REL, | 473 | GNUNET_TIME_UNIT_FOREVER_REL, |
225 | &transmit_queued, mq); | 474 | &transmit_queued, mq); |
226 | } | 475 | } |
227 | 476 | ||
228 | 477 | ||
229 | struct GNUNET_MQ_MessageQueue * | 478 | struct GNUNET_MQ_Handle * |
230 | GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client) | 479 | GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client) |
231 | { | 480 | { |
232 | struct GNUNET_MQ_MessageQueue *mq; | 481 | struct GNUNET_MQ_Handle *mq; |
233 | struct ServerClientSocketState *scss; | 482 | struct ServerClientSocketState *scss; |
234 | 483 | ||
235 | mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); | 484 | mq = GNUNET_new (struct GNUNET_MQ_Handle); |
236 | scss = GNUNET_new (struct ServerClientSocketState); | 485 | scss = GNUNET_new (struct ServerClientSocketState); |
237 | mq->impl_state = scss; | 486 | mq->impl_state = scss; |
238 | scss->client = client; | 487 | scss->client = client; |
@@ -254,24 +503,21 @@ static void | |||
254 | handle_client_message (void *cls, | 503 | handle_client_message (void *cls, |
255 | const struct GNUNET_MessageHeader *msg) | 504 | const struct GNUNET_MessageHeader *msg) |
256 | { | 505 | { |
257 | struct GNUNET_MQ_MessageQueue *mq = cls; | 506 | struct GNUNET_MQ_Handle *mq = cls; |
258 | struct ClientConnectionState *state; | 507 | struct ClientConnectionState *state; |
259 | 508 | ||
260 | state = mq->impl_state; | 509 | state = mq->impl_state; |
261 | 510 | ||
262 | if (NULL == msg) | 511 | if (NULL == msg) |
263 | { | 512 | { |
264 | if (NULL == mq->error_handler) | 513 | GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ); |
265 | LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler installed)\n"); | ||
266 | else | ||
267 | mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); | ||
268 | return; | 514 | return; |
269 | } | 515 | } |
270 | 516 | ||
271 | GNUNET_CLIENT_receive (state->connection, handle_client_message, mq, | 517 | GNUNET_CLIENT_receive (state->connection, handle_client_message, mq, |
272 | GNUNET_TIME_UNIT_FOREVER_REL); | 518 | GNUNET_TIME_UNIT_FOREVER_REL); |
273 | 519 | ||
274 | GNUNET_MQ_dispatch (mq, msg); | 520 | GNUNET_MQ_inject_message (mq, msg); |
275 | } | 521 | } |
276 | 522 | ||
277 | 523 | ||
@@ -287,23 +533,22 @@ static size_t | |||
287 | connection_client_transmit_queued (void *cls, size_t size, | 533 | connection_client_transmit_queued (void *cls, size_t size, |
288 | void *buf) | 534 | void *buf) |
289 | { | 535 | { |
290 | struct GNUNET_MQ_MessageQueue *mq = cls; | 536 | struct GNUNET_MQ_Handle *mq = cls; |
291 | struct GNUNET_MQ_Message *mqm = mq->current_msg; | 537 | const struct GNUNET_MessageHeader *msg; |
292 | struct ClientConnectionState *state = mq->impl_state; | 538 | struct ClientConnectionState *state = mq->impl_state; |
293 | size_t msg_size; | 539 | size_t msg_size; |
294 | 540 | ||
541 | GNUNET_assert (NULL != mq); | ||
542 | msg = GNUNET_MQ_impl_current (mq); | ||
543 | |||
295 | if (NULL == buf) | 544 | if (NULL == buf) |
296 | { | 545 | { |
297 | if (NULL == mq->error_handler) | 546 | GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ); |
298 | { | ||
299 | LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler installed\n"); | ||
300 | return 0; | ||
301 | } | ||
302 | mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); | ||
303 | return 0; | 547 | return 0; |
304 | } | 548 | } |
305 | 549 | ||
306 | if ((NULL != mq->handlers) && (GNUNET_NO == state->receive_active)) | 550 | if ( (GNUNET_YES == state->receive_requested) && |
551 | (GNUNET_NO == state->receive_active) ) | ||
307 | { | 552 | { |
308 | state->receive_active = GNUNET_YES; | 553 | state->receive_active = GNUNET_YES; |
309 | GNUNET_CLIENT_receive (state->connection, handle_client_message, mq, | 554 | GNUNET_CLIENT_receive (state->connection, handle_client_message, mq, |
@@ -311,78 +556,53 @@ connection_client_transmit_queued (void *cls, size_t size, | |||
311 | } | 556 | } |
312 | 557 | ||
313 | 558 | ||
314 | GNUNET_assert (NULL != mqm); | 559 | msg_size = ntohs (msg->size); |
315 | |||
316 | if (NULL != mqm->sent_cb) | ||
317 | { | ||
318 | mqm->sent_cb (mqm->sent_cls); | ||
319 | } | ||
320 | |||
321 | mq->current_msg = NULL; | ||
322 | GNUNET_assert (NULL != buf); | ||
323 | msg_size = ntohs (mqm->mh->size); | ||
324 | GNUNET_assert (size >= msg_size); | 560 | GNUNET_assert (size >= msg_size); |
325 | memcpy (buf, mqm->mh, msg_size); | 561 | memcpy (buf, msg, msg_size); |
326 | GNUNET_free (mqm); | ||
327 | state->th = NULL; | 562 | state->th = NULL; |
328 | if (NULL != mq->msg_head) | 563 | |
329 | { | 564 | GNUNET_MQ_impl_send_continue (mq); |
330 | mq->current_msg = mq->msg_head; | 565 | |
331 | GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg); | ||
332 | state->th = | ||
333 | GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (mq->current_msg->mh->size), | ||
334 | GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, | ||
335 | &connection_client_transmit_queued, mq); | ||
336 | } | ||
337 | return msg_size; | 566 | return msg_size; |
338 | } | 567 | } |
339 | 568 | ||
340 | 569 | ||
341 | 570 | ||
342 | static void | 571 | static void |
343 | connection_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) | 572 | connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) |
344 | { | 573 | { |
345 | GNUNET_free (mq->impl_state); | 574 | GNUNET_free (impl_state); |
346 | } | 575 | } |
347 | 576 | ||
348 | static void | 577 | static void |
349 | connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, | 578 | connection_client_send_impl (struct GNUNET_MQ_Handle *mq, |
350 | struct GNUNET_MQ_Message *mqm) | 579 | const struct GNUNET_MessageHeader *msg, void *impl_state) |
351 | { | 580 | { |
352 | struct ClientConnectionState *state = mq->impl_state; | 581 | struct ClientConnectionState *state = impl_state; |
353 | int msize; | ||
354 | 582 | ||
355 | GNUNET_assert (NULL != state); | 583 | GNUNET_assert (NULL != state); |
584 | GNUNET_assert (NULL == state->th); | ||
585 | |||
586 | GNUNET_MQ_impl_send_commit (mq); | ||
356 | 587 | ||
357 | if (NULL != state->th) | ||
358 | { | ||
359 | GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); | ||
360 | return; | ||
361 | } | ||
362 | GNUNET_assert (NULL == mq->current_msg); | ||
363 | mq->current_msg = mqm; | ||
364 | msize = ntohs (mqm->mh->size); | ||
365 | state->th = | 588 | state->th = |
366 | GNUNET_CLIENT_notify_transmit_ready (state->connection, msize, | 589 | GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size), |
367 | GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, | 590 | GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, |
368 | &connection_client_transmit_queued, mq); | 591 | &connection_client_transmit_queued, mq); |
369 | } | 592 | } |
370 | 593 | ||
371 | 594 | ||
372 | 595 | struct GNUNET_MQ_Handle * | |
373 | |||
374 | |||
375 | struct GNUNET_MQ_MessageQueue * | ||
376 | GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection, | 596 | GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection, |
377 | const struct GNUNET_MQ_Handler *handlers, | 597 | const struct GNUNET_MQ_MessageHandler *handlers, |
378 | void *cls) | 598 | void *cls) |
379 | { | 599 | { |
380 | struct GNUNET_MQ_MessageQueue *mq; | 600 | struct GNUNET_MQ_Handle *mq; |
381 | struct ClientConnectionState *state; | 601 | struct ClientConnectionState *state; |
382 | 602 | ||
383 | GNUNET_assert (NULL != connection); | 603 | GNUNET_assert (NULL != connection); |
384 | 604 | ||
385 | mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); | 605 | mq = GNUNET_new (struct GNUNET_MQ_Handle); |
386 | mq->handlers = handlers; | 606 | mq->handlers = handlers; |
387 | mq->handlers_cls = cls; | 607 | mq->handlers_cls = cls; |
388 | state = GNUNET_new (struct ClientConnectionState); | 608 | state = GNUNET_new (struct ClientConnectionState); |
@@ -390,16 +610,20 @@ GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connecti | |||
390 | mq->impl_state = state; | 610 | mq->impl_state = state; |
391 | mq->send_impl = connection_client_send_impl; | 611 | mq->send_impl = connection_client_send_impl; |
392 | mq->destroy_impl = connection_client_destroy_impl; | 612 | mq->destroy_impl = connection_client_destroy_impl; |
613 | if (NULL != handlers) | ||
614 | state->receive_requested = GNUNET_YES; | ||
393 | 615 | ||
394 | return mq; | 616 | return mq; |
395 | } | 617 | } |
396 | 618 | ||
397 | 619 | ||
398 | void | 620 | void |
399 | GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq, | 621 | GNUNET_MQ_replace_handlers (struct GNUNET_MQ_Handle *mq, |
400 | const struct GNUNET_MQ_Handler *new_handlers, | 622 | const struct GNUNET_MQ_MessageHandler *new_handlers, |
401 | void *cls) | 623 | void *cls) |
402 | { | 624 | { |
625 | /* FIXME: notify implementation? */ | ||
626 | /* FIXME: what about NULL handlers? abort receive? */ | ||
403 | mq->handlers = new_handlers; | 627 | mq->handlers = new_handlers; |
404 | mq->handlers_cls = cls; | 628 | mq->handlers_cls = cls; |
405 | } | 629 | } |
@@ -413,8 +637,7 @@ GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq, | |||
413 | * @param assoc_data to associate | 637 | * @param assoc_data to associate |
414 | */ | 638 | */ |
415 | uint32_t | 639 | uint32_t |
416 | GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq, | 640 | GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, |
417 | struct GNUNET_MQ_Message *mqm, | ||
418 | void *assoc_data) | 641 | void *assoc_data) |
419 | { | 642 | { |
420 | uint32_t id; | 643 | uint32_t id; |
@@ -433,7 +656,7 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq, | |||
433 | 656 | ||
434 | 657 | ||
435 | void * | 658 | void * |
436 | GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id) | 659 | GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id) |
437 | { | 660 | { |
438 | if (NULL == mq->assoc_map) | 661 | if (NULL == mq->assoc_map) |
439 | return NULL; | 662 | return NULL; |
@@ -442,7 +665,7 @@ GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id) | |||
442 | 665 | ||
443 | 666 | ||
444 | void * | 667 | void * |
445 | GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id) | 668 | GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id) |
446 | { | 669 | { |
447 | void *val; | 670 | void *val; |
448 | 671 | ||
@@ -456,7 +679,7 @@ GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id) | |||
456 | 679 | ||
457 | 680 | ||
458 | void | 681 | void |
459 | GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm, | 682 | GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *mqm, |
460 | GNUNET_MQ_NotifyCallback cb, | 683 | GNUNET_MQ_NotifyCallback cb, |
461 | void *cls) | 684 | void *cls) |
462 | { | 685 | { |
@@ -466,13 +689,13 @@ GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm, | |||
466 | 689 | ||
467 | 690 | ||
468 | void | 691 | void |
469 | GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq) | 692 | GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) |
470 | { | 693 | { |
471 | /* FIXME: destroy all pending messages in the queue */ | 694 | /* FIXME: destroy all pending messages in the queue */ |
472 | 695 | ||
473 | if (NULL != mq->destroy_impl) | 696 | if (NULL != mq->destroy_impl) |
474 | { | 697 | { |
475 | mq->destroy_impl (mq); | 698 | mq->destroy_impl (mq, mq->impl_state); |
476 | } | 699 | } |
477 | 700 | ||
478 | GNUNET_free (mq); | 701 | GNUNET_free (mq); |
@@ -480,7 +703,6 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq) | |||
480 | 703 | ||
481 | 704 | ||
482 | 705 | ||
483 | |||
484 | struct GNUNET_MessageHeader * | 706 | struct GNUNET_MessageHeader * |
485 | GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size) | 707 | GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size) |
486 | { | 708 | { |
diff --git a/src/util/test_mq.c b/src/util/test_mq.c index 55cd80ef1..45bba0a6b 100644 --- a/src/util/test_mq.c +++ b/src/util/test_mq.c | |||
@@ -40,7 +40,7 @@ GNUNET_NETWORK_STRUCT_END | |||
40 | void | 40 | void |
41 | test1 (void) | 41 | test1 (void) |
42 | { | 42 | { |
43 | struct GNUNET_MQ_Message *mqm; | 43 | struct GNUNET_MQ_Envelope *mqm; |
44 | struct MyMessage *mm; | 44 | struct MyMessage *mm; |
45 | 45 | ||
46 | mm = NULL; | 46 | mm = NULL; |
@@ -57,7 +57,7 @@ test1 (void) | |||
57 | void | 57 | void |
58 | test2 (void) | 58 | test2 (void) |
59 | { | 59 | { |
60 | struct GNUNET_MQ_Message *mqm; | 60 | struct GNUNET_MQ_Envelope *mqm; |
61 | struct GNUNET_MessageHeader *mh; | 61 | struct GNUNET_MessageHeader *mh; |
62 | 62 | ||
63 | mqm = GNUNET_MQ_msg_header (42); | 63 | mqm = GNUNET_MQ_msg_header (42); |
diff --git a/src/util/test_mq_client.c b/src/util/test_mq_client.c index b7eb1516a..30e498fcc 100644 --- a/src/util/test_mq_client.c +++ b/src/util/test_mq_client.c | |||
@@ -60,6 +60,9 @@ recv_cb (void *cls, struct GNUNET_SERVER_Client *argclient, | |||
60 | return; | 60 | return; |
61 | } | 61 | } |
62 | 62 | ||
63 | /* can happen if notify does not work */ | ||
64 | GNUNET_assert (received < 2); | ||
65 | |||
63 | GNUNET_SERVER_receive_done (argclient, GNUNET_YES); | 66 | GNUNET_SERVER_receive_done (argclient, GNUNET_YES); |
64 | } | 67 | } |
65 | 68 | ||
@@ -98,14 +101,16 @@ static struct GNUNET_SERVER_MessageHandler handlers[] = { | |||
98 | 101 | ||
99 | void send_cb (void *cls) | 102 | void send_cb (void *cls) |
100 | { | 103 | { |
104 | /* the notify should only be called once */ | ||
105 | GNUNET_assert (GNUNET_NO == notify); | ||
101 | printf ("notify sent\n"); | 106 | printf ("notify sent\n"); |
102 | notify = GNUNET_YES; | 107 | notify = GNUNET_YES; |
103 | } | 108 | } |
104 | 109 | ||
105 | void test_mq (struct GNUNET_CLIENT_Connection *client) | 110 | void test_mq (struct GNUNET_CLIENT_Connection *client) |
106 | { | 111 | { |
107 | struct GNUNET_MQ_MessageQueue *mq; | 112 | struct GNUNET_MQ_Handle *mq; |
108 | struct GNUNET_MQ_Message *mqm; | 113 | struct GNUNET_MQ_Envelope *mqm; |
109 | 114 | ||
110 | /* FIXME: test handling responses */ | 115 | /* FIXME: test handling responses */ |
111 | mq = GNUNET_MQ_queue_for_connection_client (client, NULL, NULL); | 116 | mq = GNUNET_MQ_queue_for_connection_client (client, NULL, NULL); |