From 5149ebc1f85749f8c5bdbef7dc3044da97ef50c1 Mon Sep 17 00:00:00 2001 From: Sree Harsha Totakura Date: Tue, 15 May 2012 15:50:29 +0000 Subject: -added message queue --- src/lockmanager/lockmanager_api.c | 192 ++++++++++++++++++++++++++++---------- 1 file changed, 144 insertions(+), 48 deletions(-) (limited to 'src/lockmanager') diff --git a/src/lockmanager/lockmanager_api.c b/src/lockmanager/lockmanager_api.c index 8e482a730..99f5ab503 100644 --- a/src/lockmanager/lockmanager_api.c +++ b/src/lockmanager/lockmanager_api.c @@ -24,6 +24,12 @@ * @author Sree Harsha Totakura */ +/** + * To be fixed: + * Should the handle be freed when the connection to service is lost? + * Should cancel_request have a call back (else simultaneous calls break) + */ + #include "platform.h" #include "gnunet_common.h" #include "gnunet_container_lib.h" @@ -42,6 +48,29 @@ #define TIMEOUT TIME_REL_MINS(3) + +/** + * The message queue + */ +struct MessageQueue +{ + /** + * The next pointer for doubly linked list + */ + struct MessageQueue *next; + + /** + * The prev pointer for doubly linked list + */ + struct MessageQueue *prev; + + /** + * The LOCKMANAGER Message + */ + struct GNUNET_LOCKMANAGER_Message *msg; +}; + + /** * Handler for the lockmanager service */ @@ -52,10 +81,25 @@ struct GNUNET_LOCKMANAGER_Handle */ struct GNUNET_CLIENT_Connection *conn; + /** + * The transmit handle for transmissions using conn + */ + struct GNUNET_CLIENT_TransmitHandle *transmit_handle; + /** * Hashmap handle */ struct GNUNET_CONTAINER_MultiHashMap *hashmap; + + /** + * Double linked list head for message queue + */ + struct MessageQueue *mq_head; + + /** + * Double linked list tail for message queue + */ + struct MessageQueue *mq_tail; }; @@ -79,11 +123,6 @@ struct GNUNET_LOCKMANAGER_LockingRequest */ void *status_cb_cls; - /** - * The pending transmit handle for the ACQUIRE message - */ - struct GNUNET_CLIENT_TransmitHandle *transmit_handle; - /** * The locking domain of this request */ @@ -123,6 +162,86 @@ struct LockingRequestMatch }; +/** + * Transmit notify for sending message to server + * + * @param cls the lockmanager handle + * @param size number of bytes available in buf + * @param buf where the callee should write the message + * @return number of bytes written to buf + */ +static size_t +transmit_notify (void *cls, size_t size, void *buf) +{ + struct GNUNET_LOCKMANAGER_Handle *handle = cls; + struct MessageQueue *queue_entity; + uint16_t msg_size; + + handle->transmit_handle = NULL; + if ((0 == size) || (NULL == buf)) + { + /* FIXME: Timed out -- requeue? */ + return 0; + } + queue_entity = handle->mq_head; + GNUNET_assert (NULL != queue_entity); + msg_size = ntohs (queue_entity->msg->header.size); + GNUNET_assert (size >= msg_size); + memcpy (buf, queue_entity->msg, msg_size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Message of size %u sent\n", msg_size); + GNUNET_free (queue_entity->msg); + GNUNET_CONTAINER_DLL_remove (handle->mq_head, + handle->mq_tail, + queue_entity); + GNUNET_free (queue_entity); + queue_entity = handle->mq_head; + if (NULL != queue_entity) + { + handle->transmit_handle = + GNUNET_CLIENT_notify_transmit_ready (handle->conn, + ntohs + (queue_entity->msg->header.size), + TIMEOUT, + GNUNET_YES, + &transmit_notify, + handle); + } + return msg_size; +} + + +/** + * Queues a message into handle's send message queue + * + * @param handle the lockmanager handle whose queue will be used + * @param msg the message to be queued + */ +static void +queue_message (struct GNUNET_LOCKMANAGER_Handle *handle, + struct GNUNET_LOCKMANAGER_Message *msg) +{ + struct MessageQueue *queue_entity; + + GNUNET_assert (NULL != msg); + queue_entity = GNUNET_malloc (sizeof (struct MessageQueue)); + queue_entity->msg = msg; + GNUNET_CONTAINER_DLL_insert_tail (handle->mq_head, + handle->mq_tail, + queue_entity); + if (NULL == handle->transmit_handle) + { + handle->transmit_handle = + GNUNET_CLIENT_notify_transmit_ready (handle->conn, + ntohs (msg->header.size), + TIMEOUT, + GNUNET_YES, + &transmit_notify, + handle); + } +} + + /** * Get the key for the given lock in the 'lock_map'. * @@ -341,35 +460,6 @@ handle_replies (void *cls, } -/** - * Transmit notify for sending message to server - * - * @param cls the message to send - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -transmit_notify (void *cls, size_t size, void *buf) -{ - struct GNUNET_LOCKMANAGER_Message *msg = cls; - uint16_t msg_size; - - if ((0 == size) || (NULL == buf)) - { - /* FIXME: Timed out -- requeue? */ - return 0; - } - msg_size = ntohs (msg->header.size); - GNUNET_assert (size >= msg_size); - memcpy (buf, msg, msg_size); - GNUNET_free (msg); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Message of size %u sent\n", msg_size); - return msg_size; -} - - /** * Iterator to free hash map entries. * @@ -446,8 +536,9 @@ GNUNET_LOCKMANAGER_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) void GNUNET_LOCKMANAGER_disconnect (struct GNUNET_LOCKMANAGER_Handle *handle) { + struct MessageQueue *head; + LOG (GNUNET_ERROR_TYPE_DEBUG, "%s()\n", __func__); - GNUNET_CLIENT_disconnect (handle->conn); if (0 != GNUNET_CONTAINER_multihashmap_size (handle->hashmap)) { LOG (GNUNET_ERROR_TYPE_WARNING, @@ -458,6 +549,22 @@ GNUNET_LOCKMANAGER_disconnect (struct GNUNET_LOCKMANAGER_Handle *handle) handle); } GNUNET_CONTAINER_multihashmap_destroy (handle->hashmap); + /* Clear the message queue */ + if (NULL != handle->transmit_handle) + { + GNUNET_CLIENT_notify_transmit_ready_cancel (handle->transmit_handle); + } + head = handle->mq_head; + while (NULL != head) + { + GNUNET_CONTAINER_DLL_remove (handle->mq_head, + handle->mq_tail, + head); + GNUNET_free (head->msg); + GNUNET_free (head); + head = handle->mq_head; + } + GNUNET_CLIENT_disconnect (handle->conn); GNUNET_free (handle); LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__); } @@ -517,13 +624,7 @@ GNUNET_LOCKMANAGER_acquire_lock (struct GNUNET_LOCKMANAGER_Handle *handle, msg->lock = htonl (lock); memcpy (&msg[1], r->domain, domain_name_length); LOG (GNUNET_ERROR_TYPE_DEBUG, "Queueing ACQUIRE message\n"); - r->transmit_handle = - GNUNET_CLIENT_notify_transmit_ready (r->handle->conn, - msg_size, - TIMEOUT, - GNUNET_YES, - &transmit_notify, - msg); + queue_message (handle, msg); get_key (r->domain, r->lock, &hash); GNUNET_CONTAINER_multihashmap_put (r->handle->hashmap, &hash, @@ -564,12 +665,7 @@ GNUNET_LOCKMANAGER_cancel_request (struct GNUNET_LOCKMANAGER_LockingRequest msg->header.size = htons (msg_size); msg->lock = htonl (request->lock); memcpy (&msg[1], request->domain, domain_name_length); - GNUNET_CLIENT_notify_transmit_ready (request->handle->conn, - msg_size, - TIMEOUT, /* What if this fails */ - GNUNET_NO, - &transmit_notify, - msg); + queue_message (request->handle, msg); } get_key (request->domain, request->lock, &hash); GNUNET_assert (GNUNET_YES == -- cgit v1.2.3