aboutsummaryrefslogtreecommitdiff
path: root/src/testbed/testbed_api_operations.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/testbed/testbed_api_operations.c')
-rw-r--r--src/testbed/testbed_api_operations.c185
1 files changed, 179 insertions, 6 deletions
diff --git a/src/testbed/testbed_api_operations.c b/src/testbed/testbed_api_operations.c
index 9948ce357..fc5da29b5 100644
--- a/src/testbed/testbed_api_operations.c
+++ b/src/testbed/testbed_api_operations.c
@@ -94,6 +94,18 @@ struct OperationQueue
94 struct QueueEntry *aq_tail; 94 struct QueueEntry *aq_tail;
95 95
96 /** 96 /**
97 * DLL head for the inactive queue. Operations which are inactive and can be
98 * evicted if the queues it holds are maxed out and another operation begins
99 * to wait on them.
100 */
101 struct QueueEntry *nq_head;
102
103 /**
104 * DLL tail for the inactive queue.
105 */
106 struct QueueEntry *nq_tail;
107
108 /**
97 * Number of operations that are currently active in this queue. 109 * Number of operations that are currently active in this queue.
98 */ 110 */
99 unsigned int active; 111 unsigned int active;
@@ -129,7 +141,15 @@ enum OperationState
129 /** 141 /**
130 * The operation has started 142 * The operation has started
131 */ 143 */
132 OP_STATE_STARTED 144 OP_STATE_STARTED,
145
146 /**
147 * The operation is inactive. It still holds resources on the operation
148 * queues. However, this operation will be evicted when another operation
149 * requires resources from the maxed out queues this operation is holding
150 * resources from.
151 */
152 OP_STATE_INACTIVE
133}; 153};
134 154
135 155
@@ -248,6 +268,9 @@ remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index)
248 case OP_STATE_STARTED: 268 case OP_STATE_STARTED:
249 GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry); 269 GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry);
250 break; 270 break;
271 case OP_STATE_INACTIVE:
272 GNUNET_CONTAINER_DLL_remove (opq->nq_head, opq->nq_tail, entry);
273 break;
251 } 274 }
252} 275}
253 276
@@ -294,6 +317,9 @@ change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state)
294 case OP_STATE_STARTED: 317 case OP_STATE_STARTED:
295 GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry); 318 GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry);
296 break; 319 break;
320 case OP_STATE_INACTIVE:
321 GNUNET_CONTAINER_DLL_insert_tail (opq->nq_head, opq->nq_tail, entry);
322 break;
297 } 323 }
298 } 324 }
299 op->state = state; 325 op->state = state;
@@ -372,11 +398,96 @@ is_queue_empty (struct OperationQueue *opq)
372{ 398{
373 if ( (NULL != opq->wq_head) 399 if ( (NULL != opq->wq_head)
374 || (NULL != opq->rq_head) 400 || (NULL != opq->rq_head)
375 || (NULL != opq->aq_head) ) 401 || (NULL != opq->aq_head)
402 || (NULL != opq->nq_head) )
376 return GNUNET_NO; 403 return GNUNET_NO;
377 return GNUNET_YES; 404 return GNUNET_YES;
378} 405}
379 406
407
408int
409decide_capacity (struct OperationQueue *opq,
410 struct QueueEntry *entry,
411 struct GNUNET_TESTBED_Operation ***ops_,
412 unsigned int *n_ops_)
413{
414 struct QueueEntry **evict_entries;
415 struct GNUNET_TESTBED_Operation **ops;
416 struct GNUNET_TESTBED_Operation *op;
417 unsigned int n_ops;
418 unsigned int n_evict_entries;
419 unsigned int need;
420 int deficit;
421 int rval;
422
423 GNUNET_assert (NULL != (op = entry->op));
424 GNUNET_assert (0 < (need = entry->nres));
425 GNUNET_assert (opq->active <= opq->max_active);
426 ops = NULL;
427 n_ops = 0;
428 evict_entries = NULL;
429 n_evict_entries = 0;
430 rval = GNUNET_OK;
431 if ((opq->active + need) <= opq->max_active)
432 goto ret;
433 deficit = need - (opq->max_active - opq->active);
434 for (entry = opq->nq_head;
435 (0 < deficit) && (NULL != entry);
436 entry = entry->next)
437 {
438 GNUNET_array_append (evict_entries, n_evict_entries, entry);
439 deficit -= entry->nres;
440 }
441 if (0 < deficit)
442 {
443 rval = GNUNET_NO;
444 goto ret;
445 }
446 for (n_ops = 0; n_ops < n_evict_entries;)
447 {
448 op = evict_entries[n_ops]->op;
449 GNUNET_array_append (ops, n_ops, op); /* increments n-ops */
450 }
451
452 ret:
453 GNUNET_free_non_null (evict_entries);
454 if (NULL != ops_) *ops_ = ops;
455 if (NULL != n_ops_) *n_ops_ = n_ops;
456 return rval;
457}
458
459/* FIXME: improve.. */
460void
461merge_ops (struct GNUNET_TESTBED_Operation ***old,
462 unsigned int *n_old,
463 struct GNUNET_TESTBED_Operation **new,
464 unsigned int n_new)
465{
466 struct GNUNET_TESTBED_Operation **cur;
467 unsigned int i;
468 unsigned int j;
469 unsigned int n_cur;
470
471 GNUNET_assert (NULL != old);
472 n_cur = *n_old;
473 cur = *old;
474 for (i = 0; i < n_new; i++)
475 {
476 for (j = 0; j < *n_old; j++)
477 {
478 if (new[i] == cur[j])
479 break;
480 }
481 if (j < *n_old)
482 continue;
483 GNUNET_array_append (cur, n_cur, new[j]);
484 }
485 *old = cur;
486 *n_old = n_cur;
487}
488
489
490
380/** 491/**
381 * Checks for the readiness of an operation and schedules a operation start task 492 * Checks for the readiness of an operation and schedules a operation start task
382 * 493 *
@@ -385,15 +496,40 @@ is_queue_empty (struct OperationQueue *opq)
385static void 496static void
386check_readiness (struct GNUNET_TESTBED_Operation *op) 497check_readiness (struct GNUNET_TESTBED_Operation *op)
387{ 498{
499 struct GNUNET_TESTBED_Operation **evict_ops;
500 struct GNUNET_TESTBED_Operation **ops;
501 unsigned int n_ops;
502 unsigned int n_evict_ops;
388 unsigned int i; 503 unsigned int i;
389 504
390 GNUNET_assert (NULL == op->rq_entry); 505 GNUNET_assert (NULL == op->rq_entry);
391 GNUNET_assert (OP_STATE_WAITING == op->state); 506 GNUNET_assert (OP_STATE_WAITING == op->state);
507 evict_ops = NULL;
508 n_evict_ops = 0;
392 for (i = 0; i < op->nqueues; i++) 509 for (i = 0; i < op->nqueues; i++)
393 { 510 {
394 GNUNET_assert (0 < op->nres[i]); 511 ops = NULL;
395 if ((op->queues[i]->active + op->nres[i]) > op->queues[i]->max_active) 512 n_ops = 0;
513 if (GNUNET_NO == decide_capacity (op->queues[i], op->qentries[i],
514 &ops, &n_ops))
515 {
516 GNUNET_free_non_null (evict_ops);
396 return; 517 return;
518 }
519 if (NULL == ops)
520 continue;
521 merge_ops (&evict_ops, &n_evict_ops, ops, n_ops);
522 GNUNET_free (ops);
523 }
524 if (NULL != evict_ops)
525 {
526 for (i = 0; i < n_evict_ops; i++)
527 GNUNET_TESTBED_operation_release_ (evict_ops[i]);
528 GNUNET_free (evict_ops);
529 evict_ops = NULL;
530 /* Evicting the operations should schedule this operation */
531 GNUNET_assert (OP_STATE_READY == op->state);
532 return;
397 } 533 }
398 for (i = 0; i < op->nqueues; i++) 534 for (i = 0; i < op->nqueues; i++)
399 op->queues[i]->active += op->nres[i]; 535 op->queues[i]->active += op->nres[i];
@@ -594,6 +730,38 @@ GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op)
594 730
595 731
596/** 732/**
733 * Marks an active operation as inactive - the operation will be kept in a
734 * ready-to-be-released state and continues to hold resources until another
735 * operation contents for them.
736 *
737 * @param op the operation to be marked as inactive. The operation start
738 * callback should have been called before for this operation to mark
739 * it as inactive.
740 */
741void
742GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op)
743{
744 GNUNET_assert (OP_STATE_STARTED == op->state);
745 change_state (op, OP_STATE_INACTIVE);
746}
747
748
749/**
750 * Marks and inactive operation as active. This fuction should be called to
751 * ensure that the oprelease callback will not be called until it is either
752 * marked as inactive or released.
753 *
754 * @param op the operation to be marked as active
755 */
756void
757GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op)
758{
759 GNUNET_assert (OP_STATE_INACTIVE == op->state);
760 change_state (op, OP_STATE_STARTED);
761}
762
763
764/**
597 * An operation is 'done' (was cancelled or finished); remove 765 * An operation is 'done' (was cancelled or finished); remove
598 * it from the queues and release associated resources. 766 * it from the queues and release associated resources.
599 * 767 *
@@ -613,6 +781,8 @@ GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
613 } 781 }
614 if (OP_STATE_READY == op->state) 782 if (OP_STATE_READY == op->state)
615 rq_remove (op); 783 rq_remove (op);
784 if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */
785 GNUNET_TESTBED_operation_activate_ (op);
616 GNUNET_assert (NULL != op->queues); 786 GNUNET_assert (NULL != op->queues);
617 GNUNET_assert (NULL != op->qentries); 787 GNUNET_assert (NULL != op->qentries);
618 for (i = 0; i < op->nqueues; i++) 788 for (i = 0; i < op->nqueues; i++)
@@ -621,9 +791,12 @@ GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
621 remove_queue_entry (op, i); 791 remove_queue_entry (op, i);
622 opq = op->queues[i]; 792 opq = op->queues[i];
623 switch (op->state) 793 switch (op->state)
624 { 794 {
625 case OP_STATE_INIT: 795 case OP_STATE_INIT:
626 case OP_STATE_WAITING: 796 case OP_STATE_INACTIVE:
797 GNUNET_assert (0);
798 break;
799 case OP_STATE_WAITING:
627 break; 800 break;
628 case OP_STATE_READY: 801 case OP_STATE_READY:
629 case OP_STATE_STARTED: 802 case OP_STATE_STARTED: