diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-05-15 10:48:55 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-05-15 10:48:55 +0000 |
commit | 6f54b50858457dfa2b5f0b519fbf230e1119c6b2 (patch) | |
tree | 5f84bfa599cb50522999cad892344e2fecbfa963 /src/set/mq.c | |
parent | 6625c27a83831b61a80683f4385b6a90b9a45b31 (diff) | |
download | gnunet-6f54b50858457dfa2b5f0b519fbf230e1119c6b2.tar.gz gnunet-6f54b50858457dfa2b5f0b519fbf230e1119c6b2.zip |
test cases for mq, set works
Diffstat (limited to 'src/set/mq.c')
-rw-r--r-- | src/set/mq.c | 130 |
1 files changed, 115 insertions, 15 deletions
diff --git a/src/set/mq.c b/src/set/mq.c index 3a9e614e9..0ced014dd 100644 --- a/src/set/mq.c +++ b/src/set/mq.c | |||
@@ -192,13 +192,22 @@ static void | |||
192 | dispatch_message (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh) | 192 | dispatch_message (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh) |
193 | { | 193 | { |
194 | const struct GNUNET_MQ_Handler *handler; | 194 | const struct GNUNET_MQ_Handler *handler; |
195 | int handled = GNUNET_NO; | ||
195 | 196 | ||
196 | handler = mq->handlers; | 197 | handler = mq->handlers; |
197 | if (NULL == handler) | 198 | if (NULL == handler) |
198 | return; | 199 | return; |
199 | for (; NULL != handler->cb; handler++) | 200 | for (; NULL != handler->cb; handler++) |
201 | { | ||
200 | if (handler->type == ntohs (mh->type)) | 202 | if (handler->type == ntohs (mh->type)) |
203 | { | ||
201 | handler->cb (mq->handlers_cls, mh); | 204 | handler->cb (mq->handlers_cls, mh); |
205 | handled = GNUNET_YES; | ||
206 | } | ||
207 | } | ||
208 | |||
209 | if (GNUNET_NO == handled) | ||
210 | LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type)); | ||
202 | } | 211 | } |
203 | 212 | ||
204 | 213 | ||
@@ -220,6 +229,7 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm) | |||
220 | void | 229 | void |
221 | GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) | 230 | GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) |
222 | { | 231 | { |
232 | GNUNET_assert (NULL != mq); | ||
223 | mq->send_impl (mq, mqm); | 233 | mq->send_impl (mq, mqm); |
224 | } | 234 | } |
225 | 235 | ||
@@ -228,6 +238,7 @@ struct GNUNET_MQ_Message * | |||
228 | GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) | 238 | GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) |
229 | { | 239 | { |
230 | struct GNUNET_MQ_Message *mqm; | 240 | struct GNUNET_MQ_Message *mqm; |
241 | |||
231 | mqm = GNUNET_malloc (sizeof *mqm + size); | 242 | mqm = GNUNET_malloc (sizeof *mqm + size); |
232 | mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1]; | 243 | mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1]; |
233 | mqm->mh->size = htons (size); | 244 | mqm->mh->size = htons (size); |
@@ -245,16 +256,18 @@ GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp, | |||
245 | size_t new_size; | 256 | size_t new_size; |
246 | size_t old_size; | 257 | size_t old_size; |
247 | 258 | ||
259 | GNUNET_assert (NULL != mqmp); | ||
260 | /* there's no data to append => do nothing */ | ||
248 | if (NULL == data) | 261 | if (NULL == data) |
249 | return GNUNET_OK; | 262 | return GNUNET_OK; |
250 | GNUNET_assert (NULL != mqmp); | ||
251 | old_size = ntohs ((*mqmp)->mh->size); | 263 | old_size = ntohs ((*mqmp)->mh->size); |
252 | /* message too large to concatenate? */ | 264 | /* message too large to concatenate? */ |
253 | if (ntohs ((*mqmp)->mh->size) + len < len) | 265 | if (((uint16_t) (old_size + len)) < len) |
254 | return GNUNET_SYSERR; | 266 | return GNUNET_SYSERR; |
255 | new_size = old_size + len; | 267 | new_size = old_size + len; |
256 | *mqmp = GNUNET_realloc (mqmp, sizeof (struct GNUNET_MQ_Message) + new_size); | 268 | *mqmp = GNUNET_realloc (*mqmp, sizeof (struct GNUNET_MQ_Message) + new_size); |
257 | memcpy ((*mqmp)->mh + old_size, data, new_size - old_size); | 269 | (*mqmp)->mh = (struct GNUNET_MessageHeader *) &(*mqmp)[1]; |
270 | memcpy (((void *) (*mqmp)->mh) + old_size, data, new_size - old_size); | ||
258 | (*mqmp)->mh->size = htons (new_size); | 271 | (*mqmp)->mh->size = htons (new_size); |
259 | return GNUNET_OK; | 272 | return GNUNET_OK; |
260 | } | 273 | } |
@@ -286,12 +299,10 @@ stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size) | |||
286 | 299 | ||
287 | /* call cb for message we finished sending */ | 300 | /* call cb for message we finished sending */ |
288 | mqm = mq->current_msg; | 301 | mqm = mq->current_msg; |
289 | if (NULL != mqm) | 302 | GNUNET_assert (NULL != mq->current_msg); |
290 | { | 303 | if (NULL != mqm->sent_cb) |
291 | if (NULL != mqm->sent_cb) | 304 | mqm->sent_cb (mqm->sent_cls); |
292 | mqm->sent_cb (mqm->sent_cls); | 305 | GNUNET_free (mqm); |
293 | GNUNET_free (mqm); | ||
294 | } | ||
295 | 306 | ||
296 | mss->wh = NULL; | 307 | mss->wh = NULL; |
297 | 308 | ||
@@ -384,6 +395,35 @@ stream_data_processor (void *cls, | |||
384 | } | 395 | } |
385 | 396 | ||
386 | 397 | ||
398 | static void | ||
399 | stream_socket_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) | ||
400 | { | ||
401 | struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state; | ||
402 | |||
403 | if (NULL != mss->rh) | ||
404 | { | ||
405 | GNUNET_STREAM_read_cancel (mss->rh); | ||
406 | mss->rh = NULL; | ||
407 | } | ||
408 | |||
409 | if (NULL != mss->wh) | ||
410 | { | ||
411 | GNUNET_STREAM_write_cancel (mss->wh); | ||
412 | mss->wh = NULL; | ||
413 | } | ||
414 | |||
415 | if (NULL != mss->mst) | ||
416 | { | ||
417 | GNUNET_SERVER_mst_destroy (mss->mst); | ||
418 | mss->mst = NULL; | ||
419 | } | ||
420 | |||
421 | GNUNET_free (mss); | ||
422 | } | ||
423 | |||
424 | |||
425 | |||
426 | |||
387 | struct GNUNET_MQ_MessageQueue * | 427 | struct GNUNET_MQ_MessageQueue * |
388 | GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket, | 428 | GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket, |
389 | const struct GNUNET_MQ_Handler *handlers, | 429 | const struct GNUNET_MQ_Handler *handlers, |
@@ -397,6 +437,7 @@ GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket, | |||
397 | mss->socket = socket; | 437 | mss->socket = socket; |
398 | mq->impl_state = mss; | 438 | mq->impl_state = mss; |
399 | mq->send_impl = stream_socket_send_impl; | 439 | mq->send_impl = stream_socket_send_impl; |
440 | mq->destroy_impl = &stream_socket_destroy_impl; | ||
400 | mq->handlers = handlers; | 441 | mq->handlers = handlers; |
401 | mq->handlers_cls = cls; | 442 | mq->handlers_cls = cls; |
402 | if (NULL != handlers) | 443 | if (NULL != handlers) |
@@ -425,14 +466,21 @@ transmit_queued (void *cls, size_t size, | |||
425 | struct ServerClientSocketState *state = mq->impl_state; | 466 | struct ServerClientSocketState *state = mq->impl_state; |
426 | size_t msg_size; | 467 | size_t msg_size; |
427 | 468 | ||
469 | GNUNET_assert (NULL != buf); | ||
470 | |||
471 | if (NULL != mqm->sent_cb) | ||
472 | { | ||
473 | mqm->sent_cb (mqm->sent_cls); | ||
474 | } | ||
475 | |||
428 | mq->current_msg = NULL; | 476 | mq->current_msg = NULL; |
429 | GNUNET_assert (NULL != mqm); | 477 | GNUNET_assert (NULL != mqm); |
430 | GNUNET_assert (NULL != buf); | ||
431 | msg_size = ntohs (mqm->mh->size); | 478 | msg_size = ntohs (mqm->mh->size); |
432 | GNUNET_assert (size >= msg_size); | 479 | GNUNET_assert (size >= msg_size); |
433 | memcpy (buf, mqm->mh, msg_size); | 480 | memcpy (buf, mqm->mh, msg_size); |
434 | GNUNET_free (mqm); | 481 | GNUNET_free (mqm); |
435 | state->th = NULL; | 482 | state->th = NULL; |
483 | |||
436 | if (NULL != mq->msg_head) | 484 | if (NULL != mq->msg_head) |
437 | { | 485 | { |
438 | mq->current_msg = mq->msg_head; | 486 | mq->current_msg = mq->msg_head; |
@@ -448,12 +496,27 @@ transmit_queued (void *cls, size_t size, | |||
448 | } | 496 | } |
449 | 497 | ||
450 | 498 | ||
499 | |||
500 | static void | ||
501 | server_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) | ||
502 | { | ||
503 | struct ServerClientSocketState *state; | ||
504 | |||
505 | GNUNET_assert (NULL != mq); | ||
506 | state = mq->impl_state; | ||
507 | GNUNET_assert (NULL != state); | ||
508 | GNUNET_SERVER_client_drop (state->client); | ||
509 | GNUNET_free (state); | ||
510 | } | ||
511 | |||
451 | static void | 512 | static void |
452 | server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) | 513 | server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) |
453 | { | 514 | { |
454 | struct ServerClientSocketState *state = mq->impl_state; | 515 | struct ServerClientSocketState *state; |
455 | int msize; | 516 | int msize; |
456 | 517 | ||
518 | GNUNET_assert (NULL != mq); | ||
519 | state = mq->impl_state; | ||
457 | GNUNET_assert (NULL != state); | 520 | GNUNET_assert (NULL != state); |
458 | 521 | ||
459 | if (NULL != state->th) | 522 | if (NULL != state->th) |
@@ -461,8 +524,9 @@ server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Mes | |||
461 | GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); | 524 | GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); |
462 | return; | 525 | return; |
463 | } | 526 | } |
527 | GNUNET_assert (NULL == mq->msg_head); | ||
464 | GNUNET_assert (NULL == mq->current_msg); | 528 | GNUNET_assert (NULL == mq->current_msg); |
465 | msize = ntohs (mq->msg_head->mh->size); | 529 | msize = ntohs (mqm->mh->size); |
466 | mq->current_msg = mqm; | 530 | mq->current_msg = mqm; |
467 | state->th = | 531 | state->th = |
468 | GNUNET_SERVER_notify_transmit_ready (state->client, msize, | 532 | GNUNET_SERVER_notify_transmit_ready (state->client, msize, |
@@ -480,7 +544,10 @@ GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client) | |||
480 | mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); | 544 | mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); |
481 | scss = GNUNET_new (struct ServerClientSocketState); | 545 | scss = GNUNET_new (struct ServerClientSocketState); |
482 | mq->impl_state = scss; | 546 | mq->impl_state = scss; |
547 | scss->client = client; | ||
548 | GNUNET_SERVER_client_keep (client); | ||
483 | mq->send_impl = server_client_send_impl; | 549 | mq->send_impl = server_client_send_impl; |
550 | mq->destroy_impl = server_client_destroy_impl; | ||
484 | return mq; | 551 | return mq; |
485 | } | 552 | } |
486 | 553 | ||
@@ -502,8 +569,15 @@ connection_client_transmit_queued (void *cls, size_t size, | |||
502 | struct ClientConnectionState *state = mq->impl_state; | 569 | struct ClientConnectionState *state = mq->impl_state; |
503 | size_t msg_size; | 570 | size_t msg_size; |
504 | 571 | ||
505 | mq->current_msg = NULL; | 572 | |
506 | GNUNET_assert (NULL != mqm); | 573 | GNUNET_assert (NULL != mqm); |
574 | |||
575 | if (NULL != mqm->sent_cb) | ||
576 | { | ||
577 | mqm->sent_cb (mqm->sent_cls); | ||
578 | } | ||
579 | |||
580 | mq->current_msg = NULL; | ||
507 | GNUNET_assert (NULL != buf); | 581 | GNUNET_assert (NULL != buf); |
508 | msg_size = ntohs (mqm->mh->size); | 582 | msg_size = ntohs (mqm->mh->size); |
509 | GNUNET_assert (size >= msg_size); | 583 | GNUNET_assert (size >= msg_size); |
@@ -515,7 +589,7 @@ connection_client_transmit_queued (void *cls, size_t size, | |||
515 | mq->current_msg = mq->msg_head; | 589 | mq->current_msg = mq->msg_head; |
516 | GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg); | 590 | GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg); |
517 | state->th = | 591 | state->th = |
518 | GNUNET_CLIENT_notify_transmit_ready (state->connection, htons (mq->current_msg->mh->size), | 592 | GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (mq->current_msg->mh->size), |
519 | GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, | 593 | GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, |
520 | &connection_client_transmit_queued, mq); | 594 | &connection_client_transmit_queued, mq); |
521 | } | 595 | } |
@@ -525,6 +599,13 @@ connection_client_transmit_queued (void *cls, size_t size, | |||
525 | } | 599 | } |
526 | 600 | ||
527 | 601 | ||
602 | |||
603 | static void | ||
604 | connection_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) | ||
605 | { | ||
606 | GNUNET_free (mq->impl_state); | ||
607 | } | ||
608 | |||
528 | static void | 609 | static void |
529 | connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, | 610 | connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, |
530 | struct GNUNET_MQ_Message *mqm) | 611 | struct GNUNET_MQ_Message *mqm) |
@@ -549,6 +630,7 @@ connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, | |||
549 | } | 630 | } |
550 | 631 | ||
551 | 632 | ||
633 | |||
552 | /** | 634 | /** |
553 | * Type of a function to call when we receive a message | 635 | * Type of a function to call when we receive a message |
554 | * from the service. | 636 | * from the service. |
@@ -561,6 +643,9 @@ handle_client_message (void *cls, | |||
561 | const struct GNUNET_MessageHeader *msg) | 643 | const struct GNUNET_MessageHeader *msg) |
562 | { | 644 | { |
563 | struct GNUNET_MQ_MessageQueue *mq = cls; | 645 | struct GNUNET_MQ_MessageQueue *mq = cls; |
646 | struct ClientConnectionState *state; | ||
647 | |||
648 | state = mq->impl_state; | ||
564 | 649 | ||
565 | if (NULL == msg) | 650 | if (NULL == msg) |
566 | { | 651 | { |
@@ -569,6 +654,10 @@ handle_client_message (void *cls, | |||
569 | mq->read_error_cb (mq->read_error_cls); | 654 | mq->read_error_cb (mq->read_error_cls); |
570 | return; | 655 | return; |
571 | } | 656 | } |
657 | |||
658 | GNUNET_CLIENT_receive (state->connection, handle_client_message, mq, | ||
659 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
660 | |||
572 | dispatch_message (mq, msg); | 661 | dispatch_message (mq, msg); |
573 | } | 662 | } |
574 | 663 | ||
@@ -590,6 +679,7 @@ GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connecti | |||
590 | state->connection = connection; | 679 | state->connection = connection; |
591 | mq->impl_state = state; | 680 | mq->impl_state = state; |
592 | mq->send_impl = connection_client_send_impl; | 681 | mq->send_impl = connection_client_send_impl; |
682 | mq->destroy_impl = connection_client_destroy_impl; | ||
593 | 683 | ||
594 | if (NULL != handlers) | 684 | if (NULL != handlers) |
595 | { | 685 | { |
@@ -626,7 +716,10 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq, | |||
626 | uint32_t id; | 716 | uint32_t id; |
627 | 717 | ||
628 | if (NULL == mq->assoc_map) | 718 | if (NULL == mq->assoc_map) |
719 | { | ||
629 | mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8); | 720 | mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8); |
721 | mq->assoc_id = 1; | ||
722 | } | ||
630 | id = mq->assoc_id++; | 723 | id = mq->assoc_id++; |
631 | GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data, | 724 | GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data, |
632 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | 725 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); |
@@ -652,6 +745,7 @@ GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id) | |||
652 | if (NULL == mq->assoc_map) | 745 | if (NULL == mq->assoc_map) |
653 | return NULL; | 746 | return NULL; |
654 | val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id); | 747 | val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id); |
748 | GNUNET_assert (NULL != val); | ||
655 | GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val); | 749 | GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val); |
656 | return val; | 750 | return val; |
657 | } | 751 | } |
@@ -671,6 +765,12 @@ void | |||
671 | GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq) | 765 | GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq) |
672 | { | 766 | { |
673 | /* FIXME: destroy all pending messages in the queue */ | 767 | /* FIXME: destroy all pending messages in the queue */ |
768 | |||
769 | if (NULL != mq->destroy_impl) | ||
770 | { | ||
771 | mq->destroy_impl (mq); | ||
772 | } | ||
773 | |||
674 | GNUNET_free (mq); | 774 | GNUNET_free (mq); |
675 | } | 775 | } |
676 | 776 | ||