diff options
author | Sree Harsha Totakura <totakura@in.tum.de> | 2013-04-09 13:32:54 +0000 |
---|---|---|
committer | Sree Harsha Totakura <totakura@in.tum.de> | 2013-04-09 13:32:54 +0000 |
commit | 34302683763317cd5c7053434038c5ede88ef6ec (patch) | |
tree | ae545a24a0020f1c188993548db2172ce18ea15e | |
parent | 02e5060f18eba19252673555b0c0fbb5d996df1a (diff) | |
download | gnunet-34302683763317cd5c7053434038c5ede88ef6ec.tar.gz gnunet-34302683763317cd5c7053434038c5ede88ef6ec.zip |
- maintain separate queues for operations that are in WAITING, READY, and STARTED states
-rw-r--r-- | src/testbed/test_testbed_api_operations.c | 2 | ||||
-rw-r--r-- | src/testbed/testbed_api_operations.c | 289 | ||||
-rw-r--r-- | src/testbed/testbed_api_operations.h | 15 |
3 files changed, 158 insertions, 148 deletions
diff --git a/src/testbed/test_testbed_api_operations.c b/src/testbed/test_testbed_api_operations.c index 24f23be23..e4d3b72fa 100644 --- a/src/testbed/test_testbed_api_operations.c +++ b/src/testbed/test_testbed_api_operations.c | |||
@@ -60,7 +60,7 @@ struct GNUNET_TESTBED_Operation *op2; | |||
60 | * This operation should go into both queues and should consume 2 units of | 60 | * This operation should go into both queues and should consume 2 units of |
61 | * resources on both queues. Since op2 needs a resource from both queues and is | 61 | * resources on both queues. Since op2 needs a resource from both queues and is |
62 | * queues before this operation, it will be blocked until op2 is released even | 62 | * queues before this operation, it will be blocked until op2 is released even |
63 | * though q1 has | 63 | * though q1 has enough free resources |
64 | */ | 64 | */ |
65 | struct GNUNET_TESTBED_Operation *op3; | 65 | struct GNUNET_TESTBED_Operation *op3; |
66 | 66 | ||
diff --git a/src/testbed/testbed_api_operations.c b/src/testbed/testbed_api_operations.c index 5a62cac4e..9948ce357 100644 --- a/src/testbed/testbed_api_operations.c +++ b/src/testbed/testbed_api_operations.c | |||
@@ -60,16 +60,6 @@ struct QueueEntry | |||
60 | */ | 60 | */ |
61 | struct OperationQueue | 61 | struct OperationQueue |
62 | { | 62 | { |
63 | /** | ||
64 | * The head of the operation queue | ||
65 | */ | ||
66 | struct QueueEntry *head; | ||
67 | |||
68 | /** | ||
69 | * The tail of the operation queue | ||
70 | */ | ||
71 | struct QueueEntry *tail; | ||
72 | |||
73 | /** | 63 | /** |
74 | * DLL head for the wait queue. Operations which are waiting for this | 64 | * DLL head for the wait queue. Operations which are waiting for this |
75 | * operation queue are put here | 65 | * operation queue are put here |
@@ -82,6 +72,28 @@ struct OperationQueue | |||
82 | struct QueueEntry *wq_tail; | 72 | struct QueueEntry *wq_tail; |
83 | 73 | ||
84 | /** | 74 | /** |
75 | * DLL head for the ready queue. Operations which are in this operation queue | ||
76 | * and are in ready state are put here | ||
77 | */ | ||
78 | struct QueueEntry *rq_head; | ||
79 | |||
80 | /** | ||
81 | * DLL tail for the ready queue | ||
82 | */ | ||
83 | struct QueueEntry *rq_tail; | ||
84 | |||
85 | /** | ||
86 | * DLL head for the active queue. Operations which are in this operation | ||
87 | * queue and are currently active are put here | ||
88 | */ | ||
89 | struct QueueEntry *aq_head; | ||
90 | |||
91 | /** | ||
92 | * DLL tail for the active queue. | ||
93 | */ | ||
94 | struct QueueEntry *aq_tail; | ||
95 | |||
96 | /** | ||
85 | * Number of operations that are currently active in this queue. | 97 | * Number of operations that are currently active in this queue. |
86 | */ | 98 | */ |
87 | unsigned int active; | 99 | unsigned int active; |
@@ -214,6 +226,79 @@ struct ReadyQueueEntry *rq_tail; | |||
214 | */ | 226 | */ |
215 | GNUNET_SCHEDULER_TaskIdentifier process_rq_task_id; | 227 | GNUNET_SCHEDULER_TaskIdentifier process_rq_task_id; |
216 | 228 | ||
229 | void | ||
230 | remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index) | ||
231 | { | ||
232 | struct OperationQueue *opq; | ||
233 | struct QueueEntry *entry; | ||
234 | |||
235 | opq = op->queues[index]; | ||
236 | entry = op->qentries[index]; | ||
237 | switch (op->state) | ||
238 | { | ||
239 | case OP_STATE_INIT: | ||
240 | GNUNET_assert (0); | ||
241 | break; | ||
242 | case OP_STATE_WAITING: | ||
243 | GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry); | ||
244 | break; | ||
245 | case OP_STATE_READY: | ||
246 | GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry); | ||
247 | break; | ||
248 | case OP_STATE_STARTED: | ||
249 | GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry); | ||
250 | break; | ||
251 | } | ||
252 | } | ||
253 | |||
254 | void | ||
255 | change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state) | ||
256 | { | ||
257 | struct QueueEntry *entry; | ||
258 | struct OperationQueue *opq; | ||
259 | unsigned int cnt; | ||
260 | unsigned int s; | ||
261 | |||
262 | GNUNET_assert (OP_STATE_INIT != state); | ||
263 | GNUNET_assert (NULL != op->queues); | ||
264 | GNUNET_assert (NULL != op->nres); | ||
265 | GNUNET_assert ((OP_STATE_INIT == op->state) || (NULL != op->qentries)); | ||
266 | GNUNET_assert (op->state != state); | ||
267 | for (cnt = 0; cnt < op->nqueues; cnt++) | ||
268 | { | ||
269 | if (OP_STATE_INIT == op->state) | ||
270 | { | ||
271 | entry = GNUNET_malloc (sizeof (struct QueueEntry)); | ||
272 | entry->op = op; | ||
273 | entry->nres = op->nres[cnt]; | ||
274 | s = cnt; | ||
275 | GNUNET_array_append (op->qentries, s, entry); | ||
276 | } | ||
277 | else | ||
278 | { | ||
279 | entry = op->qentries[cnt]; | ||
280 | remove_queue_entry (op, cnt); | ||
281 | } | ||
282 | opq = op->queues[cnt]; | ||
283 | switch (state) | ||
284 | { | ||
285 | case OP_STATE_INIT: | ||
286 | GNUNET_assert (0); | ||
287 | break; | ||
288 | case OP_STATE_WAITING: | ||
289 | GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry); | ||
290 | break; | ||
291 | case OP_STATE_READY: | ||
292 | GNUNET_CONTAINER_DLL_insert_tail (opq->rq_head, opq->rq_tail, entry); | ||
293 | break; | ||
294 | case OP_STATE_STARTED: | ||
295 | GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry); | ||
296 | break; | ||
297 | } | ||
298 | } | ||
299 | op->state = state; | ||
300 | } | ||
301 | |||
217 | 302 | ||
218 | /** | 303 | /** |
219 | * Removes an operation from the ready queue. Also stops the 'process_rq_task' | 304 | * Removes an operation from the ready queue. Also stops the 'process_rq_task' |
@@ -256,7 +341,7 @@ process_rq_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
256 | rq_remove (op); | 341 | rq_remove (op); |
257 | if (NULL != rq_head) | 342 | if (NULL != rq_head) |
258 | process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL); | 343 | process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL); |
259 | op->state = OP_STATE_STARTED; | 344 | change_state (op, OP_STATE_STARTED); |
260 | if (NULL != op->start) | 345 | if (NULL != op->start) |
261 | op->start (op->cb_cls); | 346 | op->start (op->cb_cls); |
262 | } | 347 | } |
@@ -282,48 +367,16 @@ rq_add (struct GNUNET_TESTBED_Operation *op) | |||
282 | } | 367 | } |
283 | 368 | ||
284 | 369 | ||
285 | void | 370 | static int |
286 | wq_add (struct GNUNET_TESTBED_Operation *op) | 371 | is_queue_empty (struct OperationQueue *opq) |
287 | { | 372 | { |
288 | struct QueueEntry *entry; | 373 | if ( (NULL != opq->wq_head) |
289 | struct OperationQueue *opq; | 374 | || (NULL != opq->rq_head) |
290 | unsigned int cnt; | 375 | || (NULL != opq->aq_head) ) |
291 | 376 | return GNUNET_NO; | |
292 | GNUNET_assert (OP_STATE_WAITING == op->state); | 377 | return GNUNET_YES; |
293 | GNUNET_assert (NULL == op->qentries); | ||
294 | for (cnt = 0; cnt < op->nqueues;) | ||
295 | { | ||
296 | opq = op->queues[cnt]; | ||
297 | entry = GNUNET_malloc (sizeof (struct QueueEntry)); | ||
298 | entry->op = op; | ||
299 | entry->nres = op->nres[cnt]; | ||
300 | GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry); | ||
301 | GNUNET_array_append (op->qentries, cnt, entry); /* increments cnt */ | ||
302 | } | ||
303 | } | ||
304 | |||
305 | |||
306 | void | ||
307 | wq_remove (struct GNUNET_TESTBED_Operation *op) | ||
308 | { | ||
309 | struct QueueEntry *entry; | ||
310 | struct OperationQueue *opq; | ||
311 | unsigned int cnt; | ||
312 | |||
313 | GNUNET_assert (OP_STATE_WAITING == op->state); | ||
314 | GNUNET_assert (NULL != op->qentries); | ||
315 | for (cnt = 0; cnt < op->nqueues; cnt ++) | ||
316 | { | ||
317 | opq = op->queues[cnt]; | ||
318 | entry = op->qentries[cnt]; | ||
319 | GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry); | ||
320 | GNUNET_free (entry); | ||
321 | } | ||
322 | GNUNET_free (op->qentries); | ||
323 | op->qentries = NULL; | ||
324 | } | 378 | } |
325 | 379 | ||
326 | |||
327 | /** | 380 | /** |
328 | * Checks for the readiness of an operation and schedules a operation start task | 381 | * Checks for the readiness of an operation and schedules a operation start task |
329 | * | 382 | * |
@@ -342,10 +395,9 @@ check_readiness (struct GNUNET_TESTBED_Operation *op) | |||
342 | if ((op->queues[i]->active + op->nres[i]) > op->queues[i]->max_active) | 395 | if ((op->queues[i]->active + op->nres[i]) > op->queues[i]->max_active) |
343 | return; | 396 | return; |
344 | } | 397 | } |
345 | wq_remove (op); | ||
346 | for (i = 0; i < op->nqueues; i++) | 398 | for (i = 0; i < op->nqueues; i++) |
347 | op->queues[i]->active += op->nres[i]; | 399 | op->queues[i]->active += op->nres[i]; |
348 | op->state = OP_STATE_READY; | 400 | change_state (op, OP_STATE_READY); |
349 | rq_add (op); | 401 | rq_add (op); |
350 | } | 402 | } |
351 | 403 | ||
@@ -364,8 +416,7 @@ defer (struct GNUNET_TESTBED_Operation *op) | |||
364 | rq_remove (op); | 416 | rq_remove (op); |
365 | for (i = 0; i < op->nqueues; i++) | 417 | for (i = 0; i < op->nqueues; i++) |
366 | op->queues[i]->active--; | 418 | op->queues[i]->active--; |
367 | op->state = OP_STATE_WAITING; | 419 | change_state (op, OP_STATE_WAITING); |
368 | wq_add (op); | ||
369 | } | 420 | } |
370 | 421 | ||
371 | 422 | ||
@@ -419,8 +470,7 @@ GNUNET_TESTBED_operation_queue_create_ (unsigned int max_active) | |||
419 | void | 470 | void |
420 | GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue) | 471 | GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue) |
421 | { | 472 | { |
422 | GNUNET_break (NULL == queue->head); | 473 | GNUNET_break (GNUNET_YES == is_queue_empty (queue)); |
423 | GNUNET_break (NULL == queue->tail); | ||
424 | GNUNET_free (queue); | 474 | GNUNET_free (queue); |
425 | } | 475 | } |
426 | 476 | ||
@@ -435,13 +485,29 @@ GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue) | |||
435 | int | 485 | int |
436 | GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue) | 486 | GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue) |
437 | { | 487 | { |
438 | if (NULL != queue->head) | 488 | if (GNUNET_NO == is_queue_empty (queue)) |
439 | return GNUNET_NO; | 489 | return GNUNET_NO; |
440 | GNUNET_TESTBED_operation_queue_destroy_ (queue); | 490 | GNUNET_TESTBED_operation_queue_destroy_ (queue); |
441 | return GNUNET_YES; | 491 | return GNUNET_YES; |
442 | } | 492 | } |
443 | 493 | ||
444 | 494 | ||
495 | void | ||
496 | recheck_waiting (struct OperationQueue *opq) | ||
497 | { | ||
498 | struct QueueEntry *entry; | ||
499 | struct QueueEntry *entry2; | ||
500 | |||
501 | entry = opq->wq_head; | ||
502 | while ( (NULL != entry) && (opq->active < opq->max_active) ) | ||
503 | { | ||
504 | entry2 = entry->next; | ||
505 | check_readiness (entry->op); | ||
506 | entry = entry2; | ||
507 | } | ||
508 | } | ||
509 | |||
510 | |||
445 | /** | 511 | /** |
446 | * Function to reset the maximum number of operations in the given queue. If | 512 | * Function to reset the maximum number of operations in the given queue. If |
447 | * max_active is lesser than the number of currently active operations, the | 513 | * max_active is lesser than the number of currently active operations, the |
@@ -457,21 +523,10 @@ GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue, | |||
457 | struct QueueEntry *entry; | 523 | struct QueueEntry *entry; |
458 | 524 | ||
459 | queue->max_active = max_active; | 525 | queue->max_active = max_active; |
460 | entry = queue->head; | 526 | while ( (queue->active > queue->max_active) |
461 | while ((queue->active > queue->max_active) && (NULL != entry)) | 527 | && (NULL != (entry = queue->rq_head)) ) |
462 | { | 528 | defer (entry->op); |
463 | if (entry->op->state == OP_STATE_READY) | 529 | recheck_waiting (queue); |
464 | defer (entry->op); | ||
465 | entry = entry->next; | ||
466 | } | ||
467 | |||
468 | entry = queue->head; | ||
469 | while ((NULL != entry) && (queue->active < queue->max_active)) | ||
470 | { | ||
471 | if (OP_STATE_WAITING == entry->op->state) | ||
472 | check_readiness (entry->op); | ||
473 | entry = entry->next; | ||
474 | } | ||
475 | } | 530 | } |
476 | 531 | ||
477 | 532 | ||
@@ -491,14 +546,9 @@ GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue, | |||
491 | struct GNUNET_TESTBED_Operation *op, | 546 | struct GNUNET_TESTBED_Operation *op, |
492 | unsigned int nres) | 547 | unsigned int nres) |
493 | { | 548 | { |
494 | struct QueueEntry *entry; | ||
495 | unsigned int qsize; | 549 | unsigned int qsize; |
496 | 550 | ||
497 | GNUNET_assert (0 < nres); | 551 | GNUNET_assert (0 < nres); |
498 | entry = GNUNET_malloc (sizeof (struct QueueEntry)); | ||
499 | entry->op = op; | ||
500 | entry->nres = nres; | ||
501 | GNUNET_CONTAINER_DLL_insert_tail (queue->head, queue->tail, entry); | ||
502 | qsize = op->nqueues; | 552 | qsize = op->nqueues; |
503 | GNUNET_array_append (op->queues, op->nqueues, queue); | 553 | GNUNET_array_append (op->queues, op->nqueues, queue); |
504 | GNUNET_array_append (op->nres, qsize, nres); | 554 | GNUNET_array_append (op->nres, qsize, nres); |
@@ -538,55 +588,12 @@ void | |||
538 | GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op) | 588 | GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op) |
539 | { | 589 | { |
540 | GNUNET_assert (NULL == op->rq_entry); | 590 | GNUNET_assert (NULL == op->rq_entry); |
541 | op->state = OP_STATE_WAITING; | 591 | change_state (op, OP_STATE_WAITING); |
542 | wq_add (op); | ||
543 | check_readiness (op); | 592 | check_readiness (op); |
544 | } | 593 | } |
545 | 594 | ||
546 | 595 | ||
547 | /** | 596 | /** |
548 | * Remove an operation from a queue. This can be because the | ||
549 | * oeration was active and has completed (and the resources have | ||
550 | * been released), or because the operation was cancelled and | ||
551 | * thus scheduling the operation is no longer required. | ||
552 | * | ||
553 | * @param queue queue to add the operation to | ||
554 | * @param op operation to add to the queue | ||
555 | */ | ||
556 | void | ||
557 | GNUNET_TESTBED_operation_queue_remove_ (struct OperationQueue *queue, | ||
558 | struct GNUNET_TESTBED_Operation | ||
559 | *op) | ||
560 | { | ||
561 | struct QueueEntry *entry; | ||
562 | |||
563 | for (entry = queue->head; NULL != entry; entry = entry->next) | ||
564 | if (entry->op == op) | ||
565 | break; | ||
566 | GNUNET_assert (NULL != entry); | ||
567 | GNUNET_assert (0 < entry->nres); | ||
568 | switch (op->state) | ||
569 | { | ||
570 | case OP_STATE_INIT: | ||
571 | case OP_STATE_WAITING: | ||
572 | break; | ||
573 | case OP_STATE_READY: | ||
574 | case OP_STATE_STARTED: | ||
575 | GNUNET_assert (0 != queue->active); | ||
576 | GNUNET_assert (queue->active >= entry->nres); | ||
577 | queue->active -= entry->nres; | ||
578 | break; | ||
579 | } | ||
580 | GNUNET_CONTAINER_DLL_remove (queue->head, queue->tail, entry); | ||
581 | GNUNET_free (entry); | ||
582 | entry = queue->wq_head; | ||
583 | if (NULL == entry) | ||
584 | return; | ||
585 | check_readiness (entry->op); | ||
586 | } | ||
587 | |||
588 | |||
589 | /** | ||
590 | * An operation is 'done' (was cancelled or finished); remove | 597 | * An operation is 'done' (was cancelled or finished); remove |
591 | * it from the queues and release associated resources. | 598 | * it from the queues and release associated resources. |
592 | * | 599 | * |
@@ -595,22 +602,40 @@ GNUNET_TESTBED_operation_queue_remove_ (struct OperationQueue *queue, | |||
595 | void | 602 | void |
596 | GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op) | 603 | GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op) |
597 | { | 604 | { |
605 | struct QueueEntry *entry; | ||
606 | struct OperationQueue *opq; | ||
598 | unsigned int i; | 607 | unsigned int i; |
599 | 608 | ||
600 | switch (op->state) | 609 | if (OP_STATE_INIT == op->state) |
601 | { | 610 | { |
602 | case OP_STATE_READY: | 611 | GNUNET_free (op); |
603 | rq_remove (op); | 612 | return; |
604 | break; | ||
605 | case OP_STATE_WAITING: | ||
606 | wq_remove (op); | ||
607 | break; | ||
608 | case OP_STATE_STARTED: | ||
609 | case OP_STATE_INIT: | ||
610 | break; | ||
611 | } | 613 | } |
614 | if (OP_STATE_READY == op->state) | ||
615 | rq_remove (op); | ||
616 | GNUNET_assert (NULL != op->queues); | ||
617 | GNUNET_assert (NULL != op->qentries); | ||
612 | for (i = 0; i < op->nqueues; i++) | 618 | for (i = 0; i < op->nqueues; i++) |
613 | GNUNET_TESTBED_operation_queue_remove_ (op->queues[i], op); | 619 | { |
620 | entry = op->qentries[i]; | ||
621 | remove_queue_entry (op, i); | ||
622 | opq = op->queues[i]; | ||
623 | switch (op->state) | ||
624 | { | ||
625 | case OP_STATE_INIT: | ||
626 | case OP_STATE_WAITING: | ||
627 | break; | ||
628 | case OP_STATE_READY: | ||
629 | case OP_STATE_STARTED: | ||
630 | GNUNET_assert (0 != opq->active); | ||
631 | GNUNET_assert (opq->active >= entry->nres); | ||
632 | opq->active -= entry->nres; | ||
633 | recheck_waiting (opq); | ||
634 | break; | ||
635 | } | ||
636 | GNUNET_free (entry); | ||
637 | } | ||
638 | GNUNET_free_non_null (op->qentries); | ||
614 | GNUNET_free (op->queues); | 639 | GNUNET_free (op->queues); |
615 | GNUNET_free (op->nres); | 640 | GNUNET_free (op->nres); |
616 | if (NULL != op->release) | 641 | if (NULL != op->release) |
diff --git a/src/testbed/testbed_api_operations.h b/src/testbed/testbed_api_operations.h index 1950cd7d1..346fe5b75 100644 --- a/src/testbed/testbed_api_operations.h +++ b/src/testbed/testbed_api_operations.h | |||
@@ -127,21 +127,6 @@ GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op); | |||
127 | 127 | ||
128 | 128 | ||
129 | /** | 129 | /** |
130 | * Remove an operation from a queue. This can be because the | ||
131 | * oeration was active and has completed (and the resources have | ||
132 | * been released), or because the operation was cancelled and | ||
133 | * thus scheduling the operation is no longer required. | ||
134 | * | ||
135 | * @param queue queue to add the operation to | ||
136 | * @param op operation to add to the queue | ||
137 | */ | ||
138 | void | ||
139 | GNUNET_TESTBED_operation_queue_remove_ (struct OperationQueue *queue, | ||
140 | struct GNUNET_TESTBED_Operation *op); | ||
141 | |||
142 | |||
143 | |||
144 | /** | ||
145 | * Function to call to start an operation once all | 130 | * Function to call to start an operation once all |
146 | * queues the operation is part of declare that the | 131 | * queues the operation is part of declare that the |
147 | * operation can be activated. | 132 | * operation can be activated. |