diff options
Diffstat (limited to 'src/util/mq.c')
-rw-r--r-- | src/util/mq.c | 669 |
1 files changed, 336 insertions, 333 deletions
diff --git a/src/util/mq.c b/src/util/mq.c index 188606fb4..1c1f2893f 100644 --- a/src/util/mq.c +++ b/src/util/mq.c | |||
@@ -26,10 +26,11 @@ | |||
26 | #include "platform.h" | 26 | #include "platform.h" |
27 | #include "gnunet_util_lib.h" | 27 | #include "gnunet_util_lib.h" |
28 | 28 | ||
29 | #define LOG(kind, ...) GNUNET_log_from(kind, "util-mq", __VA_ARGS__) | 29 | #define LOG(kind, ...) GNUNET_log_from (kind, "util-mq", __VA_ARGS__) |
30 | 30 | ||
31 | 31 | ||
32 | struct GNUNET_MQ_Envelope { | 32 | struct GNUNET_MQ_Envelope |
33 | { | ||
33 | /** | 34 | /** |
34 | * Messages are stored in a linked list. | 35 | * Messages are stored in a linked list. |
35 | * Each queue has its own list of envelopes. | 36 | * Each queue has its own list of envelopes. |
@@ -81,7 +82,8 @@ struct GNUNET_MQ_Envelope { | |||
81 | /** | 82 | /** |
82 | * Handle to a message queue. | 83 | * Handle to a message queue. |
83 | */ | 84 | */ |
84 | struct GNUNET_MQ_Handle { | 85 | struct GNUNET_MQ_Handle |
86 | { | ||
85 | /** | 87 | /** |
86 | * Handlers array, or NULL if the queue should not receive messages | 88 | * Handlers array, or NULL if the queue should not receive messages |
87 | */ | 89 | */ |
@@ -197,17 +199,17 @@ struct GNUNET_MQ_Handle { | |||
197 | * @param mh message to dispatch | 199 | * @param mh message to dispatch |
198 | */ | 200 | */ |
199 | void | 201 | void |
200 | GNUNET_MQ_inject_message(struct GNUNET_MQ_Handle *mq, | 202 | GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, |
201 | const struct GNUNET_MessageHeader *mh) | 203 | const struct GNUNET_MessageHeader *mh) |
202 | { | 204 | { |
203 | int ret; | 205 | int ret; |
204 | 206 | ||
205 | ret = GNUNET_MQ_handle_message(mq->handlers, mh); | 207 | ret = GNUNET_MQ_handle_message (mq->handlers, mh); |
206 | if (GNUNET_SYSERR == ret) | 208 | if (GNUNET_SYSERR == ret) |
207 | { | 209 | { |
208 | GNUNET_MQ_inject_error(mq, GNUNET_MQ_ERROR_MALFORMED); | 210 | GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_MALFORMED); |
209 | return; | 211 | return; |
210 | } | 212 | } |
211 | } | 213 | } |
212 | 214 | ||
213 | 215 | ||
@@ -224,62 +226,62 @@ GNUNET_MQ_inject_message(struct GNUNET_MQ_Handle *mq, | |||
224 | * #GNUNET_SYSERR if message was rejected by check function | 226 | * #GNUNET_SYSERR if message was rejected by check function |
225 | */ | 227 | */ |
226 | int | 228 | int |
227 | GNUNET_MQ_handle_message(const struct GNUNET_MQ_MessageHandler *handlers, | 229 | GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers, |
228 | const struct GNUNET_MessageHeader *mh) | 230 | const struct GNUNET_MessageHeader *mh) |
229 | { | 231 | { |
230 | const struct GNUNET_MQ_MessageHandler *handler; | 232 | const struct GNUNET_MQ_MessageHandler *handler; |
231 | int handled = GNUNET_NO; | 233 | int handled = GNUNET_NO; |
232 | uint16_t msize = ntohs(mh->size); | 234 | uint16_t msize = ntohs (mh->size); |
233 | uint16_t mtype = ntohs(mh->type); | 235 | uint16_t mtype = ntohs (mh->type); |
234 | 236 | ||
235 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 237 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
236 | "Received message of type %u and size %u\n", | 238 | "Received message of type %u and size %u\n", |
237 | mtype, | 239 | mtype, |
238 | msize); | 240 | msize); |
239 | 241 | ||
240 | if (NULL == handlers) | 242 | if (NULL == handlers) |
241 | goto done; | 243 | goto done; |
242 | for (handler = handlers; NULL != handler->cb; handler++) | 244 | for (handler = handlers; NULL != handler->cb; handler++) |
245 | { | ||
246 | if (handler->type == mtype) | ||
243 | { | 247 | { |
244 | if (handler->type == mtype) | 248 | handled = GNUNET_YES; |
245 | { | 249 | if ((handler->expected_size > msize) || |
246 | handled = GNUNET_YES; | 250 | ((handler->expected_size != msize) && (NULL == handler->mv))) |
247 | if ((handler->expected_size > msize) || | 251 | { |
248 | ((handler->expected_size != msize) && (NULL == handler->mv))) | 252 | /* Too small, or not an exact size and |
249 | { | 253 | no 'mv' handler to check rest */ |
250 | /* Too small, or not an exact size and | 254 | LOG (GNUNET_ERROR_TYPE_ERROR, |
251 | no 'mv' handler to check rest */ | 255 | "Received malformed message of type %u\n", |
252 | LOG(GNUNET_ERROR_TYPE_ERROR, | 256 | (unsigned int) handler->type); |
253 | "Received malformed message of type %u\n", | 257 | return GNUNET_SYSERR; |
254 | (unsigned int)handler->type); | 258 | } |
255 | return GNUNET_SYSERR; | 259 | if ((NULL == handler->mv) || |
256 | } | 260 | (GNUNET_OK == handler->mv (handler->cls, mh))) |
257 | if ((NULL == handler->mv) || | 261 | { |
258 | (GNUNET_OK == handler->mv(handler->cls, mh))) | 262 | /* message well-formed, pass to handler */ |
259 | { | 263 | handler->cb (handler->cls, mh); |
260 | /* message well-formed, pass to handler */ | 264 | } |
261 | handler->cb(handler->cls, mh); | 265 | else |
262 | } | 266 | { |
263 | else | 267 | /* Message rejected by check routine */ |
264 | { | 268 | LOG (GNUNET_ERROR_TYPE_ERROR, |
265 | /* Message rejected by check routine */ | 269 | "Received malformed message of type %u\n", |
266 | LOG(GNUNET_ERROR_TYPE_ERROR, | 270 | (unsigned int) handler->type); |
267 | "Received malformed message of type %u\n", | 271 | return GNUNET_SYSERR; |
268 | (unsigned int)handler->type); | 272 | } |
269 | return GNUNET_SYSERR; | 273 | break; |
270 | } | ||
271 | break; | ||
272 | } | ||
273 | } | 274 | } |
275 | } | ||
274 | done: | 276 | done: |
275 | if (GNUNET_NO == handled) | 277 | if (GNUNET_NO == handled) |
276 | { | 278 | { |
277 | LOG(GNUNET_ERROR_TYPE_INFO, | 279 | LOG (GNUNET_ERROR_TYPE_INFO, |
278 | "No handler for message of type %u and size %u\n", | 280 | "No handler for message of type %u and size %u\n", |
279 | mtype, | 281 | mtype, |
280 | msize); | 282 | msize); |
281 | return GNUNET_NO; | 283 | return GNUNET_NO; |
282 | } | 284 | } |
283 | return GNUNET_OK; | 285 | return GNUNET_OK; |
284 | } | 286 | } |
285 | 287 | ||
@@ -295,16 +297,16 @@ done: | |||
295 | * @param error the error type | 297 | * @param error the error type |
296 | */ | 298 | */ |
297 | void | 299 | void |
298 | GNUNET_MQ_inject_error(struct GNUNET_MQ_Handle *mq, enum GNUNET_MQ_Error error) | 300 | GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq, enum GNUNET_MQ_Error error) |
299 | { | 301 | { |
300 | if (NULL == mq->error_handler) | 302 | if (NULL == mq->error_handler) |
301 | { | 303 | { |
302 | LOG(GNUNET_ERROR_TYPE_WARNING, | 304 | LOG (GNUNET_ERROR_TYPE_WARNING, |
303 | "Got error %d, but no handler installed\n", | 305 | "Got error %d, but no handler installed\n", |
304 | (int)error); | 306 | (int) error); |
305 | return; | 307 | return; |
306 | } | 308 | } |
307 | mq->error_handler(mq->error_handler_cls, error); | 309 | mq->error_handler (mq->error_handler_cls, error); |
308 | } | 310 | } |
309 | 311 | ||
310 | 312 | ||
@@ -316,10 +318,10 @@ GNUNET_MQ_inject_error(struct GNUNET_MQ_Handle *mq, enum GNUNET_MQ_Error error) | |||
316 | * @param mqm the message to discard | 318 | * @param mqm the message to discard |
317 | */ | 319 | */ |
318 | void | 320 | void |
319 | GNUNET_MQ_discard(struct GNUNET_MQ_Envelope *ev) | 321 | GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev) |
320 | { | 322 | { |
321 | GNUNET_assert(NULL == ev->parent_queue); | 323 | GNUNET_assert (NULL == ev->parent_queue); |
322 | GNUNET_free(ev); | 324 | GNUNET_free (ev); |
323 | } | 325 | } |
324 | 326 | ||
325 | 327 | ||
@@ -330,12 +332,12 @@ GNUNET_MQ_discard(struct GNUNET_MQ_Envelope *ev) | |||
330 | * @return number of queued, non-transmitted messages | 332 | * @return number of queued, non-transmitted messages |
331 | */ | 333 | */ |
332 | unsigned int | 334 | unsigned int |
333 | GNUNET_MQ_get_length(struct GNUNET_MQ_Handle *mq) | 335 | GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq) |
334 | { | 336 | { |
335 | if (GNUNET_YES != mq->in_flight) | 337 | if (GNUNET_YES != mq->in_flight) |
336 | { | 338 | { |
337 | return mq->queue_length; | 339 | return mq->queue_length; |
338 | } | 340 | } |
339 | return mq->queue_length - 1; | 341 | return mq->queue_length - 1; |
340 | } | 342 | } |
341 | 343 | ||
@@ -348,36 +350,36 @@ GNUNET_MQ_get_length(struct GNUNET_MQ_Handle *mq) | |||
348 | * @param ev the envelope with the message to send. | 350 | * @param ev the envelope with the message to send. |
349 | */ | 351 | */ |
350 | void | 352 | void |
351 | GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev) | 353 | GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev) |
352 | { | 354 | { |
353 | GNUNET_assert(NULL != mq); | 355 | GNUNET_assert (NULL != mq); |
354 | GNUNET_assert(NULL == ev->parent_queue); | 356 | GNUNET_assert (NULL == ev->parent_queue); |
355 | 357 | ||
356 | mq->queue_length++; | 358 | mq->queue_length++; |
357 | if (mq->queue_length >= 10000) | 359 | if (mq->queue_length >= 10000) |
358 | { | 360 | { |
359 | /* This would seem like a bug... */ | 361 | /* This would seem like a bug... */ |
360 | GNUNET_log(GNUNET_ERROR_TYPE_ERROR, | 362 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
361 | "MQ with %u entries extended by message of type %u (FC broken?)\n", | 363 | "MQ with %u entries extended by message of type %u (FC broken?)\n", |
362 | (unsigned int)mq->queue_length, | 364 | (unsigned int) mq->queue_length, |
363 | (unsigned int)ntohs(ev->mh->type)); | 365 | (unsigned int) ntohs (ev->mh->type)); |
364 | } | 366 | } |
365 | ev->parent_queue = mq; | 367 | ev->parent_queue = mq; |
366 | /* is the implementation busy? queue it! */ | 368 | /* is the implementation busy? queue it! */ |
367 | if ((NULL != mq->current_envelope) || (NULL != mq->send_task)) | 369 | if ((NULL != mq->current_envelope) || (NULL != mq->send_task)) |
368 | { | 370 | { |
369 | GNUNET_CONTAINER_DLL_insert_tail(mq->envelope_head, mq->envelope_tail, ev); | 371 | GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, mq->envelope_tail, ev); |
370 | return; | 372 | return; |
371 | } | 373 | } |
372 | GNUNET_assert(NULL == mq->envelope_head); | 374 | GNUNET_assert (NULL == mq->envelope_head); |
373 | mq->current_envelope = ev; | 375 | mq->current_envelope = ev; |
374 | 376 | ||
375 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 377 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
376 | "sending message of type %u, queue empty (MQ: %p)\n", | 378 | "sending message of type %u, queue empty (MQ: %p)\n", |
377 | ntohs(ev->mh->type), | 379 | ntohs (ev->mh->type), |
378 | mq); | 380 | mq); |
379 | 381 | ||
380 | mq->send_impl(mq, ev->mh, mq->impl_state); | 382 | mq->send_impl (mq, ev->mh, mq->impl_state); |
381 | } | 383 | } |
382 | 384 | ||
383 | 385 | ||
@@ -389,12 +391,12 @@ GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev) | |||
389 | * @return NULL if queue is empty (or has no envelope that is not under transmission) | 391 | * @return NULL if queue is empty (or has no envelope that is not under transmission) |
390 | */ | 392 | */ |
391 | struct GNUNET_MQ_Envelope * | 393 | struct GNUNET_MQ_Envelope * |
392 | GNUNET_MQ_unsent_head(struct GNUNET_MQ_Handle *mq) | 394 | GNUNET_MQ_unsent_head (struct GNUNET_MQ_Handle *mq) |
393 | { | 395 | { |
394 | struct GNUNET_MQ_Envelope *env; | 396 | struct GNUNET_MQ_Envelope *env; |
395 | 397 | ||
396 | env = mq->envelope_head; | 398 | env = mq->envelope_head; |
397 | GNUNET_CONTAINER_DLL_remove(mq->envelope_head, mq->envelope_tail, env); | 399 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, env); |
398 | mq->queue_length--; | 400 | mq->queue_length--; |
399 | env->parent_queue = NULL; | 401 | env->parent_queue = NULL; |
400 | return env; | 402 | return env; |
@@ -409,13 +411,13 @@ GNUNET_MQ_unsent_head(struct GNUNET_MQ_Handle *mq) | |||
409 | * @return copy of @a env | 411 | * @return copy of @a env |
410 | */ | 412 | */ |
411 | struct GNUNET_MQ_Envelope * | 413 | struct GNUNET_MQ_Envelope * |
412 | GNUNET_MQ_env_copy(struct GNUNET_MQ_Envelope *env) | 414 | GNUNET_MQ_env_copy (struct GNUNET_MQ_Envelope *env) |
413 | { | 415 | { |
414 | GNUNET_assert(NULL == env->next); | 416 | GNUNET_assert (NULL == env->next); |
415 | GNUNET_assert(NULL == env->parent_queue); | 417 | GNUNET_assert (NULL == env->parent_queue); |
416 | GNUNET_assert(NULL == env->sent_cb); | 418 | GNUNET_assert (NULL == env->sent_cb); |
417 | GNUNET_assert(GNUNET_NO == env->have_custom_options); | 419 | GNUNET_assert (GNUNET_NO == env->have_custom_options); |
418 | return GNUNET_MQ_msg_copy(env->mh); | 420 | return GNUNET_MQ_msg_copy (env->mh); |
419 | } | 421 | } |
420 | 422 | ||
421 | 423 | ||
@@ -427,19 +429,19 @@ GNUNET_MQ_env_copy(struct GNUNET_MQ_Envelope *env) | |||
427 | * @param ev the envelope with the message to send. | 429 | * @param ev the envelope with the message to send. |
428 | */ | 430 | */ |
429 | void | 431 | void |
430 | GNUNET_MQ_send_copy(struct GNUNET_MQ_Handle *mq, | 432 | GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq, |
431 | const struct GNUNET_MQ_Envelope *ev) | 433 | const struct GNUNET_MQ_Envelope *ev) |
432 | { | 434 | { |
433 | struct GNUNET_MQ_Envelope *env; | 435 | struct GNUNET_MQ_Envelope *env; |
434 | uint16_t msize; | 436 | uint16_t msize; |
435 | 437 | ||
436 | msize = ntohs(ev->mh->size); | 438 | msize = ntohs (ev->mh->size); |
437 | env = GNUNET_malloc(sizeof(struct GNUNET_MQ_Envelope) + msize); | 439 | env = GNUNET_malloc (sizeof(struct GNUNET_MQ_Envelope) + msize); |
438 | env->mh = (struct GNUNET_MessageHeader *)&env[1]; | 440 | env->mh = (struct GNUNET_MessageHeader *) &env[1]; |
439 | env->sent_cb = ev->sent_cb; | 441 | env->sent_cb = ev->sent_cb; |
440 | env->sent_cls = ev->sent_cls; | 442 | env->sent_cls = ev->sent_cls; |
441 | GNUNET_memcpy(&env[1], ev->mh, msize); | 443 | GNUNET_memcpy (&env[1], ev->mh, msize); |
442 | GNUNET_MQ_send(mq, env); | 444 | GNUNET_MQ_send (mq, env); |
443 | } | 445 | } |
444 | 446 | ||
445 | 447 | ||
@@ -451,7 +453,7 @@ GNUNET_MQ_send_copy(struct GNUNET_MQ_Handle *mq, | |||
451 | * @param cls message queue to send the next message with | 453 | * @param cls message queue to send the next message with |
452 | */ | 454 | */ |
453 | static void | 455 | static void |
454 | impl_send_continue(void *cls) | 456 | impl_send_continue (void *cls) |
455 | { | 457 | { |
456 | struct GNUNET_MQ_Handle *mq = cls; | 458 | struct GNUNET_MQ_Handle *mq = cls; |
457 | 459 | ||
@@ -461,15 +463,15 @@ impl_send_continue(void *cls) | |||
461 | if (NULL == mq->envelope_head) | 463 | if (NULL == mq->envelope_head) |
462 | return; | 464 | return; |
463 | mq->current_envelope = mq->envelope_head; | 465 | mq->current_envelope = mq->envelope_head; |
464 | GNUNET_CONTAINER_DLL_remove(mq->envelope_head, | 466 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, |
465 | mq->envelope_tail, | 467 | mq->envelope_tail, |
466 | mq->current_envelope); | 468 | mq->current_envelope); |
467 | 469 | ||
468 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 470 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
469 | "sending message of type %u from queue\n", | 471 | "sending message of type %u from queue\n", |
470 | ntohs(mq->current_envelope->mh->type)); | 472 | ntohs (mq->current_envelope->mh->type)); |
471 | 473 | ||
472 | mq->send_impl(mq, mq->current_envelope->mh, mq->impl_state); | 474 | mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); |
473 | } | 475 | } |
474 | 476 | ||
475 | 477 | ||
@@ -481,25 +483,25 @@ impl_send_continue(void *cls) | |||
481 | * @param mq message queue to send the next message with | 483 | * @param mq message queue to send the next message with |
482 | */ | 484 | */ |
483 | void | 485 | void |
484 | GNUNET_MQ_impl_send_continue(struct GNUNET_MQ_Handle *mq) | 486 | GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) |
485 | { | 487 | { |
486 | struct GNUNET_MQ_Envelope *current_envelope; | 488 | struct GNUNET_MQ_Envelope *current_envelope; |
487 | GNUNET_SCHEDULER_TaskCallback cb; | 489 | GNUNET_SCHEDULER_TaskCallback cb; |
488 | 490 | ||
489 | GNUNET_assert(0 < mq->queue_length); | 491 | GNUNET_assert (0 < mq->queue_length); |
490 | mq->queue_length--; | 492 | mq->queue_length--; |
491 | mq->in_flight = GNUNET_NO; | 493 | mq->in_flight = GNUNET_NO; |
492 | current_envelope = mq->current_envelope; | 494 | current_envelope = mq->current_envelope; |
493 | current_envelope->parent_queue = NULL; | 495 | current_envelope->parent_queue = NULL; |
494 | mq->current_envelope = NULL; | 496 | mq->current_envelope = NULL; |
495 | GNUNET_assert(NULL == mq->send_task); | 497 | GNUNET_assert (NULL == mq->send_task); |
496 | mq->send_task = GNUNET_SCHEDULER_add_now(&impl_send_continue, mq); | 498 | mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, mq); |
497 | if (NULL != (cb = current_envelope->sent_cb)) | 499 | if (NULL != (cb = current_envelope->sent_cb)) |
498 | { | 500 | { |
499 | current_envelope->sent_cb = NULL; | 501 | current_envelope->sent_cb = NULL; |
500 | cb(current_envelope->sent_cls); | 502 | cb (current_envelope->sent_cls); |
501 | } | 503 | } |
502 | GNUNET_free(current_envelope); | 504 | GNUNET_free (current_envelope); |
503 | } | 505 | } |
504 | 506 | ||
505 | 507 | ||
@@ -514,7 +516,7 @@ GNUNET_MQ_impl_send_continue(struct GNUNET_MQ_Handle *mq) | |||
514 | * @param mq message queue to send the next message with | 516 | * @param mq message queue to send the next message with |
515 | */ | 517 | */ |
516 | void | 518 | void |
517 | GNUNET_MQ_impl_send_in_flight(struct GNUNET_MQ_Handle *mq) | 519 | GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq) |
518 | { | 520 | { |
519 | struct GNUNET_MQ_Envelope *current_envelope; | 521 | struct GNUNET_MQ_Envelope *current_envelope; |
520 | GNUNET_SCHEDULER_TaskCallback cb; | 522 | GNUNET_SCHEDULER_TaskCallback cb; |
@@ -523,14 +525,14 @@ GNUNET_MQ_impl_send_in_flight(struct GNUNET_MQ_Handle *mq) | |||
523 | /* call is only valid if we're actually currently sending | 525 | /* call is only valid if we're actually currently sending |
524 | * a message */ | 526 | * a message */ |
525 | current_envelope = mq->current_envelope; | 527 | current_envelope = mq->current_envelope; |
526 | GNUNET_assert(NULL != current_envelope); | 528 | GNUNET_assert (NULL != current_envelope); |
527 | /* can't call cancel from now on anymore */ | 529 | /* can't call cancel from now on anymore */ |
528 | current_envelope->parent_queue = NULL; | 530 | current_envelope->parent_queue = NULL; |
529 | if (NULL != (cb = current_envelope->sent_cb)) | 531 | if (NULL != (cb = current_envelope->sent_cb)) |
530 | { | 532 | { |
531 | current_envelope->sent_cb = NULL; | 533 | current_envelope->sent_cb = NULL; |
532 | cb(current_envelope->sent_cls); | 534 | cb (current_envelope->sent_cls); |
533 | } | 535 | } |
534 | } | 536 | } |
535 | 537 | ||
536 | 538 | ||
@@ -547,21 +549,21 @@ GNUNET_MQ_impl_send_in_flight(struct GNUNET_MQ_Handle *mq) | |||
547 | * @return a new message queue | 549 | * @return a new message queue |
548 | */ | 550 | */ |
549 | struct GNUNET_MQ_Handle * | 551 | struct GNUNET_MQ_Handle * |
550 | GNUNET_MQ_queue_for_callbacks(GNUNET_MQ_SendImpl send, | 552 | GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send, |
551 | GNUNET_MQ_DestroyImpl destroy, | 553 | GNUNET_MQ_DestroyImpl destroy, |
552 | GNUNET_MQ_CancelImpl cancel, | 554 | GNUNET_MQ_CancelImpl cancel, |
553 | void *impl_state, | 555 | void *impl_state, |
554 | const struct GNUNET_MQ_MessageHandler *handlers, | 556 | const struct GNUNET_MQ_MessageHandler *handlers, |
555 | GNUNET_MQ_ErrorHandler error_handler, | 557 | GNUNET_MQ_ErrorHandler error_handler, |
556 | void *error_handler_cls) | 558 | void *error_handler_cls) |
557 | { | 559 | { |
558 | struct GNUNET_MQ_Handle *mq; | 560 | struct GNUNET_MQ_Handle *mq; |
559 | 561 | ||
560 | mq = GNUNET_new(struct GNUNET_MQ_Handle); | 562 | mq = GNUNET_new (struct GNUNET_MQ_Handle); |
561 | mq->send_impl = send; | 563 | mq->send_impl = send; |
562 | mq->destroy_impl = destroy; | 564 | mq->destroy_impl = destroy; |
563 | mq->cancel_impl = cancel; | 565 | mq->cancel_impl = cancel; |
564 | mq->handlers = GNUNET_MQ_copy_handlers(handlers); | 566 | mq->handlers = GNUNET_MQ_copy_handlers (handlers); |
565 | mq->error_handler = error_handler; | 567 | mq->error_handler = error_handler; |
566 | mq->error_handler_cls = error_handler_cls; | 568 | mq->error_handler_cls = error_handler_cls; |
567 | mq->impl_state = impl_state; | 569 | mq->impl_state = impl_state; |
@@ -578,7 +580,7 @@ GNUNET_MQ_queue_for_callbacks(GNUNET_MQ_SendImpl send, | |||
578 | * @param handlers_cls new closure to use | 580 | * @param handlers_cls new closure to use |
579 | */ | 581 | */ |
580 | void | 582 | void |
581 | GNUNET_MQ_set_handlers_closure(struct GNUNET_MQ_Handle *mq, void *handlers_cls) | 583 | GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq, void *handlers_cls) |
582 | { | 584 | { |
583 | if (NULL == mq->handlers) | 585 | if (NULL == mq->handlers) |
584 | return; | 586 | return; |
@@ -597,10 +599,10 @@ GNUNET_MQ_set_handlers_closure(struct GNUNET_MQ_Handle *mq, void *handlers_cls) | |||
597 | * @return message to send, never NULL | 599 | * @return message to send, never NULL |
598 | */ | 600 | */ |
599 | const struct GNUNET_MessageHeader * | 601 | const struct GNUNET_MessageHeader * |
600 | GNUNET_MQ_impl_current(struct GNUNET_MQ_Handle *mq) | 602 | GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq) |
601 | { | 603 | { |
602 | GNUNET_assert(NULL != mq->current_envelope); | 604 | GNUNET_assert (NULL != mq->current_envelope); |
603 | GNUNET_assert(NULL != mq->current_envelope->mh); | 605 | GNUNET_assert (NULL != mq->current_envelope->mh); |
604 | return mq->current_envelope->mh; | 606 | return mq->current_envelope->mh; |
605 | } | 607 | } |
606 | 608 | ||
@@ -620,21 +622,21 @@ GNUNET_MQ_impl_current(struct GNUNET_MQ_Handle *mq) | |||
620 | * @return message to send, never NULL | 622 | * @return message to send, never NULL |
621 | */ | 623 | */ |
622 | void * | 624 | void * |
623 | GNUNET_MQ_impl_state(struct GNUNET_MQ_Handle *mq) | 625 | GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq) |
624 | { | 626 | { |
625 | return mq->impl_state; | 627 | return mq->impl_state; |
626 | } | 628 | } |
627 | 629 | ||
628 | 630 | ||
629 | struct GNUNET_MQ_Envelope * | 631 | struct GNUNET_MQ_Envelope * |
630 | GNUNET_MQ_msg_(struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) | 632 | GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) |
631 | { | 633 | { |
632 | struct GNUNET_MQ_Envelope *ev; | 634 | struct GNUNET_MQ_Envelope *ev; |
633 | 635 | ||
634 | ev = GNUNET_malloc(size + sizeof(struct GNUNET_MQ_Envelope)); | 636 | ev = GNUNET_malloc (size + sizeof(struct GNUNET_MQ_Envelope)); |
635 | ev->mh = (struct GNUNET_MessageHeader *)&ev[1]; | 637 | ev->mh = (struct GNUNET_MessageHeader *) &ev[1]; |
636 | ev->mh->size = htons(size); | 638 | ev->mh->size = htons (size); |
637 | ev->mh->type = htons(type); | 639 | ev->mh->type = htons (type); |
638 | if (NULL != mhp) | 640 | if (NULL != mhp) |
639 | *mhp = ev->mh; | 641 | *mhp = ev->mh; |
640 | return ev; | 642 | return ev; |
@@ -648,14 +650,14 @@ GNUNET_MQ_msg_(struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) | |||
648 | * @return envelope containing @a hdr | 650 | * @return envelope containing @a hdr |
649 | */ | 651 | */ |
650 | struct GNUNET_MQ_Envelope * | 652 | struct GNUNET_MQ_Envelope * |
651 | GNUNET_MQ_msg_copy(const struct GNUNET_MessageHeader *hdr) | 653 | GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr) |
652 | { | 654 | { |
653 | struct GNUNET_MQ_Envelope *mqm; | 655 | struct GNUNET_MQ_Envelope *mqm; |
654 | uint16_t size = ntohs(hdr->size); | 656 | uint16_t size = ntohs (hdr->size); |
655 | 657 | ||
656 | mqm = GNUNET_malloc(sizeof(*mqm) + size); | 658 | mqm = GNUNET_malloc (sizeof(*mqm) + size); |
657 | mqm->mh = (struct GNUNET_MessageHeader *)&mqm[1]; | 659 | mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1]; |
658 | GNUNET_memcpy(mqm->mh, hdr, size); | 660 | GNUNET_memcpy (mqm->mh, hdr, size); |
659 | return mqm; | 661 | return mqm; |
660 | } | 662 | } |
661 | 663 | ||
@@ -670,27 +672,27 @@ GNUNET_MQ_msg_copy(const struct GNUNET_MessageHeader *hdr) | |||
670 | * @param nested_mh the message to append to the message after base_size | 672 | * @param nested_mh the message to append to the message after base_size |
671 | */ | 673 | */ |
672 | struct GNUNET_MQ_Envelope * | 674 | struct GNUNET_MQ_Envelope * |
673 | GNUNET_MQ_msg_nested_mh_(struct GNUNET_MessageHeader **mhp, | 675 | GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, |
674 | uint16_t base_size, | 676 | uint16_t base_size, |
675 | uint16_t type, | 677 | uint16_t type, |
676 | const struct GNUNET_MessageHeader *nested_mh) | 678 | const struct GNUNET_MessageHeader *nested_mh) |
677 | { | 679 | { |
678 | struct GNUNET_MQ_Envelope *mqm; | 680 | struct GNUNET_MQ_Envelope *mqm; |
679 | uint16_t size; | 681 | uint16_t size; |
680 | 682 | ||
681 | if (NULL == nested_mh) | 683 | if (NULL == nested_mh) |
682 | return GNUNET_MQ_msg_(mhp, base_size, type); | 684 | return GNUNET_MQ_msg_ (mhp, base_size, type); |
683 | 685 | ||
684 | size = base_size + ntohs(nested_mh->size); | 686 | size = base_size + ntohs (nested_mh->size); |
685 | 687 | ||
686 | /* check for uint16_t overflow */ | 688 | /* check for uint16_t overflow */ |
687 | if (size < base_size) | 689 | if (size < base_size) |
688 | return NULL; | 690 | return NULL; |
689 | 691 | ||
690 | mqm = GNUNET_MQ_msg_(mhp, size, type); | 692 | mqm = GNUNET_MQ_msg_ (mhp, size, type); |
691 | GNUNET_memcpy((char *)mqm->mh + base_size, | 693 | GNUNET_memcpy ((char *) mqm->mh + base_size, |
692 | nested_mh, | 694 | nested_mh, |
693 | ntohs(nested_mh->size)); | 695 | ntohs (nested_mh->size)); |
694 | 696 | ||
695 | return mqm; | 697 | return mqm; |
696 | } | 698 | } |
@@ -703,22 +705,22 @@ GNUNET_MQ_msg_nested_mh_(struct GNUNET_MessageHeader **mhp, | |||
703 | * @param assoc_data to associate | 705 | * @param assoc_data to associate |
704 | */ | 706 | */ |
705 | uint32_t | 707 | uint32_t |
706 | GNUNET_MQ_assoc_add(struct GNUNET_MQ_Handle *mq, void *assoc_data) | 708 | GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, void *assoc_data) |
707 | { | 709 | { |
708 | uint32_t id; | 710 | uint32_t id; |
709 | 711 | ||
710 | if (NULL == mq->assoc_map) | 712 | if (NULL == mq->assoc_map) |
711 | { | 713 | { |
712 | mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create(8); | 714 | mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8); |
713 | mq->assoc_id = 1; | 715 | mq->assoc_id = 1; |
714 | } | 716 | } |
715 | id = mq->assoc_id++; | 717 | id = mq->assoc_id++; |
716 | GNUNET_assert(GNUNET_OK == | 718 | GNUNET_assert (GNUNET_OK == |
717 | GNUNET_CONTAINER_multihashmap32_put( | 719 | GNUNET_CONTAINER_multihashmap32_put ( |
718 | mq->assoc_map, | 720 | mq->assoc_map, |
719 | id, | 721 | id, |
720 | assoc_data, | 722 | assoc_data, |
721 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | 723 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); |
722 | return id; | 724 | return id; |
723 | } | 725 | } |
724 | 726 | ||
@@ -731,11 +733,11 @@ GNUNET_MQ_assoc_add(struct GNUNET_MQ_Handle *mq, void *assoc_data) | |||
731 | * @return the associated data | 733 | * @return the associated data |
732 | */ | 734 | */ |
733 | void * | 735 | void * |
734 | GNUNET_MQ_assoc_get(struct GNUNET_MQ_Handle *mq, uint32_t request_id) | 736 | GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id) |
735 | { | 737 | { |
736 | if (NULL == mq->assoc_map) | 738 | if (NULL == mq->assoc_map) |
737 | return NULL; | 739 | return NULL; |
738 | return GNUNET_CONTAINER_multihashmap32_get(mq->assoc_map, request_id); | 740 | return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id); |
739 | } | 741 | } |
740 | 742 | ||
741 | 743 | ||
@@ -747,14 +749,14 @@ GNUNET_MQ_assoc_get(struct GNUNET_MQ_Handle *mq, uint32_t request_id) | |||
747 | * @return the associated data | 749 | * @return the associated data |
748 | */ | 750 | */ |
749 | void * | 751 | void * |
750 | GNUNET_MQ_assoc_remove(struct GNUNET_MQ_Handle *mq, uint32_t request_id) | 752 | GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id) |
751 | { | 753 | { |
752 | void *val; | 754 | void *val; |
753 | 755 | ||
754 | if (NULL == mq->assoc_map) | 756 | if (NULL == mq->assoc_map) |
755 | return NULL; | 757 | return NULL; |
756 | val = GNUNET_CONTAINER_multihashmap32_get(mq->assoc_map, request_id); | 758 | val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id); |
757 | GNUNET_CONTAINER_multihashmap32_remove_all(mq->assoc_map, request_id); | 759 | GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map, request_id); |
758 | return val; | 760 | return val; |
759 | } | 761 | } |
760 | 762 | ||
@@ -769,12 +771,12 @@ GNUNET_MQ_assoc_remove(struct GNUNET_MQ_Handle *mq, uint32_t request_id) | |||
769 | * @param cb_cls closure for the callback | 771 | * @param cb_cls closure for the callback |
770 | */ | 772 | */ |
771 | void | 773 | void |
772 | GNUNET_MQ_notify_sent(struct GNUNET_MQ_Envelope *ev, | 774 | GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev, |
773 | GNUNET_SCHEDULER_TaskCallback cb, | 775 | GNUNET_SCHEDULER_TaskCallback cb, |
774 | void *cb_cls) | 776 | void *cb_cls) |
775 | { | 777 | { |
776 | /* allow setting *OR* clearing callback */ | 778 | /* allow setting *OR* clearing callback */ |
777 | GNUNET_assert((NULL == ev->sent_cb) || (NULL == cb)); | 779 | GNUNET_assert ((NULL == ev->sent_cb) || (NULL == cb)); |
778 | ev->sent_cb = cb; | 780 | ev->sent_cb = cb; |
779 | ev->sent_cls = cb_cls; | 781 | ev->sent_cls = cb_cls; |
780 | } | 782 | } |
@@ -784,7 +786,8 @@ GNUNET_MQ_notify_sent(struct GNUNET_MQ_Envelope *ev, | |||
784 | * Handle we return for callbacks registered to be | 786 | * Handle we return for callbacks registered to be |
785 | * notified when #GNUNET_MQ_destroy() is called on a queue. | 787 | * notified when #GNUNET_MQ_destroy() is called on a queue. |
786 | */ | 788 | */ |
787 | struct GNUNET_MQ_DestroyNotificationHandle { | 789 | struct GNUNET_MQ_DestroyNotificationHandle |
790 | { | ||
788 | /** | 791 | /** |
789 | * Kept in a DLL. | 792 | * Kept in a DLL. |
790 | */ | 793 | */ |
@@ -818,86 +821,86 @@ struct GNUNET_MQ_DestroyNotificationHandle { | |||
818 | * @param mq message queue to destroy | 821 | * @param mq message queue to destroy |
819 | */ | 822 | */ |
820 | void | 823 | void |
821 | GNUNET_MQ_destroy(struct GNUNET_MQ_Handle *mq) | 824 | GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) |
822 | { | 825 | { |
823 | struct GNUNET_MQ_DestroyNotificationHandle *dnh; | 826 | struct GNUNET_MQ_DestroyNotificationHandle *dnh; |
824 | 827 | ||
825 | if (NULL != mq->destroy_impl) | 828 | if (NULL != mq->destroy_impl) |
826 | { | 829 | { |
827 | mq->destroy_impl(mq, mq->impl_state); | 830 | mq->destroy_impl (mq, mq->impl_state); |
828 | } | 831 | } |
829 | if (NULL != mq->send_task) | 832 | if (NULL != mq->send_task) |
830 | { | 833 | { |
831 | GNUNET_SCHEDULER_cancel(mq->send_task); | 834 | GNUNET_SCHEDULER_cancel (mq->send_task); |
832 | mq->send_task = NULL; | 835 | mq->send_task = NULL; |
833 | } | 836 | } |
834 | while (NULL != mq->envelope_head) | 837 | while (NULL != mq->envelope_head) |
835 | { | 838 | { |
836 | struct GNUNET_MQ_Envelope *ev; | 839 | struct GNUNET_MQ_Envelope *ev; |
837 | 840 | ||
838 | ev = mq->envelope_head; | 841 | ev = mq->envelope_head; |
839 | ev->parent_queue = NULL; | 842 | ev->parent_queue = NULL; |
840 | GNUNET_CONTAINER_DLL_remove(mq->envelope_head, mq->envelope_tail, ev); | 843 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev); |
841 | GNUNET_assert(0 < mq->queue_length); | 844 | GNUNET_assert (0 < mq->queue_length); |
842 | mq->queue_length--; | 845 | mq->queue_length--; |
843 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 846 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
844 | "MQ destroy drops message of type %u\n", | 847 | "MQ destroy drops message of type %u\n", |
845 | ntohs(ev->mh->type)); | 848 | ntohs (ev->mh->type)); |
846 | GNUNET_MQ_discard(ev); | 849 | GNUNET_MQ_discard (ev); |
847 | } | 850 | } |
848 | if (NULL != mq->current_envelope) | 851 | if (NULL != mq->current_envelope) |
849 | { | 852 | { |
850 | /* we can only discard envelopes that | 853 | /* we can only discard envelopes that |
851 | * are not queued! */ | 854 | * are not queued! */ |
852 | mq->current_envelope->parent_queue = NULL; | 855 | mq->current_envelope->parent_queue = NULL; |
853 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 856 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
854 | "MQ destroy drops current message of type %u\n", | 857 | "MQ destroy drops current message of type %u\n", |
855 | ntohs(mq->current_envelope->mh->type)); | 858 | ntohs (mq->current_envelope->mh->type)); |
856 | GNUNET_MQ_discard(mq->current_envelope); | 859 | GNUNET_MQ_discard (mq->current_envelope); |
857 | mq->current_envelope = NULL; | 860 | mq->current_envelope = NULL; |
858 | GNUNET_assert(0 < mq->queue_length); | 861 | GNUNET_assert (0 < mq->queue_length); |
859 | mq->queue_length--; | 862 | mq->queue_length--; |
860 | } | 863 | } |
861 | GNUNET_assert(0 == mq->queue_length); | 864 | GNUNET_assert (0 == mq->queue_length); |
862 | while (NULL != (dnh = mq->dnh_head)) | 865 | while (NULL != (dnh = mq->dnh_head)) |
863 | { | 866 | { |
864 | dnh->cb(dnh->cb_cls); | 867 | dnh->cb (dnh->cb_cls); |
865 | GNUNET_MQ_destroy_notify_cancel(dnh); | 868 | GNUNET_MQ_destroy_notify_cancel (dnh); |
866 | } | 869 | } |
867 | if (NULL != mq->assoc_map) | 870 | if (NULL != mq->assoc_map) |
868 | { | 871 | { |
869 | GNUNET_CONTAINER_multihashmap32_destroy(mq->assoc_map); | 872 | GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map); |
870 | mq->assoc_map = NULL; | 873 | mq->assoc_map = NULL; |
871 | } | 874 | } |
872 | GNUNET_free_non_null(mq->handlers); | 875 | GNUNET_free_non_null (mq->handlers); |
873 | GNUNET_free(mq); | 876 | GNUNET_free (mq); |
874 | } | 877 | } |
875 | 878 | ||
876 | 879 | ||
877 | const struct GNUNET_MessageHeader * | 880 | const struct GNUNET_MessageHeader * |
878 | GNUNET_MQ_extract_nested_mh_(const struct GNUNET_MessageHeader *mh, | 881 | GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, |
879 | uint16_t base_size) | 882 | uint16_t base_size) |
880 | { | 883 | { |
881 | uint16_t whole_size; | 884 | uint16_t whole_size; |
882 | uint16_t nested_size; | 885 | uint16_t nested_size; |
883 | const struct GNUNET_MessageHeader *nested_msg; | 886 | const struct GNUNET_MessageHeader *nested_msg; |
884 | 887 | ||
885 | whole_size = ntohs(mh->size); | 888 | whole_size = ntohs (mh->size); |
886 | GNUNET_assert(whole_size >= base_size); | 889 | GNUNET_assert (whole_size >= base_size); |
887 | nested_size = whole_size - base_size; | 890 | nested_size = whole_size - base_size; |
888 | if (0 == nested_size) | 891 | if (0 == nested_size) |
889 | return NULL; | 892 | return NULL; |
890 | if (nested_size < sizeof(struct GNUNET_MessageHeader)) | 893 | if (nested_size < sizeof(struct GNUNET_MessageHeader)) |
891 | { | 894 | { |
892 | GNUNET_break_op(0); | 895 | GNUNET_break_op (0); |
893 | return NULL; | 896 | return NULL; |
894 | } | 897 | } |
895 | nested_msg = (const struct GNUNET_MessageHeader *)((char *)mh + base_size); | 898 | nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size); |
896 | if (ntohs(nested_msg->size) != nested_size) | 899 | if (ntohs (nested_msg->size) != nested_size) |
897 | { | 900 | { |
898 | GNUNET_break_op(0); | 901 | GNUNET_break_op (0); |
899 | return NULL; | 902 | return NULL; |
900 | } | 903 | } |
901 | return nested_msg; | 904 | return nested_msg; |
902 | } | 905 | } |
903 | 906 | ||
@@ -910,53 +913,53 @@ GNUNET_MQ_extract_nested_mh_(const struct GNUNET_MessageHeader *mh, | |||
910 | * @param ev queued envelope to cancel | 913 | * @param ev queued envelope to cancel |
911 | */ | 914 | */ |
912 | void | 915 | void |
913 | GNUNET_MQ_send_cancel(struct GNUNET_MQ_Envelope *ev) | 916 | GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev) |
914 | { | 917 | { |
915 | struct GNUNET_MQ_Handle *mq = ev->parent_queue; | 918 | struct GNUNET_MQ_Handle *mq = ev->parent_queue; |
916 | 919 | ||
917 | GNUNET_assert(NULL != mq); | 920 | GNUNET_assert (NULL != mq); |
918 | GNUNET_assert(NULL != mq->cancel_impl); | 921 | GNUNET_assert (NULL != mq->cancel_impl); |
919 | 922 | ||
920 | mq->evacuate_called = GNUNET_NO; | 923 | mq->evacuate_called = GNUNET_NO; |
921 | 924 | ||
922 | if (mq->current_envelope == ev) | 925 | if (mq->current_envelope == ev) |
926 | { | ||
927 | /* complex case, we already started with transmitting | ||
928 | the message using the callbacks. */ | ||
929 | GNUNET_assert (GNUNET_NO == mq->in_flight); | ||
930 | GNUNET_assert (0 < mq->queue_length); | ||
931 | mq->queue_length--; | ||
932 | mq->cancel_impl (mq, mq->impl_state); | ||
933 | /* continue sending the next message, if any */ | ||
934 | mq->current_envelope = mq->envelope_head; | ||
935 | if (NULL != mq->current_envelope) | ||
923 | { | 936 | { |
924 | /* complex case, we already started with transmitting | 937 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, |
925 | the message using the callbacks. */ | 938 | mq->envelope_tail, |
926 | GNUNET_assert(GNUNET_NO == mq->in_flight); | 939 | mq->current_envelope); |
927 | GNUNET_assert(0 < mq->queue_length); | 940 | |
928 | mq->queue_length--; | 941 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
929 | mq->cancel_impl(mq, mq->impl_state); | 942 | "sending canceled message of type %u queue\n", |
930 | /* continue sending the next message, if any */ | 943 | ntohs (ev->mh->type)); |
931 | mq->current_envelope = mq->envelope_head; | 944 | |
932 | if (NULL != mq->current_envelope) | 945 | mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); |
933 | { | ||
934 | GNUNET_CONTAINER_DLL_remove(mq->envelope_head, | ||
935 | mq->envelope_tail, | ||
936 | mq->current_envelope); | ||
937 | |||
938 | LOG(GNUNET_ERROR_TYPE_DEBUG, | ||
939 | "sending canceled message of type %u queue\n", | ||
940 | ntohs(ev->mh->type)); | ||
941 | |||
942 | mq->send_impl(mq, mq->current_envelope->mh, mq->impl_state); | ||
943 | } | ||
944 | } | 946 | } |
947 | } | ||
945 | else | 948 | else |
946 | { | 949 | { |
947 | /* simple case, message is still waiting in the queue */ | 950 | /* simple case, message is still waiting in the queue */ |
948 | GNUNET_CONTAINER_DLL_remove(mq->envelope_head, mq->envelope_tail, ev); | 951 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev); |
949 | GNUNET_assert(0 < mq->queue_length); | 952 | GNUNET_assert (0 < mq->queue_length); |
950 | mq->queue_length--; | 953 | mq->queue_length--; |
951 | } | 954 | } |
952 | 955 | ||
953 | if (GNUNET_YES != mq->evacuate_called) | 956 | if (GNUNET_YES != mq->evacuate_called) |
954 | { | 957 | { |
955 | ev->parent_queue = NULL; | 958 | ev->parent_queue = NULL; |
956 | ev->mh = NULL; | 959 | ev->mh = NULL; |
957 | /* also frees ev */ | 960 | /* also frees ev */ |
958 | GNUNET_free(ev); | 961 | GNUNET_free (ev); |
959 | } | 962 | } |
960 | } | 963 | } |
961 | 964 | ||
962 | 965 | ||
@@ -968,7 +971,7 @@ GNUNET_MQ_send_cancel(struct GNUNET_MQ_Envelope *ev) | |||
968 | * @return the current envelope | 971 | * @return the current envelope |
969 | */ | 972 | */ |
970 | struct GNUNET_MQ_Envelope * | 973 | struct GNUNET_MQ_Envelope * |
971 | GNUNET_MQ_get_current_envelope(struct GNUNET_MQ_Handle *mq) | 974 | GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq) |
972 | { | 975 | { |
973 | return mq->current_envelope; | 976 | return mq->current_envelope; |
974 | } | 977 | } |
@@ -981,7 +984,7 @@ GNUNET_MQ_get_current_envelope(struct GNUNET_MQ_Handle *mq) | |||
981 | * @return the last envelope in the queue | 984 | * @return the last envelope in the queue |
982 | */ | 985 | */ |
983 | struct GNUNET_MQ_Envelope * | 986 | struct GNUNET_MQ_Envelope * |
984 | GNUNET_MQ_get_last_envelope(struct GNUNET_MQ_Handle *mq) | 987 | GNUNET_MQ_get_last_envelope (struct GNUNET_MQ_Handle *mq) |
985 | { | 988 | { |
986 | if (NULL != mq->envelope_tail) | 989 | if (NULL != mq->envelope_tail) |
987 | return mq->envelope_tail; | 990 | return mq->envelope_tail; |
@@ -999,8 +1002,8 @@ GNUNET_MQ_get_last_envelope(struct GNUNET_MQ_Handle *mq) | |||
999 | * @param pp priorities and preferences to apply | 1002 | * @param pp priorities and preferences to apply |
1000 | */ | 1003 | */ |
1001 | void | 1004 | void |
1002 | GNUNET_MQ_env_set_options(struct GNUNET_MQ_Envelope *env, | 1005 | GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env, |
1003 | enum GNUNET_MQ_PriorityPreferences pp) | 1006 | enum GNUNET_MQ_PriorityPreferences pp) |
1004 | { | 1007 | { |
1005 | env->priority = pp; | 1008 | env->priority = pp; |
1006 | env->have_custom_options = GNUNET_YES; | 1009 | env->have_custom_options = GNUNET_YES; |
@@ -1014,7 +1017,7 @@ GNUNET_MQ_env_set_options(struct GNUNET_MQ_Envelope *env, | |||
1014 | * @return priorities and preferences to apply for @a env | 1017 | * @return priorities and preferences to apply for @a env |
1015 | */ | 1018 | */ |
1016 | enum GNUNET_MQ_PriorityPreferences | 1019 | enum GNUNET_MQ_PriorityPreferences |
1017 | GNUNET_MQ_env_get_options(struct GNUNET_MQ_Envelope *env) | 1020 | GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env) |
1018 | { | 1021 | { |
1019 | struct GNUNET_MQ_Handle *mq = env->parent_queue; | 1022 | struct GNUNET_MQ_Handle *mq = env->parent_queue; |
1020 | 1023 | ||
@@ -1035,12 +1038,12 @@ GNUNET_MQ_env_get_options(struct GNUNET_MQ_Envelope *env) | |||
1035 | * @return combined priority and preferences to use | 1038 | * @return combined priority and preferences to use |
1036 | */ | 1039 | */ |
1037 | enum GNUNET_MQ_PriorityPreferences | 1040 | enum GNUNET_MQ_PriorityPreferences |
1038 | GNUNET_MQ_env_combine_options(enum GNUNET_MQ_PriorityPreferences p1, | 1041 | GNUNET_MQ_env_combine_options (enum GNUNET_MQ_PriorityPreferences p1, |
1039 | enum GNUNET_MQ_PriorityPreferences p2) | 1042 | enum GNUNET_MQ_PriorityPreferences p2) |
1040 | { | 1043 | { |
1041 | enum GNUNET_MQ_PriorityPreferences ret; | 1044 | enum GNUNET_MQ_PriorityPreferences ret; |
1042 | 1045 | ||
1043 | ret = GNUNET_MAX(p1 & GNUNET_MQ_PRIORITY_MASK, p2 & GNUNET_MQ_PRIORITY_MASK); | 1046 | ret = GNUNET_MAX (p1 & GNUNET_MQ_PRIORITY_MASK, p2 & GNUNET_MQ_PRIORITY_MASK); |
1044 | ret |= ((p1 & GNUNET_MQ_PREF_UNRELIABLE) & (p2 & GNUNET_MQ_PREF_UNRELIABLE)); | 1047 | ret |= ((p1 & GNUNET_MQ_PREF_UNRELIABLE) & (p2 & GNUNET_MQ_PREF_UNRELIABLE)); |
1045 | ret |= | 1048 | ret |= |
1046 | ((p1 & GNUNET_MQ_PREF_LOW_LATENCY) | (p2 & GNUNET_MQ_PREF_LOW_LATENCY)); | 1049 | ((p1 & GNUNET_MQ_PREF_LOW_LATENCY) | (p2 & GNUNET_MQ_PREF_LOW_LATENCY)); |
@@ -1060,8 +1063,8 @@ GNUNET_MQ_env_combine_options(enum GNUNET_MQ_PriorityPreferences p1, | |||
1060 | * @param pp priorities and preferences to apply | 1063 | * @param pp priorities and preferences to apply |
1061 | */ | 1064 | */ |
1062 | void | 1065 | void |
1063 | GNUNET_MQ_set_options(struct GNUNET_MQ_Handle *mq, | 1066 | GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq, |
1064 | enum GNUNET_MQ_PriorityPreferences pp) | 1067 | enum GNUNET_MQ_PriorityPreferences pp) |
1065 | { | 1068 | { |
1066 | mq->priority = pp; | 1069 | mq->priority = pp; |
1067 | } | 1070 | } |
@@ -1074,7 +1077,7 @@ GNUNET_MQ_set_options(struct GNUNET_MQ_Handle *mq, | |||
1074 | * @return message contained in the envelope | 1077 | * @return message contained in the envelope |
1075 | */ | 1078 | */ |
1076 | const struct GNUNET_MessageHeader * | 1079 | const struct GNUNET_MessageHeader * |
1077 | GNUNET_MQ_env_get_msg(const struct GNUNET_MQ_Envelope *env) | 1080 | GNUNET_MQ_env_get_msg (const struct GNUNET_MQ_Envelope *env) |
1078 | { | 1081 | { |
1079 | return env->mh; | 1082 | return env->mh; |
1080 | } | 1083 | } |
@@ -1087,7 +1090,7 @@ GNUNET_MQ_env_get_msg(const struct GNUNET_MQ_Envelope *env) | |||
1087 | * @return next one, or NULL | 1090 | * @return next one, or NULL |
1088 | */ | 1091 | */ |
1089 | const struct GNUNET_MQ_Envelope * | 1092 | const struct GNUNET_MQ_Envelope * |
1090 | GNUNET_MQ_env_next(const struct GNUNET_MQ_Envelope *env) | 1093 | GNUNET_MQ_env_next (const struct GNUNET_MQ_Envelope *env) |
1091 | { | 1094 | { |
1092 | return env->next; | 1095 | return env->next; |
1093 | } | 1096 | } |
@@ -1103,17 +1106,17 @@ GNUNET_MQ_env_next(const struct GNUNET_MQ_Envelope *env) | |||
1103 | * @return handle for #GNUNET_MQ_destroy_notify_cancel(). | 1106 | * @return handle for #GNUNET_MQ_destroy_notify_cancel(). |
1104 | */ | 1107 | */ |
1105 | struct GNUNET_MQ_DestroyNotificationHandle * | 1108 | struct GNUNET_MQ_DestroyNotificationHandle * |
1106 | GNUNET_MQ_destroy_notify(struct GNUNET_MQ_Handle *mq, | 1109 | GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq, |
1107 | GNUNET_SCHEDULER_TaskCallback cb, | 1110 | GNUNET_SCHEDULER_TaskCallback cb, |
1108 | void *cb_cls) | 1111 | void *cb_cls) |
1109 | { | 1112 | { |
1110 | struct GNUNET_MQ_DestroyNotificationHandle *dnh; | 1113 | struct GNUNET_MQ_DestroyNotificationHandle *dnh; |
1111 | 1114 | ||
1112 | dnh = GNUNET_new(struct GNUNET_MQ_DestroyNotificationHandle); | 1115 | dnh = GNUNET_new (struct GNUNET_MQ_DestroyNotificationHandle); |
1113 | dnh->mq = mq; | 1116 | dnh->mq = mq; |
1114 | dnh->cb = cb; | 1117 | dnh->cb = cb; |
1115 | dnh->cb_cls = cb_cls; | 1118 | dnh->cb_cls = cb_cls; |
1116 | GNUNET_CONTAINER_DLL_insert(mq->dnh_head, mq->dnh_tail, dnh); | 1119 | GNUNET_CONTAINER_DLL_insert (mq->dnh_head, mq->dnh_tail, dnh); |
1117 | return dnh; | 1120 | return dnh; |
1118 | } | 1121 | } |
1119 | 1122 | ||
@@ -1124,13 +1127,13 @@ GNUNET_MQ_destroy_notify(struct GNUNET_MQ_Handle *mq, | |||
1124 | * @param dnh handle for registration to cancel | 1127 | * @param dnh handle for registration to cancel |
1125 | */ | 1128 | */ |
1126 | void | 1129 | void |
1127 | GNUNET_MQ_destroy_notify_cancel( | 1130 | GNUNET_MQ_destroy_notify_cancel ( |
1128 | struct GNUNET_MQ_DestroyNotificationHandle *dnh) | 1131 | struct GNUNET_MQ_DestroyNotificationHandle *dnh) |
1129 | { | 1132 | { |
1130 | struct GNUNET_MQ_Handle *mq = dnh->mq; | 1133 | struct GNUNET_MQ_Handle *mq = dnh->mq; |
1131 | 1134 | ||
1132 | GNUNET_CONTAINER_DLL_remove(mq->dnh_head, mq->dnh_tail, dnh); | 1135 | GNUNET_CONTAINER_DLL_remove (mq->dnh_head, mq->dnh_tail, dnh); |
1133 | GNUNET_free(dnh); | 1136 | GNUNET_free (dnh); |
1134 | } | 1137 | } |
1135 | 1138 | ||
1136 | 1139 | ||
@@ -1147,11 +1150,11 @@ GNUNET_MQ_destroy_notify_cancel( | |||
1147 | * @param[in|out] env element to insert at the tail | 1150 | * @param[in|out] env element to insert at the tail |
1148 | */ | 1151 | */ |
1149 | void | 1152 | void |
1150 | GNUNET_MQ_dll_insert_head(struct GNUNET_MQ_Envelope **env_head, | 1153 | GNUNET_MQ_dll_insert_head (struct GNUNET_MQ_Envelope **env_head, |
1151 | struct GNUNET_MQ_Envelope **env_tail, | 1154 | struct GNUNET_MQ_Envelope **env_tail, |
1152 | struct GNUNET_MQ_Envelope *env) | 1155 | struct GNUNET_MQ_Envelope *env) |
1153 | { | 1156 | { |
1154 | GNUNET_CONTAINER_DLL_insert(*env_head, *env_tail, env); | 1157 | GNUNET_CONTAINER_DLL_insert (*env_head, *env_tail, env); |
1155 | } | 1158 | } |
1156 | 1159 | ||
1157 | 1160 | ||
@@ -1168,11 +1171,11 @@ GNUNET_MQ_dll_insert_head(struct GNUNET_MQ_Envelope **env_head, | |||
1168 | * @param[in|out] env element to insert at the tail | 1171 | * @param[in|out] env element to insert at the tail |
1169 | */ | 1172 | */ |
1170 | void | 1173 | void |
1171 | GNUNET_MQ_dll_insert_tail(struct GNUNET_MQ_Envelope **env_head, | 1174 | GNUNET_MQ_dll_insert_tail (struct GNUNET_MQ_Envelope **env_head, |
1172 | struct GNUNET_MQ_Envelope **env_tail, | 1175 | struct GNUNET_MQ_Envelope **env_tail, |
1173 | struct GNUNET_MQ_Envelope *env) | 1176 | struct GNUNET_MQ_Envelope *env) |
1174 | { | 1177 | { |
1175 | GNUNET_CONTAINER_DLL_insert_tail(*env_head, *env_tail, env); | 1178 | GNUNET_CONTAINER_DLL_insert_tail (*env_head, *env_tail, env); |
1176 | } | 1179 | } |
1177 | 1180 | ||
1178 | 1181 | ||
@@ -1189,11 +1192,11 @@ GNUNET_MQ_dll_insert_tail(struct GNUNET_MQ_Envelope **env_head, | |||
1189 | * @param[in|out] env element to remove from the DLL | 1192 | * @param[in|out] env element to remove from the DLL |
1190 | */ | 1193 | */ |
1191 | void | 1194 | void |
1192 | GNUNET_MQ_dll_remove(struct GNUNET_MQ_Envelope **env_head, | 1195 | GNUNET_MQ_dll_remove (struct GNUNET_MQ_Envelope **env_head, |
1193 | struct GNUNET_MQ_Envelope **env_tail, | 1196 | struct GNUNET_MQ_Envelope **env_tail, |
1194 | struct GNUNET_MQ_Envelope *env) | 1197 | struct GNUNET_MQ_Envelope *env) |
1195 | { | 1198 | { |
1196 | GNUNET_CONTAINER_DLL_remove(*env_head, *env_tail, env); | 1199 | GNUNET_CONTAINER_DLL_remove (*env_head, *env_tail, env); |
1197 | } | 1200 | } |
1198 | 1201 | ||
1199 | 1202 | ||
@@ -1208,7 +1211,7 @@ GNUNET_MQ_dll_remove(struct GNUNET_MQ_Envelope **env_head, | |||
1208 | * Needs to be freed with #GNUNET_free. | 1211 | * Needs to be freed with #GNUNET_free. |
1209 | */ | 1212 | */ |
1210 | struct GNUNET_MQ_MessageHandler * | 1213 | struct GNUNET_MQ_MessageHandler * |
1211 | GNUNET_MQ_copy_handlers(const struct GNUNET_MQ_MessageHandler *handlers) | 1214 | GNUNET_MQ_copy_handlers (const struct GNUNET_MQ_MessageHandler *handlers) |
1212 | { | 1215 | { |
1213 | struct GNUNET_MQ_MessageHandler *copy; | 1216 | struct GNUNET_MQ_MessageHandler *copy; |
1214 | unsigned int count; | 1217 | unsigned int count; |
@@ -1216,11 +1219,11 @@ GNUNET_MQ_copy_handlers(const struct GNUNET_MQ_MessageHandler *handlers) | |||
1216 | if (NULL == handlers) | 1219 | if (NULL == handlers) |
1217 | return NULL; | 1220 | return NULL; |
1218 | 1221 | ||
1219 | count = GNUNET_MQ_count_handlers(handlers); | 1222 | count = GNUNET_MQ_count_handlers (handlers); |
1220 | copy = GNUNET_new_array(count + 1, struct GNUNET_MQ_MessageHandler); | 1223 | copy = GNUNET_new_array (count + 1, struct GNUNET_MQ_MessageHandler); |
1221 | GNUNET_memcpy(copy, | 1224 | GNUNET_memcpy (copy, |
1222 | handlers, | 1225 | handlers, |
1223 | count * sizeof(struct GNUNET_MQ_MessageHandler)); | 1226 | count * sizeof(struct GNUNET_MQ_MessageHandler)); |
1224 | return copy; | 1227 | return copy; |
1225 | } | 1228 | } |
1226 | 1229 | ||
@@ -1238,20 +1241,20 @@ GNUNET_MQ_copy_handlers(const struct GNUNET_MQ_MessageHandler *handlers) | |||
1238 | * Needs to be freed with #GNUNET_free. | 1241 | * Needs to be freed with #GNUNET_free. |
1239 | */ | 1242 | */ |
1240 | struct GNUNET_MQ_MessageHandler * | 1243 | struct GNUNET_MQ_MessageHandler * |
1241 | GNUNET_MQ_copy_handlers2(const struct GNUNET_MQ_MessageHandler *handlers, | 1244 | GNUNET_MQ_copy_handlers2 (const struct GNUNET_MQ_MessageHandler *handlers, |
1242 | GNUNET_MQ_MessageCallback agpl_handler, | 1245 | GNUNET_MQ_MessageCallback agpl_handler, |
1243 | void *agpl_cls) | 1246 | void *agpl_cls) |
1244 | { | 1247 | { |
1245 | struct GNUNET_MQ_MessageHandler *copy; | 1248 | struct GNUNET_MQ_MessageHandler *copy; |
1246 | unsigned int count; | 1249 | unsigned int count; |
1247 | 1250 | ||
1248 | if (NULL == handlers) | 1251 | if (NULL == handlers) |
1249 | return NULL; | 1252 | return NULL; |
1250 | count = GNUNET_MQ_count_handlers(handlers); | 1253 | count = GNUNET_MQ_count_handlers (handlers); |
1251 | copy = GNUNET_new_array(count + 2, struct GNUNET_MQ_MessageHandler); | 1254 | copy = GNUNET_new_array (count + 2, struct GNUNET_MQ_MessageHandler); |
1252 | GNUNET_memcpy(copy, | 1255 | GNUNET_memcpy (copy, |
1253 | handlers, | 1256 | handlers, |
1254 | count * sizeof(struct GNUNET_MQ_MessageHandler)); | 1257 | count * sizeof(struct GNUNET_MQ_MessageHandler)); |
1255 | copy[count].mv = NULL; | 1258 | copy[count].mv = NULL; |
1256 | copy[count].cb = agpl_handler; | 1259 | copy[count].cb = agpl_handler; |
1257 | copy[count].cls = agpl_cls; | 1260 | copy[count].cls = agpl_cls; |
@@ -1268,7 +1271,7 @@ GNUNET_MQ_copy_handlers2(const struct GNUNET_MQ_MessageHandler *handlers, | |||
1268 | * @return The number of handlers in the array. | 1271 | * @return The number of handlers in the array. |
1269 | */ | 1272 | */ |
1270 | unsigned int | 1273 | unsigned int |
1271 | GNUNET_MQ_count_handlers(const struct GNUNET_MQ_MessageHandler *handlers) | 1274 | GNUNET_MQ_count_handlers (const struct GNUNET_MQ_MessageHandler *handlers) |
1272 | { | 1275 | { |
1273 | unsigned int i; | 1276 | unsigned int i; |
1274 | 1277 | ||
@@ -1289,22 +1292,22 @@ GNUNET_MQ_count_handlers(const struct GNUNET_MQ_MessageHandler *handlers) | |||
1289 | * @return a string or NULL if invalid | 1292 | * @return a string or NULL if invalid |
1290 | */ | 1293 | */ |
1291 | const char * | 1294 | const char * |
1292 | GNUNET_MQ_preference_to_string(enum GNUNET_MQ_PreferenceKind type) | 1295 | GNUNET_MQ_preference_to_string (enum GNUNET_MQ_PreferenceKind type) |
1293 | { | 1296 | { |
1294 | switch (type) | 1297 | switch (type) |
1295 | { | 1298 | { |
1296 | case GNUNET_MQ_PREFERENCE_NONE: | 1299 | case GNUNET_MQ_PREFERENCE_NONE: |
1297 | return "NONE"; | 1300 | return "NONE"; |
1298 | 1301 | ||
1299 | case GNUNET_MQ_PREFERENCE_BANDWIDTH: | 1302 | case GNUNET_MQ_PREFERENCE_BANDWIDTH: |
1300 | return "BANDWIDTH"; | 1303 | return "BANDWIDTH"; |
1301 | 1304 | ||
1302 | case GNUNET_MQ_PREFERENCE_LATENCY: | 1305 | case GNUNET_MQ_PREFERENCE_LATENCY: |
1303 | return "LATENCY"; | 1306 | return "LATENCY"; |
1304 | 1307 | ||
1305 | case GNUNET_MQ_PREFERENCE_RELIABILITY: | 1308 | case GNUNET_MQ_PREFERENCE_RELIABILITY: |
1306 | return "RELIABILITY"; | 1309 | return "RELIABILITY"; |
1307 | } | 1310 | } |
1308 | ; | 1311 | ; |
1309 | return NULL; | 1312 | return NULL; |
1310 | } | 1313 | } |