From 8c29ba8ee17d8c8d98548b8b64cd6a6605b7b295 Mon Sep 17 00:00:00 2001 From: Sree Harsha Totakura Date: Tue, 9 Apr 2013 09:40:00 +0000 Subject: - no starving --- src/testbed/testbed_api_operations.c | 149 +++++++++++++++++++++++++---------- src/testbed/testbed_api_operations.h | 25 +++--- 2 files changed, 119 insertions(+), 55 deletions(-) (limited to 'src') diff --git a/src/testbed/testbed_api_operations.c b/src/testbed/testbed_api_operations.c index 515367d8e..5a62cac4e 100644 --- a/src/testbed/testbed_api_operations.c +++ b/src/testbed/testbed_api_operations.c @@ -70,6 +70,17 @@ struct OperationQueue */ struct QueueEntry *tail; + /** + * DLL head for the wait queue. Operations which are waiting for this + * operation queue are put here + */ + struct QueueEntry *wq_head; + + /** + * DLL tail for the wait queue. + */ + struct QueueEntry *wq_tail; + /** * Number of operations that are currently active in this queue. */ @@ -158,6 +169,12 @@ struct GNUNET_TESTBED_Operation */ struct OperationQueue **queues; + /** + * Array of operation queue entries corresponding to this operation in + * operation queues for this operation + */ + struct QueueEntry **qentries; + /** * Array of number of resources an operation need from each queue. The numbers * in this array should correspond to the queues array @@ -265,6 +282,48 @@ rq_add (struct GNUNET_TESTBED_Operation *op) } +void +wq_add (struct GNUNET_TESTBED_Operation *op) +{ + struct QueueEntry *entry; + struct OperationQueue *opq; + unsigned int cnt; + + GNUNET_assert (OP_STATE_WAITING == op->state); + GNUNET_assert (NULL == op->qentries); + for (cnt = 0; cnt < op->nqueues;) + { + opq = op->queues[cnt]; + entry = GNUNET_malloc (sizeof (struct QueueEntry)); + entry->op = op; + entry->nres = op->nres[cnt]; + GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry); + GNUNET_array_append (op->qentries, cnt, entry); /* increments cnt */ + } +} + + +void +wq_remove (struct GNUNET_TESTBED_Operation *op) +{ + struct QueueEntry *entry; + struct OperationQueue *opq; + unsigned int cnt; + + GNUNET_assert (OP_STATE_WAITING == op->state); + GNUNET_assert (NULL != op->qentries); + for (cnt = 0; cnt < op->nqueues; cnt ++) + { + opq = op->queues[cnt]; + entry = op->qentries[cnt]; + GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry); + GNUNET_free (entry); + } + GNUNET_free (op->qentries); + op->qentries = NULL; +} + + /** * Checks for the readiness of an operation and schedules a operation start task * @@ -276,15 +335,17 @@ check_readiness (struct GNUNET_TESTBED_Operation *op) unsigned int i; GNUNET_assert (NULL == op->rq_entry); + GNUNET_assert (OP_STATE_WAITING == op->state); for (i = 0; i < op->nqueues; i++) { GNUNET_assert (0 < op->nres[i]); if ((op->queues[i]->active + op->nres[i]) > op->queues[i]->max_active) return; } + wq_remove (op); for (i = 0; i < op->nqueues; i++) op->queues[i]->active += op->nres[i]; - op->state = OP_STATE_READY; + op->state = OP_STATE_READY; rq_add (op); } @@ -304,6 +365,7 @@ defer (struct GNUNET_TESTBED_Operation *op) for (i = 0; i < op->nqueues; i++) op->queues[i]->active--; op->state = OP_STATE_WAITING; + wq_add (op); } @@ -420,27 +482,27 @@ GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue, * waiting for the operation to become active. * * @param queue queue to add the operation to - * @param operation operation to add to the queue + * @param op operation to add to the queue * @param nres the number of units of the resources of queue needed by the * operation. Should be greater than 0. */ void GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue, - struct GNUNET_TESTBED_Operation - *operation, unsigned int nres) + struct GNUNET_TESTBED_Operation *op, + unsigned int nres) { struct QueueEntry *entry; unsigned int qsize; GNUNET_assert (0 < nres); entry = GNUNET_malloc (sizeof (struct QueueEntry)); - entry->op = operation; + entry->op = op; entry->nres = nres; GNUNET_CONTAINER_DLL_insert_tail (queue->head, queue->tail, entry); - qsize = operation->nqueues; - GNUNET_array_append (operation->queues, operation->nqueues, queue); - GNUNET_array_append (operation->nres, qsize, nres); - GNUNET_assert (qsize == operation->nqueues); + qsize = op->nqueues; + GNUNET_array_append (op->queues, op->nqueues, queue); + GNUNET_array_append (op->nres, qsize, nres); + GNUNET_assert (qsize == op->nqueues); } @@ -453,14 +515,13 @@ GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue, * requires more than 1 * * @param queue queue to add the operation to - * @param operation operation to add to the queue + * @param op operation to add to the queue */ void GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue, - struct GNUNET_TESTBED_Operation - *operation) + struct GNUNET_TESTBED_Operation *op) { - return GNUNET_TESTBED_operation_queue_insert2_ (queue, operation, 1); + return GNUNET_TESTBED_operation_queue_insert2_ (queue, op, 1); } @@ -471,15 +532,15 @@ GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue, * insertions to be made without having the first one instantly trigger the * operation if the first queue has sufficient resources). * - * @param operation the operation to marks as waiting + * @param op the operation to marks as waiting */ void -GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation - *operation) +GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op) { - GNUNET_assert (NULL == operation->rq_entry); - operation->state = OP_STATE_WAITING; - check_readiness (operation); + GNUNET_assert (NULL == op->rq_entry); + op->state = OP_STATE_WAITING; + wq_add (op); + check_readiness (op); } @@ -490,22 +551,21 @@ GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation * thus scheduling the operation is no longer required. * * @param queue queue to add the operation to - * @param operation operation to add to the queue + * @param op operation to add to the queue */ void GNUNET_TESTBED_operation_queue_remove_ (struct OperationQueue *queue, struct GNUNET_TESTBED_Operation - *operation) + *op) { struct QueueEntry *entry; - struct QueueEntry *entry2; for (entry = queue->head; NULL != entry; entry = entry->next) - if (entry->op == operation) + if (entry->op == op) break; GNUNET_assert (NULL != entry); GNUNET_assert (0 < entry->nres); - switch (operation->state) + switch (op->state) { case OP_STATE_INIT: case OP_STATE_WAITING: @@ -517,15 +577,12 @@ GNUNET_TESTBED_operation_queue_remove_ (struct OperationQueue *queue, queue->active -= entry->nres; break; } - entry2 = entry->next; GNUNET_CONTAINER_DLL_remove (queue->head, queue->tail, entry); GNUNET_free (entry); - for (; NULL != entry2; entry2 = entry2->next) - if (OP_STATE_WAITING == entry2->op->state) - break; - if (NULL == entry2) + entry = queue->wq_head; + if (NULL == entry) return; - check_readiness (entry2->op); + check_readiness (entry->op); } @@ -533,22 +590,32 @@ GNUNET_TESTBED_operation_queue_remove_ (struct OperationQueue *queue, * An operation is 'done' (was cancelled or finished); remove * it from the queues and release associated resources. * - * @param operation operation that finished + * @param op operation that finished */ void -GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *operation) +GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op) { unsigned int i; - if (OP_STATE_READY == operation->state) - rq_remove (operation); - for (i = 0; i < operation->nqueues; i++) - GNUNET_TESTBED_operation_queue_remove_ (operation->queues[i], operation); - GNUNET_free (operation->queues); - GNUNET_free (operation->nres); - if (NULL != operation->release) - operation->release (operation->cb_cls); - GNUNET_free (operation); + switch (op->state) + { + case OP_STATE_READY: + rq_remove (op); + break; + case OP_STATE_WAITING: + wq_remove (op); + break; + case OP_STATE_STARTED: + case OP_STATE_INIT: + break; + } + for (i = 0; i < op->nqueues; i++) + GNUNET_TESTBED_operation_queue_remove_ (op->queues[i], op); + GNUNET_free (op->queues); + GNUNET_free (op->nres); + if (NULL != op->release) + op->release (op->cb_cls); + GNUNET_free (op); } diff --git a/src/testbed/testbed_api_operations.h b/src/testbed/testbed_api_operations.h index 8ab6944f2..1950cd7d1 100644 --- a/src/testbed/testbed_api_operations.h +++ b/src/testbed/testbed_api_operations.h @@ -89,14 +89,14 @@ GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue, * waiting for the operation to become active. * * @param queue queue to add the operation to - * @param operation operation to add to the queue + * @param op operation to add to the queue * @param nres the number of units of the resources of queue needed by the * operation. Should be greater than 0. */ void GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue, - struct GNUNET_TESTBED_Operation - *operation, unsigned int nres); + struct GNUNET_TESTBED_Operation *op, + unsigned int nres); /** @@ -106,12 +106,11 @@ GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue, * waiting for the operation to become active. * * @param queue queue to add the operation to - * @param operation operation to add to the queue + * @param op operation to add to the queue */ void GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue, - struct GNUNET_TESTBED_Operation - *operation); + struct GNUNET_TESTBED_Operation *op); /** @@ -121,11 +120,10 @@ GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue, * insertions to be made without having the first one instantly trigger the * operation if the first queue has sufficient resources). * - * @param operation the operation to marks as waiting + * @param op the operation to marks as waiting */ void -GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation - *operation); +GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op); /** @@ -135,12 +133,11 @@ GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation * thus scheduling the operation is no longer required. * * @param queue queue to add the operation to - * @param operation operation to add to the queue + * @param op operation to add to the queue */ void GNUNET_TESTBED_operation_queue_remove_ (struct OperationQueue *queue, - struct GNUNET_TESTBED_Operation - *operation); + struct GNUNET_TESTBED_Operation *op); @@ -187,10 +184,10 @@ GNUNET_TESTBED_operation_create_ (void *cls, OperationStart start, * An operation is 'done' (was cancelled or finished); remove * it from the queues and release associated resources. * - * @param operation operation that finished + * @param op operation that finished */ void -GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *operation); +GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op); #endif -- cgit v1.2.3