aboutsummaryrefslogtreecommitdiff
path: root/src/set/mq.c
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-05-15 10:48:55 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-05-15 10:48:55 +0000
commit6f54b50858457dfa2b5f0b519fbf230e1119c6b2 (patch)
tree5f84bfa599cb50522999cad892344e2fecbfa963 /src/set/mq.c
parent6625c27a83831b61a80683f4385b6a90b9a45b31 (diff)
downloadgnunet-6f54b50858457dfa2b5f0b519fbf230e1119c6b2.tar.gz
gnunet-6f54b50858457dfa2b5f0b519fbf230e1119c6b2.zip
test cases for mq, set works
Diffstat (limited to 'src/set/mq.c')
-rw-r--r--src/set/mq.c130
1 files changed, 115 insertions, 15 deletions
diff --git a/src/set/mq.c b/src/set/mq.c
index 3a9e614e9..0ced014dd 100644
--- a/src/set/mq.c
+++ b/src/set/mq.c
@@ -192,13 +192,22 @@ static void
192dispatch_message (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh) 192dispatch_message (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh)
193{ 193{
194 const struct GNUNET_MQ_Handler *handler; 194 const struct GNUNET_MQ_Handler *handler;
195 int handled = GNUNET_NO;
195 196
196 handler = mq->handlers; 197 handler = mq->handlers;
197 if (NULL == handler) 198 if (NULL == handler)
198 return; 199 return;
199 for (; NULL != handler->cb; handler++) 200 for (; NULL != handler->cb; handler++)
201 {
200 if (handler->type == ntohs (mh->type)) 202 if (handler->type == ntohs (mh->type))
203 {
201 handler->cb (mq->handlers_cls, mh); 204 handler->cb (mq->handlers_cls, mh);
205 handled = GNUNET_YES;
206 }
207 }
208
209 if (GNUNET_NO == handled)
210 LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type));
202} 211}
203 212
204 213
@@ -220,6 +229,7 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm)
220void 229void
221GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) 230GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
222{ 231{
232 GNUNET_assert (NULL != mq);
223 mq->send_impl (mq, mqm); 233 mq->send_impl (mq, mqm);
224} 234}
225 235
@@ -228,6 +238,7 @@ struct GNUNET_MQ_Message *
228GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) 238GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
229{ 239{
230 struct GNUNET_MQ_Message *mqm; 240 struct GNUNET_MQ_Message *mqm;
241
231 mqm = GNUNET_malloc (sizeof *mqm + size); 242 mqm = GNUNET_malloc (sizeof *mqm + size);
232 mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1]; 243 mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
233 mqm->mh->size = htons (size); 244 mqm->mh->size = htons (size);
@@ -245,16 +256,18 @@ GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp,
245 size_t new_size; 256 size_t new_size;
246 size_t old_size; 257 size_t old_size;
247 258
259 GNUNET_assert (NULL != mqmp);
260 /* there's no data to append => do nothing */
248 if (NULL == data) 261 if (NULL == data)
249 return GNUNET_OK; 262 return GNUNET_OK;
250 GNUNET_assert (NULL != mqmp);
251 old_size = ntohs ((*mqmp)->mh->size); 263 old_size = ntohs ((*mqmp)->mh->size);
252 /* message too large to concatenate? */ 264 /* message too large to concatenate? */
253 if (ntohs ((*mqmp)->mh->size) + len < len) 265 if (((uint16_t) (old_size + len)) < len)
254 return GNUNET_SYSERR; 266 return GNUNET_SYSERR;
255 new_size = old_size + len; 267 new_size = old_size + len;
256 *mqmp = GNUNET_realloc (mqmp, sizeof (struct GNUNET_MQ_Message) + new_size); 268 *mqmp = GNUNET_realloc (*mqmp, sizeof (struct GNUNET_MQ_Message) + new_size);
257 memcpy ((*mqmp)->mh + old_size, data, new_size - old_size); 269 (*mqmp)->mh = (struct GNUNET_MessageHeader *) &(*mqmp)[1];
270 memcpy (((void *) (*mqmp)->mh) + old_size, data, new_size - old_size);
258 (*mqmp)->mh->size = htons (new_size); 271 (*mqmp)->mh->size = htons (new_size);
259 return GNUNET_OK; 272 return GNUNET_OK;
260} 273}
@@ -286,12 +299,10 @@ stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
286 299
287 /* call cb for message we finished sending */ 300 /* call cb for message we finished sending */
288 mqm = mq->current_msg; 301 mqm = mq->current_msg;
289 if (NULL != mqm) 302 GNUNET_assert (NULL != mq->current_msg);
290 { 303 if (NULL != mqm->sent_cb)
291 if (NULL != mqm->sent_cb) 304 mqm->sent_cb (mqm->sent_cls);
292 mqm->sent_cb (mqm->sent_cls); 305 GNUNET_free (mqm);
293 GNUNET_free (mqm);
294 }
295 306
296 mss->wh = NULL; 307 mss->wh = NULL;
297 308
@@ -384,6 +395,35 @@ stream_data_processor (void *cls,
384} 395}
385 396
386 397
398static void
399stream_socket_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
400{
401 struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state;
402
403 if (NULL != mss->rh)
404 {
405 GNUNET_STREAM_read_cancel (mss->rh);
406 mss->rh = NULL;
407 }
408
409 if (NULL != mss->wh)
410 {
411 GNUNET_STREAM_write_cancel (mss->wh);
412 mss->wh = NULL;
413 }
414
415 if (NULL != mss->mst)
416 {
417 GNUNET_SERVER_mst_destroy (mss->mst);
418 mss->mst = NULL;
419 }
420
421 GNUNET_free (mss);
422}
423
424
425
426
387struct GNUNET_MQ_MessageQueue * 427struct GNUNET_MQ_MessageQueue *
388GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket, 428GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket,
389 const struct GNUNET_MQ_Handler *handlers, 429 const struct GNUNET_MQ_Handler *handlers,
@@ -397,6 +437,7 @@ GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket,
397 mss->socket = socket; 437 mss->socket = socket;
398 mq->impl_state = mss; 438 mq->impl_state = mss;
399 mq->send_impl = stream_socket_send_impl; 439 mq->send_impl = stream_socket_send_impl;
440 mq->destroy_impl = &stream_socket_destroy_impl;
400 mq->handlers = handlers; 441 mq->handlers = handlers;
401 mq->handlers_cls = cls; 442 mq->handlers_cls = cls;
402 if (NULL != handlers) 443 if (NULL != handlers)
@@ -425,14 +466,21 @@ transmit_queued (void *cls, size_t size,
425 struct ServerClientSocketState *state = mq->impl_state; 466 struct ServerClientSocketState *state = mq->impl_state;
426 size_t msg_size; 467 size_t msg_size;
427 468
469 GNUNET_assert (NULL != buf);
470
471 if (NULL != mqm->sent_cb)
472 {
473 mqm->sent_cb (mqm->sent_cls);
474 }
475
428 mq->current_msg = NULL; 476 mq->current_msg = NULL;
429 GNUNET_assert (NULL != mqm); 477 GNUNET_assert (NULL != mqm);
430 GNUNET_assert (NULL != buf);
431 msg_size = ntohs (mqm->mh->size); 478 msg_size = ntohs (mqm->mh->size);
432 GNUNET_assert (size >= msg_size); 479 GNUNET_assert (size >= msg_size);
433 memcpy (buf, mqm->mh, msg_size); 480 memcpy (buf, mqm->mh, msg_size);
434 GNUNET_free (mqm); 481 GNUNET_free (mqm);
435 state->th = NULL; 482 state->th = NULL;
483
436 if (NULL != mq->msg_head) 484 if (NULL != mq->msg_head)
437 { 485 {
438 mq->current_msg = mq->msg_head; 486 mq->current_msg = mq->msg_head;
@@ -448,12 +496,27 @@ transmit_queued (void *cls, size_t size,
448} 496}
449 497
450 498
499
500static void
501server_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
502{
503 struct ServerClientSocketState *state;
504
505 GNUNET_assert (NULL != mq);
506 state = mq->impl_state;
507 GNUNET_assert (NULL != state);
508 GNUNET_SERVER_client_drop (state->client);
509 GNUNET_free (state);
510}
511
451static void 512static void
452server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) 513server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
453{ 514{
454 struct ServerClientSocketState *state = mq->impl_state; 515 struct ServerClientSocketState *state;
455 int msize; 516 int msize;
456 517
518 GNUNET_assert (NULL != mq);
519 state = mq->impl_state;
457 GNUNET_assert (NULL != state); 520 GNUNET_assert (NULL != state);
458 521
459 if (NULL != state->th) 522 if (NULL != state->th)
@@ -461,8 +524,9 @@ server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Mes
461 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); 524 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
462 return; 525 return;
463 } 526 }
527 GNUNET_assert (NULL == mq->msg_head);
464 GNUNET_assert (NULL == mq->current_msg); 528 GNUNET_assert (NULL == mq->current_msg);
465 msize = ntohs (mq->msg_head->mh->size); 529 msize = ntohs (mqm->mh->size);
466 mq->current_msg = mqm; 530 mq->current_msg = mqm;
467 state->th = 531 state->th =
468 GNUNET_SERVER_notify_transmit_ready (state->client, msize, 532 GNUNET_SERVER_notify_transmit_ready (state->client, msize,
@@ -480,7 +544,10 @@ GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
480 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); 544 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
481 scss = GNUNET_new (struct ServerClientSocketState); 545 scss = GNUNET_new (struct ServerClientSocketState);
482 mq->impl_state = scss; 546 mq->impl_state = scss;
547 scss->client = client;
548 GNUNET_SERVER_client_keep (client);
483 mq->send_impl = server_client_send_impl; 549 mq->send_impl = server_client_send_impl;
550 mq->destroy_impl = server_client_destroy_impl;
484 return mq; 551 return mq;
485} 552}
486 553
@@ -502,8 +569,15 @@ connection_client_transmit_queued (void *cls, size_t size,
502 struct ClientConnectionState *state = mq->impl_state; 569 struct ClientConnectionState *state = mq->impl_state;
503 size_t msg_size; 570 size_t msg_size;
504 571
505 mq->current_msg = NULL; 572
506 GNUNET_assert (NULL != mqm); 573 GNUNET_assert (NULL != mqm);
574
575 if (NULL != mqm->sent_cb)
576 {
577 mqm->sent_cb (mqm->sent_cls);
578 }
579
580 mq->current_msg = NULL;
507 GNUNET_assert (NULL != buf); 581 GNUNET_assert (NULL != buf);
508 msg_size = ntohs (mqm->mh->size); 582 msg_size = ntohs (mqm->mh->size);
509 GNUNET_assert (size >= msg_size); 583 GNUNET_assert (size >= msg_size);
@@ -515,7 +589,7 @@ connection_client_transmit_queued (void *cls, size_t size,
515 mq->current_msg = mq->msg_head; 589 mq->current_msg = mq->msg_head;
516 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg); 590 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
517 state->th = 591 state->th =
518 GNUNET_CLIENT_notify_transmit_ready (state->connection, htons (mq->current_msg->mh->size), 592 GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (mq->current_msg->mh->size),
519 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, 593 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
520 &connection_client_transmit_queued, mq); 594 &connection_client_transmit_queued, mq);
521 } 595 }
@@ -525,6 +599,13 @@ connection_client_transmit_queued (void *cls, size_t size,
525} 599}
526 600
527 601
602
603static void
604connection_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
605{
606 GNUNET_free (mq->impl_state);
607}
608
528static void 609static void
529connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, 610connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq,
530 struct GNUNET_MQ_Message *mqm) 611 struct GNUNET_MQ_Message *mqm)
@@ -549,6 +630,7 @@ connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq,
549} 630}
550 631
551 632
633
552/** 634/**
553 * Type of a function to call when we receive a message 635 * Type of a function to call when we receive a message
554 * from the service. 636 * from the service.
@@ -561,6 +643,9 @@ handle_client_message (void *cls,
561 const struct GNUNET_MessageHeader *msg) 643 const struct GNUNET_MessageHeader *msg)
562{ 644{
563 struct GNUNET_MQ_MessageQueue *mq = cls; 645 struct GNUNET_MQ_MessageQueue *mq = cls;
646 struct ClientConnectionState *state;
647
648 state = mq->impl_state;
564 649
565 if (NULL == msg) 650 if (NULL == msg)
566 { 651 {
@@ -569,6 +654,10 @@ handle_client_message (void *cls,
569 mq->read_error_cb (mq->read_error_cls); 654 mq->read_error_cb (mq->read_error_cls);
570 return; 655 return;
571 } 656 }
657
658 GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
659 GNUNET_TIME_UNIT_FOREVER_REL);
660
572 dispatch_message (mq, msg); 661 dispatch_message (mq, msg);
573} 662}
574 663
@@ -590,6 +679,7 @@ GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connecti
590 state->connection = connection; 679 state->connection = connection;
591 mq->impl_state = state; 680 mq->impl_state = state;
592 mq->send_impl = connection_client_send_impl; 681 mq->send_impl = connection_client_send_impl;
682 mq->destroy_impl = connection_client_destroy_impl;
593 683
594 if (NULL != handlers) 684 if (NULL != handlers)
595 { 685 {
@@ -626,7 +716,10 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
626 uint32_t id; 716 uint32_t id;
627 717
628 if (NULL == mq->assoc_map) 718 if (NULL == mq->assoc_map)
719 {
629 mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8); 720 mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
721 mq->assoc_id = 1;
722 }
630 id = mq->assoc_id++; 723 id = mq->assoc_id++;
631 GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data, 724 GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
632 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 725 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
@@ -652,6 +745,7 @@ GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
652 if (NULL == mq->assoc_map) 745 if (NULL == mq->assoc_map)
653 return NULL; 746 return NULL;
654 val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id); 747 val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
748 GNUNET_assert (NULL != val);
655 GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val); 749 GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val);
656 return val; 750 return val;
657} 751}
@@ -671,6 +765,12 @@ void
671GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq) 765GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq)
672{ 766{
673 /* FIXME: destroy all pending messages in the queue */ 767 /* FIXME: destroy all pending messages in the queue */
768
769 if (NULL != mq->destroy_impl)
770 {
771 mq->destroy_impl (mq);
772 }
773
674 GNUNET_free (mq); 774 GNUNET_free (mq);
675} 775}
676 776