aboutsummaryrefslogtreecommitdiff
path: root/src/lockmanager
diff options
context:
space:
mode:
authorSree Harsha Totakura <totakura@in.tum.de>2012-05-15 15:50:29 +0000
committerSree Harsha Totakura <totakura@in.tum.de>2012-05-15 15:50:29 +0000
commit5149ebc1f85749f8c5bdbef7dc3044da97ef50c1 (patch)
tree057f13a4782aabb4949271220271dc66888c9576 /src/lockmanager
parent06f3d1aed324bd6919c55921fda63ce66a86a8d4 (diff)
downloadgnunet-5149ebc1f85749f8c5bdbef7dc3044da97ef50c1.tar.gz
gnunet-5149ebc1f85749f8c5bdbef7dc3044da97ef50c1.zip
-added message queue
Diffstat (limited to 'src/lockmanager')
-rw-r--r--src/lockmanager/lockmanager_api.c192
1 files changed, 144 insertions, 48 deletions
diff --git a/src/lockmanager/lockmanager_api.c b/src/lockmanager/lockmanager_api.c
index 8e482a730..99f5ab503 100644
--- a/src/lockmanager/lockmanager_api.c
+++ b/src/lockmanager/lockmanager_api.c
@@ -24,6 +24,12 @@
24 * @author Sree Harsha Totakura 24 * @author Sree Harsha Totakura
25 */ 25 */
26 26
27/**
28 * To be fixed:
29 * Should the handle be freed when the connection to service is lost?
30 * Should cancel_request have a call back (else simultaneous calls break)
31 */
32
27#include "platform.h" 33#include "platform.h"
28#include "gnunet_common.h" 34#include "gnunet_common.h"
29#include "gnunet_container_lib.h" 35#include "gnunet_container_lib.h"
@@ -42,6 +48,29 @@
42 48
43#define TIMEOUT TIME_REL_MINS(3) 49#define TIMEOUT TIME_REL_MINS(3)
44 50
51
52/**
53 * The message queue
54 */
55struct MessageQueue
56{
57 /**
58 * The next pointer for doubly linked list
59 */
60 struct MessageQueue *next;
61
62 /**
63 * The prev pointer for doubly linked list
64 */
65 struct MessageQueue *prev;
66
67 /**
68 * The LOCKMANAGER Message
69 */
70 struct GNUNET_LOCKMANAGER_Message *msg;
71};
72
73
45/** 74/**
46 * Handler for the lockmanager service 75 * Handler for the lockmanager service
47 */ 76 */
@@ -53,9 +82,24 @@ struct GNUNET_LOCKMANAGER_Handle
53 struct GNUNET_CLIENT_Connection *conn; 82 struct GNUNET_CLIENT_Connection *conn;
54 83
55 /** 84 /**
85 * The transmit handle for transmissions using conn
86 */
87 struct GNUNET_CLIENT_TransmitHandle *transmit_handle;
88
89 /**
56 * Hashmap handle 90 * Hashmap handle
57 */ 91 */
58 struct GNUNET_CONTAINER_MultiHashMap *hashmap; 92 struct GNUNET_CONTAINER_MultiHashMap *hashmap;
93
94 /**
95 * Double linked list head for message queue
96 */
97 struct MessageQueue *mq_head;
98
99 /**
100 * Double linked list tail for message queue
101 */
102 struct MessageQueue *mq_tail;
59}; 103};
60 104
61 105
@@ -80,11 +124,6 @@ struct GNUNET_LOCKMANAGER_LockingRequest
80 void *status_cb_cls; 124 void *status_cb_cls;
81 125
82 /** 126 /**
83 * The pending transmit handle for the ACQUIRE message
84 */
85 struct GNUNET_CLIENT_TransmitHandle *transmit_handle;
86
87 /**
88 * The locking domain of this request 127 * The locking domain of this request
89 */ 128 */
90 char *domain; 129 char *domain;
@@ -124,6 +163,86 @@ struct LockingRequestMatch
124 163
125 164
126/** 165/**
166 * Transmit notify for sending message to server
167 *
168 * @param cls the lockmanager handle
169 * @param size number of bytes available in buf
170 * @param buf where the callee should write the message
171 * @return number of bytes written to buf
172 */
173static size_t
174transmit_notify (void *cls, size_t size, void *buf)
175{
176 struct GNUNET_LOCKMANAGER_Handle *handle = cls;
177 struct MessageQueue *queue_entity;
178 uint16_t msg_size;
179
180 handle->transmit_handle = NULL;
181 if ((0 == size) || (NULL == buf))
182 {
183 /* FIXME: Timed out -- requeue? */
184 return 0;
185 }
186 queue_entity = handle->mq_head;
187 GNUNET_assert (NULL != queue_entity);
188 msg_size = ntohs (queue_entity->msg->header.size);
189 GNUNET_assert (size >= msg_size);
190 memcpy (buf, queue_entity->msg, msg_size);
191 LOG (GNUNET_ERROR_TYPE_DEBUG,
192 "Message of size %u sent\n", msg_size);
193 GNUNET_free (queue_entity->msg);
194 GNUNET_CONTAINER_DLL_remove (handle->mq_head,
195 handle->mq_tail,
196 queue_entity);
197 GNUNET_free (queue_entity);
198 queue_entity = handle->mq_head;
199 if (NULL != queue_entity)
200 {
201 handle->transmit_handle =
202 GNUNET_CLIENT_notify_transmit_ready (handle->conn,
203 ntohs
204 (queue_entity->msg->header.size),
205 TIMEOUT,
206 GNUNET_YES,
207 &transmit_notify,
208 handle);
209 }
210 return msg_size;
211}
212
213
214/**
215 * Queues a message into handle's send message queue
216 *
217 * @param handle the lockmanager handle whose queue will be used
218 * @param msg the message to be queued
219 */
220static void
221queue_message (struct GNUNET_LOCKMANAGER_Handle *handle,
222 struct GNUNET_LOCKMANAGER_Message *msg)
223{
224 struct MessageQueue *queue_entity;
225
226 GNUNET_assert (NULL != msg);
227 queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
228 queue_entity->msg = msg;
229 GNUNET_CONTAINER_DLL_insert_tail (handle->mq_head,
230 handle->mq_tail,
231 queue_entity);
232 if (NULL == handle->transmit_handle)
233 {
234 handle->transmit_handle =
235 GNUNET_CLIENT_notify_transmit_ready (handle->conn,
236 ntohs (msg->header.size),
237 TIMEOUT,
238 GNUNET_YES,
239 &transmit_notify,
240 handle);
241 }
242}
243
244
245/**
127 * Get the key for the given lock in the 'lock_map'. 246 * Get the key for the given lock in the 'lock_map'.
128 * 247 *
129 * @param domain_name 248 * @param domain_name
@@ -342,35 +461,6 @@ handle_replies (void *cls,
342 461
343 462
344/** 463/**
345 * Transmit notify for sending message to server
346 *
347 * @param cls the message to send
348 * @param size number of bytes available in buf
349 * @param buf where the callee should write the message
350 * @return number of bytes written to buf
351 */
352static size_t
353transmit_notify (void *cls, size_t size, void *buf)
354{
355 struct GNUNET_LOCKMANAGER_Message *msg = cls;
356 uint16_t msg_size;
357
358 if ((0 == size) || (NULL == buf))
359 {
360 /* FIXME: Timed out -- requeue? */
361 return 0;
362 }
363 msg_size = ntohs (msg->header.size);
364 GNUNET_assert (size >= msg_size);
365 memcpy (buf, msg, msg_size);
366 GNUNET_free (msg);
367 LOG (GNUNET_ERROR_TYPE_DEBUG,
368 "Message of size %u sent\n", msg_size);
369 return msg_size;
370}
371
372
373/**
374 * Iterator to free hash map entries. 464 * Iterator to free hash map entries.
375 * 465 *
376 * @param cls the lockmanger handle 466 * @param cls the lockmanger handle
@@ -446,8 +536,9 @@ GNUNET_LOCKMANAGER_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
446void 536void
447GNUNET_LOCKMANAGER_disconnect (struct GNUNET_LOCKMANAGER_Handle *handle) 537GNUNET_LOCKMANAGER_disconnect (struct GNUNET_LOCKMANAGER_Handle *handle)
448{ 538{
539 struct MessageQueue *head;
540
449 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s()\n", __func__); 541 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s()\n", __func__);
450 GNUNET_CLIENT_disconnect (handle->conn);
451 if (0 != GNUNET_CONTAINER_multihashmap_size (handle->hashmap)) 542 if (0 != GNUNET_CONTAINER_multihashmap_size (handle->hashmap))
452 { 543 {
453 LOG (GNUNET_ERROR_TYPE_WARNING, 544 LOG (GNUNET_ERROR_TYPE_WARNING,
@@ -458,6 +549,22 @@ GNUNET_LOCKMANAGER_disconnect (struct GNUNET_LOCKMANAGER_Handle *handle)
458 handle); 549 handle);
459 } 550 }
460 GNUNET_CONTAINER_multihashmap_destroy (handle->hashmap); 551 GNUNET_CONTAINER_multihashmap_destroy (handle->hashmap);
552 /* Clear the message queue */
553 if (NULL != handle->transmit_handle)
554 {
555 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->transmit_handle);
556 }
557 head = handle->mq_head;
558 while (NULL != head)
559 {
560 GNUNET_CONTAINER_DLL_remove (handle->mq_head,
561 handle->mq_tail,
562 head);
563 GNUNET_free (head->msg);
564 GNUNET_free (head);
565 head = handle->mq_head;
566 }
567 GNUNET_CLIENT_disconnect (handle->conn);
461 GNUNET_free (handle); 568 GNUNET_free (handle);
462 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__); 569 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
463} 570}
@@ -517,13 +624,7 @@ GNUNET_LOCKMANAGER_acquire_lock (struct GNUNET_LOCKMANAGER_Handle *handle,
517 msg->lock = htonl (lock); 624 msg->lock = htonl (lock);
518 memcpy (&msg[1], r->domain, domain_name_length); 625 memcpy (&msg[1], r->domain, domain_name_length);
519 LOG (GNUNET_ERROR_TYPE_DEBUG, "Queueing ACQUIRE message\n"); 626 LOG (GNUNET_ERROR_TYPE_DEBUG, "Queueing ACQUIRE message\n");
520 r->transmit_handle = 627 queue_message (handle, msg);
521 GNUNET_CLIENT_notify_transmit_ready (r->handle->conn,
522 msg_size,
523 TIMEOUT,
524 GNUNET_YES,
525 &transmit_notify,
526 msg);
527 get_key (r->domain, r->lock, &hash); 628 get_key (r->domain, r->lock, &hash);
528 GNUNET_CONTAINER_multihashmap_put (r->handle->hashmap, 629 GNUNET_CONTAINER_multihashmap_put (r->handle->hashmap,
529 &hash, 630 &hash,
@@ -564,12 +665,7 @@ GNUNET_LOCKMANAGER_cancel_request (struct GNUNET_LOCKMANAGER_LockingRequest
564 msg->header.size = htons (msg_size); 665 msg->header.size = htons (msg_size);
565 msg->lock = htonl (request->lock); 666 msg->lock = htonl (request->lock);
566 memcpy (&msg[1], request->domain, domain_name_length); 667 memcpy (&msg[1], request->domain, domain_name_length);
567 GNUNET_CLIENT_notify_transmit_ready (request->handle->conn, 668 queue_message (request->handle, msg);
568 msg_size,
569 TIMEOUT, /* What if this fails */
570 GNUNET_NO,
571 &transmit_notify,
572 msg);
573 } 669 }
574 get_key (request->domain, request->lock, &hash); 670 get_key (request->domain, request->lock, &hash);
575 GNUNET_assert (GNUNET_YES == 671 GNUNET_assert (GNUNET_YES ==