summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSree Harsha Totakura <totakura@in.tum.de>2013-04-09 17:18:16 +0000
committerSree Harsha Totakura <totakura@in.tum.de>2013-04-09 17:18:16 +0000
commitd65fc468de7a61f016e476cfe3fc471901f32e8a (patch)
tree47376ef0cdfc80af0d2af32e4ac388e511c4686d /src
parentae9a6cf3069a7ec482f2bb940595e24bdc092ca1 (diff)
- support caching through inactive operations
Diffstat (limited to 'src')
-rw-r--r--src/testbed/test_testbed_api_operations.c61
-rw-r--r--src/testbed/testbed_api_operations.c185
-rw-r--r--src/testbed/testbed_api_operations.h24
3 files changed, 258 insertions, 12 deletions
diff --git a/src/testbed/test_testbed_api_operations.c b/src/testbed/test_testbed_api_operations.c
index e4d3b72fa..af3e68c20 100644
--- a/src/testbed/test_testbed_api_operations.c
+++ b/src/testbed/test_testbed_api_operations.c
@@ -85,11 +85,19 @@ struct GNUNET_TESTBED_Operation *op6;
/**
* This operation is started after op4 is released and should consume 1 resource
- * on both queues q1 and q1. It should be started along with op5 and op6
+ * on both queues q1 and q1. It should be started along with op5 and op6. It is
+ * then inactivated when op6 is released. op8's start should release this
+ * operation implicitly.
*/
struct GNUNET_TESTBED_Operation *op7;
/**
+ * This operation is started after op6 is finished in step task. It consumes 2
+ * resources on both queues q1 and q1.
+ */
+struct GNUNET_TESTBED_Operation *op8;
+
+/**
* The delay task identifier
*/
GNUNET_SCHEDULER_TaskIdentifier step_task;
@@ -167,9 +175,24 @@ enum Test
TEST_OP6_RELEASED,
/**
+ * op8 has began waiting
+ */
+ TEST_OP8_WAITING,
+
+ /**
* op7 has released
*/
- TEST_OP7_RELEASED
+ TEST_OP7_RELEASED,
+
+ /**
+ * op8 has started
+ */
+ TEST_OP8_STARTED,
+
+ /**
+ * op8 has been released
+ */
+ TEST_OP8_RELEASED
};
/**
@@ -238,6 +261,16 @@ step (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
case TEST_OP4_STARTED:
GNUNET_TESTBED_operation_release_ (op4);
break;
+ case TEST_OP6_RELEASED:
+ op8 = GNUNET_TESTBED_operation_create_ (&op8, &start_cb, &release_cb);
+ GNUNET_TESTBED_operation_queue_insert2_ (q1, op8, 2);
+ GNUNET_TESTBED_operation_queue_insert2_ (q2, op8, 2);
+ result = TEST_OP8_WAITING;
+ GNUNET_TESTBED_operation_begin_wait_ (op8);
+ break;
+ case TEST_OP8_STARTED:
+ GNUNET_TESTBED_operation_release_ (op8);
+ break;
default:
GNUNET_assert (0);
}
@@ -295,6 +328,11 @@ start_cb (void *cls)
}
}
break;
+ case TEST_OP7_RELEASED:
+ GNUNET_assert (&op8 == cls);
+ result = TEST_OP8_STARTED;
+ step_task = GNUNET_SCHEDULER_add_now (&step, NULL);
+ break;
default:
GNUNET_assert (0);
}
@@ -356,11 +394,17 @@ release_cb (void *cls)
case TEST_OP5_RELEASED:
op6 = NULL;
result = TEST_OP6_RELEASED;
- GNUNET_TESTBED_operation_release_ (op7);
+ GNUNET_TESTBED_operation_inactivate_ (op7);
+ step_task = GNUNET_SCHEDULER_add_now (&step, NULL);
break;
- case TEST_OP6_RELEASED:
- result = TEST_OP7_RELEASED;
+ case TEST_OP8_WAITING:
+ GNUNET_assert (&op7 == cls);
op7 = NULL;
+ result = TEST_OP7_RELEASED;
+ break;
+ case TEST_OP8_STARTED:
+ result = TEST_OP8_RELEASED;
+ op8 = NULL;
GNUNET_TESTBED_operation_queue_destroy_ (q1);
GNUNET_TESTBED_operation_queue_destroy_ (q2);
q1 = NULL;
@@ -417,11 +461,16 @@ main (int argc, char **argv)
GNUNET_PROGRAM_run ((sizeof (argv2) / sizeof (char *)) - 1, argv2,
"test_testbed_api_operations", "nohelp", options,
&run, NULL);
- if ((GNUNET_OK != ret) || (TEST_OP7_RELEASED != result))
+ if ((GNUNET_OK != ret) || (TEST_OP8_RELEASED != result))
return 1;
op1 = NULL;
op2 = NULL;
op3 = NULL;
+ op4 = NULL;
+ op5 = NULL;
+ op6 = NULL;
+ op7 = NULL;
+ op8 = NULL;
q1 = NULL;
q2 = NULL;
return 0;
diff --git a/src/testbed/testbed_api_operations.c b/src/testbed/testbed_api_operations.c
index 9948ce357..fc5da29b5 100644
--- a/src/testbed/testbed_api_operations.c
+++ b/src/testbed/testbed_api_operations.c
@@ -94,6 +94,18 @@ struct OperationQueue
struct QueueEntry *aq_tail;
/**
+ * DLL head for the inactive queue. Operations which are inactive and can be
+ * evicted if the queues it holds are maxed out and another operation begins
+ * to wait on them.
+ */
+ struct QueueEntry *nq_head;
+
+ /**
+ * DLL tail for the inactive queue.
+ */
+ struct QueueEntry *nq_tail;
+
+ /**
* Number of operations that are currently active in this queue.
*/
unsigned int active;
@@ -129,7 +141,15 @@ enum OperationState
/**
* The operation has started
*/
- OP_STATE_STARTED
+ OP_STATE_STARTED,
+
+ /**
+ * The operation is inactive. It still holds resources on the operation
+ * queues. However, this operation will be evicted when another operation
+ * requires resources from the maxed out queues this operation is holding
+ * resources from.
+ */
+ OP_STATE_INACTIVE
};
@@ -248,6 +268,9 @@ remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index)
case OP_STATE_STARTED:
GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry);
break;
+ case OP_STATE_INACTIVE:
+ GNUNET_CONTAINER_DLL_remove (opq->nq_head, opq->nq_tail, entry);
+ break;
}
}
@@ -294,6 +317,9 @@ change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state)
case OP_STATE_STARTED:
GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry);
break;
+ case OP_STATE_INACTIVE:
+ GNUNET_CONTAINER_DLL_insert_tail (opq->nq_head, opq->nq_tail, entry);
+ break;
}
}
op->state = state;
@@ -372,11 +398,96 @@ is_queue_empty (struct OperationQueue *opq)
{
if ( (NULL != opq->wq_head)
|| (NULL != opq->rq_head)
- || (NULL != opq->aq_head) )
+ || (NULL != opq->aq_head)
+ || (NULL != opq->nq_head) )
return GNUNET_NO;
return GNUNET_YES;
}
+
+int
+decide_capacity (struct OperationQueue *opq,
+ struct QueueEntry *entry,
+ struct GNUNET_TESTBED_Operation ***ops_,
+ unsigned int *n_ops_)
+{
+ struct QueueEntry **evict_entries;
+ struct GNUNET_TESTBED_Operation **ops;
+ struct GNUNET_TESTBED_Operation *op;
+ unsigned int n_ops;
+ unsigned int n_evict_entries;
+ unsigned int need;
+ int deficit;
+ int rval;
+
+ GNUNET_assert (NULL != (op = entry->op));
+ GNUNET_assert (0 < (need = entry->nres));
+ GNUNET_assert (opq->active <= opq->max_active);
+ ops = NULL;
+ n_ops = 0;
+ evict_entries = NULL;
+ n_evict_entries = 0;
+ rval = GNUNET_OK;
+ if ((opq->active + need) <= opq->max_active)
+ goto ret;
+ deficit = need - (opq->max_active - opq->active);
+ for (entry = opq->nq_head;
+ (0 < deficit) && (NULL != entry);
+ entry = entry->next)
+ {
+ GNUNET_array_append (evict_entries, n_evict_entries, entry);
+ deficit -= entry->nres;
+ }
+ if (0 < deficit)
+ {
+ rval = GNUNET_NO;
+ goto ret;
+ }
+ for (n_ops = 0; n_ops < n_evict_entries;)
+ {
+ op = evict_entries[n_ops]->op;
+ GNUNET_array_append (ops, n_ops, op); /* increments n-ops */
+ }
+
+ ret:
+ GNUNET_free_non_null (evict_entries);
+ if (NULL != ops_) *ops_ = ops;
+ if (NULL != n_ops_) *n_ops_ = n_ops;
+ return rval;
+}
+
+/* FIXME: improve.. */
+void
+merge_ops (struct GNUNET_TESTBED_Operation ***old,
+ unsigned int *n_old,
+ struct GNUNET_TESTBED_Operation **new,
+ unsigned int n_new)
+{
+ struct GNUNET_TESTBED_Operation **cur;
+ unsigned int i;
+ unsigned int j;
+ unsigned int n_cur;
+
+ GNUNET_assert (NULL != old);
+ n_cur = *n_old;
+ cur = *old;
+ for (i = 0; i < n_new; i++)
+ {
+ for (j = 0; j < *n_old; j++)
+ {
+ if (new[i] == cur[j])
+ break;
+ }
+ if (j < *n_old)
+ continue;
+ GNUNET_array_append (cur, n_cur, new[j]);
+ }
+ *old = cur;
+ *n_old = n_cur;
+}
+
+
+
/**
* Checks for the readiness of an operation and schedules a operation start task
*
@@ -385,15 +496,40 @@ is_queue_empty (struct OperationQueue *opq)
static void
check_readiness (struct GNUNET_TESTBED_Operation *op)
{
+ struct GNUNET_TESTBED_Operation **evict_ops;
+ struct GNUNET_TESTBED_Operation **ops;
+ unsigned int n_ops;
+ unsigned int n_evict_ops;
unsigned int i;
GNUNET_assert (NULL == op->rq_entry);
GNUNET_assert (OP_STATE_WAITING == op->state);
+ evict_ops = NULL;
+ n_evict_ops = 0;
for (i = 0; i < op->nqueues; i++)
{
- GNUNET_assert (0 < op->nres[i]);
- if ((op->queues[i]->active + op->nres[i]) > op->queues[i]->max_active)
+ ops = NULL;
+ n_ops = 0;
+ if (GNUNET_NO == decide_capacity (op->queues[i], op->qentries[i],
+ &ops, &n_ops))
+ {
+ GNUNET_free_non_null (evict_ops);
return;
+ }
+ if (NULL == ops)
+ continue;
+ merge_ops (&evict_ops, &n_evict_ops, ops, n_ops);
+ GNUNET_free (ops);
+ }
+ if (NULL != evict_ops)
+ {
+ for (i = 0; i < n_evict_ops; i++)
+ GNUNET_TESTBED_operation_release_ (evict_ops[i]);
+ GNUNET_free (evict_ops);
+ evict_ops = NULL;
+ /* Evicting the operations should schedule this operation */
+ GNUNET_assert (OP_STATE_READY == op->state);
+ return;
}
for (i = 0; i < op->nqueues; i++)
op->queues[i]->active += op->nres[i];
@@ -594,6 +730,38 @@ GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op)
/**
+ * Marks an active operation as inactive - the operation will be kept in a
+ * ready-to-be-released state and continues to hold resources until another
+ * operation contents for them.
+ *
+ * @param op the operation to be marked as inactive. The operation start
+ * callback should have been called before for this operation to mark
+ * it as inactive.
+ */
+void
+GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op)
+{
+ GNUNET_assert (OP_STATE_STARTED == op->state);
+ change_state (op, OP_STATE_INACTIVE);
+}
+
+
+/**
+ * Marks and inactive operation as active. This fuction should be called to
+ * ensure that the oprelease callback will not be called until it is either
+ * marked as inactive or released.
+ *
+ * @param op the operation to be marked as active
+ */
+void
+GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op)
+{
+ GNUNET_assert (OP_STATE_INACTIVE == op->state);
+ change_state (op, OP_STATE_STARTED);
+}
+
+
+/**
* An operation is 'done' (was cancelled or finished); remove
* it from the queues and release associated resources.
*
@@ -613,6 +781,8 @@ GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
}
if (OP_STATE_READY == op->state)
rq_remove (op);
+ if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */
+ GNUNET_TESTBED_operation_activate_ (op);
GNUNET_assert (NULL != op->queues);
GNUNET_assert (NULL != op->qentries);
for (i = 0; i < op->nqueues; i++)
@@ -621,9 +791,12 @@ GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
remove_queue_entry (op, i);
opq = op->queues[i];
switch (op->state)
- {
+ {
case OP_STATE_INIT:
- case OP_STATE_WAITING:
+ case OP_STATE_INACTIVE:
+ GNUNET_assert (0);
+ break;
+ case OP_STATE_WAITING:
break;
case OP_STATE_READY:
case OP_STATE_STARTED:
diff --git a/src/testbed/testbed_api_operations.h b/src/testbed/testbed_api_operations.h
index 346fe5b75..10a91fb21 100644
--- a/src/testbed/testbed_api_operations.h
+++ b/src/testbed/testbed_api_operations.h
@@ -175,5 +175,29 @@ void
GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op);
+/**
+ * Marks an active operation as inactive - the operation will be kept in a
+ * ready-to-be-released state and continues to hold resources until another
+ * operation contents for them.
+ *
+ * @param op the operation to be marked as inactive. The operation start
+ * callback should have been called before for this operation to mark
+ * it as inactive.
+ */
+void
+GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op);
+
+
+/**
+ * Marks and inactive operation as active. This fuction should be called to
+ * ensure that the oprelease callback will not be called until it is either
+ * marked as inactive or released.
+ *
+ * @param op the operation to be marked as active
+ */
+void
+GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op);
+
+
#endif
/* end of testbed_api_operations.h */