From d65fc468de7a61f016e476cfe3fc471901f32e8a Mon Sep 17 00:00:00 2001 From: Sree Harsha Totakura Date: Tue, 9 Apr 2013 17:18:16 +0000 Subject: - support caching through inactive operations --- src/testbed/test_testbed_api_operations.c | 61 +++++++++- src/testbed/testbed_api_operations.c | 185 +++++++++++++++++++++++++++++- src/testbed/testbed_api_operations.h | 24 ++++ 3 files changed, 258 insertions(+), 12 deletions(-) (limited to 'src') diff --git a/src/testbed/test_testbed_api_operations.c b/src/testbed/test_testbed_api_operations.c index e4d3b72fa..af3e68c20 100644 --- a/src/testbed/test_testbed_api_operations.c +++ b/src/testbed/test_testbed_api_operations.c @@ -85,10 +85,18 @@ struct GNUNET_TESTBED_Operation *op6; /** * This operation is started after op4 is released and should consume 1 resource - * on both queues q1 and q1. It should be started along with op5 and op6 + * on both queues q1 and q1. It should be started along with op5 and op6. It is + * then inactivated when op6 is released. op8's start should release this + * operation implicitly. */ struct GNUNET_TESTBED_Operation *op7; +/** + * This operation is started after op6 is finished in step task. It consumes 2 + * resources on both queues q1 and q1. + */ +struct GNUNET_TESTBED_Operation *op8; + /** * The delay task identifier */ @@ -166,10 +174,25 @@ enum Test */ TEST_OP6_RELEASED, + /** + * op8 has began waiting + */ + TEST_OP8_WAITING, + /** * op7 has released */ - TEST_OP7_RELEASED + TEST_OP7_RELEASED, + + /** + * op8 has started + */ + TEST_OP8_STARTED, + + /** + * op8 has been released + */ + TEST_OP8_RELEASED }; /** @@ -238,6 +261,16 @@ step (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) case TEST_OP4_STARTED: GNUNET_TESTBED_operation_release_ (op4); break; + case TEST_OP6_RELEASED: + op8 = GNUNET_TESTBED_operation_create_ (&op8, &start_cb, &release_cb); + GNUNET_TESTBED_operation_queue_insert2_ (q1, op8, 2); + GNUNET_TESTBED_operation_queue_insert2_ (q2, op8, 2); + result = TEST_OP8_WAITING; + GNUNET_TESTBED_operation_begin_wait_ (op8); + break; + case TEST_OP8_STARTED: + GNUNET_TESTBED_operation_release_ (op8); + break; default: GNUNET_assert (0); } @@ -295,6 +328,11 @@ start_cb (void *cls) } } break; + case TEST_OP7_RELEASED: + GNUNET_assert (&op8 == cls); + result = TEST_OP8_STARTED; + step_task = GNUNET_SCHEDULER_add_now (&step, NULL); + break; default: GNUNET_assert (0); } @@ -356,11 +394,17 @@ release_cb (void *cls) case TEST_OP5_RELEASED: op6 = NULL; result = TEST_OP6_RELEASED; - GNUNET_TESTBED_operation_release_ (op7); + GNUNET_TESTBED_operation_inactivate_ (op7); + step_task = GNUNET_SCHEDULER_add_now (&step, NULL); break; - case TEST_OP6_RELEASED: - result = TEST_OP7_RELEASED; + case TEST_OP8_WAITING: + GNUNET_assert (&op7 == cls); op7 = NULL; + result = TEST_OP7_RELEASED; + break; + case TEST_OP8_STARTED: + result = TEST_OP8_RELEASED; + op8 = NULL; GNUNET_TESTBED_operation_queue_destroy_ (q1); GNUNET_TESTBED_operation_queue_destroy_ (q2); q1 = NULL; @@ -417,11 +461,16 @@ main (int argc, char **argv) GNUNET_PROGRAM_run ((sizeof (argv2) / sizeof (char *)) - 1, argv2, "test_testbed_api_operations", "nohelp", options, &run, NULL); - if ((GNUNET_OK != ret) || (TEST_OP7_RELEASED != result)) + if ((GNUNET_OK != ret) || (TEST_OP8_RELEASED != result)) return 1; op1 = NULL; op2 = NULL; op3 = NULL; + op4 = NULL; + op5 = NULL; + op6 = NULL; + op7 = NULL; + op8 = NULL; q1 = NULL; q2 = NULL; return 0; diff --git a/src/testbed/testbed_api_operations.c b/src/testbed/testbed_api_operations.c index 9948ce357..fc5da29b5 100644 --- a/src/testbed/testbed_api_operations.c +++ b/src/testbed/testbed_api_operations.c @@ -93,6 +93,18 @@ struct OperationQueue */ struct QueueEntry *aq_tail; + /** + * DLL head for the inactive queue. Operations which are inactive and can be + * evicted if the queues it holds are maxed out and another operation begins + * to wait on them. + */ + struct QueueEntry *nq_head; + + /** + * DLL tail for the inactive queue. + */ + struct QueueEntry *nq_tail; + /** * Number of operations that are currently active in this queue. */ @@ -129,7 +141,15 @@ enum OperationState /** * The operation has started */ - OP_STATE_STARTED + OP_STATE_STARTED, + + /** + * The operation is inactive. It still holds resources on the operation + * queues. However, this operation will be evicted when another operation + * requires resources from the maxed out queues this operation is holding + * resources from. + */ + OP_STATE_INACTIVE }; @@ -248,6 +268,9 @@ remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index) case OP_STATE_STARTED: GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry); break; + case OP_STATE_INACTIVE: + GNUNET_CONTAINER_DLL_remove (opq->nq_head, opq->nq_tail, entry); + break; } } @@ -294,6 +317,9 @@ change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state) case OP_STATE_STARTED: GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry); break; + case OP_STATE_INACTIVE: + GNUNET_CONTAINER_DLL_insert_tail (opq->nq_head, opq->nq_tail, entry); + break; } } op->state = state; @@ -372,11 +398,96 @@ is_queue_empty (struct OperationQueue *opq) { if ( (NULL != opq->wq_head) || (NULL != opq->rq_head) - || (NULL != opq->aq_head) ) + || (NULL != opq->aq_head) + || (NULL != opq->nq_head) ) return GNUNET_NO; return GNUNET_YES; } + +int +decide_capacity (struct OperationQueue *opq, + struct QueueEntry *entry, + struct GNUNET_TESTBED_Operation ***ops_, + unsigned int *n_ops_) +{ + struct QueueEntry **evict_entries; + struct GNUNET_TESTBED_Operation **ops; + struct GNUNET_TESTBED_Operation *op; + unsigned int n_ops; + unsigned int n_evict_entries; + unsigned int need; + int deficit; + int rval; + + GNUNET_assert (NULL != (op = entry->op)); + GNUNET_assert (0 < (need = entry->nres)); + GNUNET_assert (opq->active <= opq->max_active); + ops = NULL; + n_ops = 0; + evict_entries = NULL; + n_evict_entries = 0; + rval = GNUNET_OK; + if ((opq->active + need) <= opq->max_active) + goto ret; + deficit = need - (opq->max_active - opq->active); + for (entry = opq->nq_head; + (0 < deficit) && (NULL != entry); + entry = entry->next) + { + GNUNET_array_append (evict_entries, n_evict_entries, entry); + deficit -= entry->nres; + } + if (0 < deficit) + { + rval = GNUNET_NO; + goto ret; + } + for (n_ops = 0; n_ops < n_evict_entries;) + { + op = evict_entries[n_ops]->op; + GNUNET_array_append (ops, n_ops, op); /* increments n-ops */ + } + + ret: + GNUNET_free_non_null (evict_entries); + if (NULL != ops_) *ops_ = ops; + if (NULL != n_ops_) *n_ops_ = n_ops; + return rval; +} + +/* FIXME: improve.. */ +void +merge_ops (struct GNUNET_TESTBED_Operation ***old, + unsigned int *n_old, + struct GNUNET_TESTBED_Operation **new, + unsigned int n_new) +{ + struct GNUNET_TESTBED_Operation **cur; + unsigned int i; + unsigned int j; + unsigned int n_cur; + + GNUNET_assert (NULL != old); + n_cur = *n_old; + cur = *old; + for (i = 0; i < n_new; i++) + { + for (j = 0; j < *n_old; j++) + { + if (new[i] == cur[j]) + break; + } + if (j < *n_old) + continue; + GNUNET_array_append (cur, n_cur, new[j]); + } + *old = cur; + *n_old = n_cur; +} + + + /** * Checks for the readiness of an operation and schedules a operation start task * @@ -385,15 +496,40 @@ is_queue_empty (struct OperationQueue *opq) static void check_readiness (struct GNUNET_TESTBED_Operation *op) { + struct GNUNET_TESTBED_Operation **evict_ops; + struct GNUNET_TESTBED_Operation **ops; + unsigned int n_ops; + unsigned int n_evict_ops; unsigned int i; GNUNET_assert (NULL == op->rq_entry); GNUNET_assert (OP_STATE_WAITING == op->state); + evict_ops = NULL; + n_evict_ops = 0; for (i = 0; i < op->nqueues; i++) { - GNUNET_assert (0 < op->nres[i]); - if ((op->queues[i]->active + op->nres[i]) > op->queues[i]->max_active) + ops = NULL; + n_ops = 0; + if (GNUNET_NO == decide_capacity (op->queues[i], op->qentries[i], + &ops, &n_ops)) + { + GNUNET_free_non_null (evict_ops); return; + } + if (NULL == ops) + continue; + merge_ops (&evict_ops, &n_evict_ops, ops, n_ops); + GNUNET_free (ops); + } + if (NULL != evict_ops) + { + for (i = 0; i < n_evict_ops; i++) + GNUNET_TESTBED_operation_release_ (evict_ops[i]); + GNUNET_free (evict_ops); + evict_ops = NULL; + /* Evicting the operations should schedule this operation */ + GNUNET_assert (OP_STATE_READY == op->state); + return; } for (i = 0; i < op->nqueues; i++) op->queues[i]->active += op->nres[i]; @@ -593,6 +729,38 @@ GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op) } +/** + * Marks an active operation as inactive - the operation will be kept in a + * ready-to-be-released state and continues to hold resources until another + * operation contents for them. + * + * @param op the operation to be marked as inactive. The operation start + * callback should have been called before for this operation to mark + * it as inactive. + */ +void +GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op) +{ + GNUNET_assert (OP_STATE_STARTED == op->state); + change_state (op, OP_STATE_INACTIVE); +} + + +/** + * Marks and inactive operation as active. This fuction should be called to + * ensure that the oprelease callback will not be called until it is either + * marked as inactive or released. + * + * @param op the operation to be marked as active + */ +void +GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op) +{ + GNUNET_assert (OP_STATE_INACTIVE == op->state); + change_state (op, OP_STATE_STARTED); +} + + /** * An operation is 'done' (was cancelled or finished); remove * it from the queues and release associated resources. @@ -613,6 +781,8 @@ GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op) } if (OP_STATE_READY == op->state) rq_remove (op); + if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */ + GNUNET_TESTBED_operation_activate_ (op); GNUNET_assert (NULL != op->queues); GNUNET_assert (NULL != op->qentries); for (i = 0; i < op->nqueues; i++) @@ -621,9 +791,12 @@ GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op) remove_queue_entry (op, i); opq = op->queues[i]; switch (op->state) - { + { case OP_STATE_INIT: - case OP_STATE_WAITING: + case OP_STATE_INACTIVE: + GNUNET_assert (0); + break; + case OP_STATE_WAITING: break; case OP_STATE_READY: case OP_STATE_STARTED: diff --git a/src/testbed/testbed_api_operations.h b/src/testbed/testbed_api_operations.h index 346fe5b75..10a91fb21 100644 --- a/src/testbed/testbed_api_operations.h +++ b/src/testbed/testbed_api_operations.h @@ -175,5 +175,29 @@ void GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op); +/** + * Marks an active operation as inactive - the operation will be kept in a + * ready-to-be-released state and continues to hold resources until another + * operation contents for them. + * + * @param op the operation to be marked as inactive. The operation start + * callback should have been called before for this operation to mark + * it as inactive. + */ +void +GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op); + + +/** + * Marks and inactive operation as active. This fuction should be called to + * ensure that the oprelease callback will not be called until it is either + * marked as inactive or released. + * + * @param op the operation to be marked as active + */ +void +GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op); + + #endif /* end of testbed_api_operations.h */ -- cgit v1.2.3