aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSree Harsha Totakura <totakura@in.tum.de>2013-04-09 13:32:54 +0000
committerSree Harsha Totakura <totakura@in.tum.de>2013-04-09 13:32:54 +0000
commit34302683763317cd5c7053434038c5ede88ef6ec (patch)
treeae545a24a0020f1c188993548db2172ce18ea15e /src
parent02e5060f18eba19252673555b0c0fbb5d996df1a (diff)
downloadgnunet-34302683763317cd5c7053434038c5ede88ef6ec.tar.gz
gnunet-34302683763317cd5c7053434038c5ede88ef6ec.zip
- maintain separate queues for operations that are in WAITING, READY, and STARTED states
Diffstat (limited to 'src')
-rw-r--r--src/testbed/test_testbed_api_operations.c2
-rw-r--r--src/testbed/testbed_api_operations.c289
-rw-r--r--src/testbed/testbed_api_operations.h15
3 files changed, 158 insertions, 148 deletions
diff --git a/src/testbed/test_testbed_api_operations.c b/src/testbed/test_testbed_api_operations.c
index 24f23be23..e4d3b72fa 100644
--- a/src/testbed/test_testbed_api_operations.c
+++ b/src/testbed/test_testbed_api_operations.c
@@ -60,7 +60,7 @@ struct GNUNET_TESTBED_Operation *op2;
60 * This operation should go into both queues and should consume 2 units of 60 * This operation should go into both queues and should consume 2 units of
61 * resources on both queues. Since op2 needs a resource from both queues and is 61 * resources on both queues. Since op2 needs a resource from both queues and is
62 * queues before this operation, it will be blocked until op2 is released even 62 * queues before this operation, it will be blocked until op2 is released even
63 * though q1 has 63 * though q1 has enough free resources
64 */ 64 */
65struct GNUNET_TESTBED_Operation *op3; 65struct GNUNET_TESTBED_Operation *op3;
66 66
diff --git a/src/testbed/testbed_api_operations.c b/src/testbed/testbed_api_operations.c
index 5a62cac4e..9948ce357 100644
--- a/src/testbed/testbed_api_operations.c
+++ b/src/testbed/testbed_api_operations.c
@@ -60,16 +60,6 @@ struct QueueEntry
60 */ 60 */
61struct OperationQueue 61struct OperationQueue
62{ 62{
63 /**
64 * The head of the operation queue
65 */
66 struct QueueEntry *head;
67
68 /**
69 * The tail of the operation queue
70 */
71 struct QueueEntry *tail;
72
73 /** 63 /**
74 * DLL head for the wait queue. Operations which are waiting for this 64 * DLL head for the wait queue. Operations which are waiting for this
75 * operation queue are put here 65 * operation queue are put here
@@ -82,6 +72,28 @@ struct OperationQueue
82 struct QueueEntry *wq_tail; 72 struct QueueEntry *wq_tail;
83 73
84 /** 74 /**
75 * DLL head for the ready queue. Operations which are in this operation queue
76 * and are in ready state are put here
77 */
78 struct QueueEntry *rq_head;
79
80 /**
81 * DLL tail for the ready queue
82 */
83 struct QueueEntry *rq_tail;
84
85 /**
86 * DLL head for the active queue. Operations which are in this operation
87 * queue and are currently active are put here
88 */
89 struct QueueEntry *aq_head;
90
91 /**
92 * DLL tail for the active queue.
93 */
94 struct QueueEntry *aq_tail;
95
96 /**
85 * Number of operations that are currently active in this queue. 97 * Number of operations that are currently active in this queue.
86 */ 98 */
87 unsigned int active; 99 unsigned int active;
@@ -214,6 +226,79 @@ struct ReadyQueueEntry *rq_tail;
214 */ 226 */
215GNUNET_SCHEDULER_TaskIdentifier process_rq_task_id; 227GNUNET_SCHEDULER_TaskIdentifier process_rq_task_id;
216 228
229void
230remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index)
231{
232 struct OperationQueue *opq;
233 struct QueueEntry *entry;
234
235 opq = op->queues[index];
236 entry = op->qentries[index];
237 switch (op->state)
238 {
239 case OP_STATE_INIT:
240 GNUNET_assert (0);
241 break;
242 case OP_STATE_WAITING:
243 GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry);
244 break;
245 case OP_STATE_READY:
246 GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry);
247 break;
248 case OP_STATE_STARTED:
249 GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry);
250 break;
251 }
252}
253
254void
255change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state)
256{
257 struct QueueEntry *entry;
258 struct OperationQueue *opq;
259 unsigned int cnt;
260 unsigned int s;
261
262 GNUNET_assert (OP_STATE_INIT != state);
263 GNUNET_assert (NULL != op->queues);
264 GNUNET_assert (NULL != op->nres);
265 GNUNET_assert ((OP_STATE_INIT == op->state) || (NULL != op->qentries));
266 GNUNET_assert (op->state != state);
267 for (cnt = 0; cnt < op->nqueues; cnt++)
268 {
269 if (OP_STATE_INIT == op->state)
270 {
271 entry = GNUNET_malloc (sizeof (struct QueueEntry));
272 entry->op = op;
273 entry->nres = op->nres[cnt];
274 s = cnt;
275 GNUNET_array_append (op->qentries, s, entry);
276 }
277 else
278 {
279 entry = op->qentries[cnt];
280 remove_queue_entry (op, cnt);
281 }
282 opq = op->queues[cnt];
283 switch (state)
284 {
285 case OP_STATE_INIT:
286 GNUNET_assert (0);
287 break;
288 case OP_STATE_WAITING:
289 GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry);
290 break;
291 case OP_STATE_READY:
292 GNUNET_CONTAINER_DLL_insert_tail (opq->rq_head, opq->rq_tail, entry);
293 break;
294 case OP_STATE_STARTED:
295 GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry);
296 break;
297 }
298 }
299 op->state = state;
300}
301
217 302
218/** 303/**
219 * Removes an operation from the ready queue. Also stops the 'process_rq_task' 304 * Removes an operation from the ready queue. Also stops the 'process_rq_task'
@@ -256,7 +341,7 @@ process_rq_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
256 rq_remove (op); 341 rq_remove (op);
257 if (NULL != rq_head) 342 if (NULL != rq_head)
258 process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL); 343 process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
259 op->state = OP_STATE_STARTED; 344 change_state (op, OP_STATE_STARTED);
260 if (NULL != op->start) 345 if (NULL != op->start)
261 op->start (op->cb_cls); 346 op->start (op->cb_cls);
262} 347}
@@ -282,48 +367,16 @@ rq_add (struct GNUNET_TESTBED_Operation *op)
282} 367}
283 368
284 369
285void 370static int
286wq_add (struct GNUNET_TESTBED_Operation *op) 371is_queue_empty (struct OperationQueue *opq)
287{ 372{
288 struct QueueEntry *entry; 373 if ( (NULL != opq->wq_head)
289 struct OperationQueue *opq; 374 || (NULL != opq->rq_head)
290 unsigned int cnt; 375 || (NULL != opq->aq_head) )
291 376 return GNUNET_NO;
292 GNUNET_assert (OP_STATE_WAITING == op->state); 377 return GNUNET_YES;
293 GNUNET_assert (NULL == op->qentries);
294 for (cnt = 0; cnt < op->nqueues;)
295 {
296 opq = op->queues[cnt];
297 entry = GNUNET_malloc (sizeof (struct QueueEntry));
298 entry->op = op;
299 entry->nres = op->nres[cnt];
300 GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry);
301 GNUNET_array_append (op->qentries, cnt, entry); /* increments cnt */
302 }
303}
304
305
306void
307wq_remove (struct GNUNET_TESTBED_Operation *op)
308{
309 struct QueueEntry *entry;
310 struct OperationQueue *opq;
311 unsigned int cnt;
312
313 GNUNET_assert (OP_STATE_WAITING == op->state);
314 GNUNET_assert (NULL != op->qentries);
315 for (cnt = 0; cnt < op->nqueues; cnt ++)
316 {
317 opq = op->queues[cnt];
318 entry = op->qentries[cnt];
319 GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry);
320 GNUNET_free (entry);
321 }
322 GNUNET_free (op->qentries);
323 op->qentries = NULL;
324} 378}
325 379
326
327/** 380/**
328 * Checks for the readiness of an operation and schedules a operation start task 381 * Checks for the readiness of an operation and schedules a operation start task
329 * 382 *
@@ -342,10 +395,9 @@ check_readiness (struct GNUNET_TESTBED_Operation *op)
342 if ((op->queues[i]->active + op->nres[i]) > op->queues[i]->max_active) 395 if ((op->queues[i]->active + op->nres[i]) > op->queues[i]->max_active)
343 return; 396 return;
344 } 397 }
345 wq_remove (op);
346 for (i = 0; i < op->nqueues; i++) 398 for (i = 0; i < op->nqueues; i++)
347 op->queues[i]->active += op->nres[i]; 399 op->queues[i]->active += op->nres[i];
348 op->state = OP_STATE_READY; 400 change_state (op, OP_STATE_READY);
349 rq_add (op); 401 rq_add (op);
350} 402}
351 403
@@ -364,8 +416,7 @@ defer (struct GNUNET_TESTBED_Operation *op)
364 rq_remove (op); 416 rq_remove (op);
365 for (i = 0; i < op->nqueues; i++) 417 for (i = 0; i < op->nqueues; i++)
366 op->queues[i]->active--; 418 op->queues[i]->active--;
367 op->state = OP_STATE_WAITING; 419 change_state (op, OP_STATE_WAITING);
368 wq_add (op);
369} 420}
370 421
371 422
@@ -419,8 +470,7 @@ GNUNET_TESTBED_operation_queue_create_ (unsigned int max_active)
419void 470void
420GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue) 471GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
421{ 472{
422 GNUNET_break (NULL == queue->head); 473 GNUNET_break (GNUNET_YES == is_queue_empty (queue));
423 GNUNET_break (NULL == queue->tail);
424 GNUNET_free (queue); 474 GNUNET_free (queue);
425} 475}
426 476
@@ -435,13 +485,29 @@ GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
435int 485int
436GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue) 486GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue)
437{ 487{
438 if (NULL != queue->head) 488 if (GNUNET_NO == is_queue_empty (queue))
439 return GNUNET_NO; 489 return GNUNET_NO;
440 GNUNET_TESTBED_operation_queue_destroy_ (queue); 490 GNUNET_TESTBED_operation_queue_destroy_ (queue);
441 return GNUNET_YES; 491 return GNUNET_YES;
442} 492}
443 493
444 494
495void
496recheck_waiting (struct OperationQueue *opq)
497{
498 struct QueueEntry *entry;
499 struct QueueEntry *entry2;
500
501 entry = opq->wq_head;
502 while ( (NULL != entry) && (opq->active < opq->max_active) )
503 {
504 entry2 = entry->next;
505 check_readiness (entry->op);
506 entry = entry2;
507 }
508}
509
510
445/** 511/**
446 * Function to reset the maximum number of operations in the given queue. If 512 * Function to reset the maximum number of operations in the given queue. If
447 * max_active is lesser than the number of currently active operations, the 513 * max_active is lesser than the number of currently active operations, the
@@ -457,21 +523,10 @@ GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue,
457 struct QueueEntry *entry; 523 struct QueueEntry *entry;
458 524
459 queue->max_active = max_active; 525 queue->max_active = max_active;
460 entry = queue->head; 526 while ( (queue->active > queue->max_active)
461 while ((queue->active > queue->max_active) && (NULL != entry)) 527 && (NULL != (entry = queue->rq_head)) )
462 { 528 defer (entry->op);
463 if (entry->op->state == OP_STATE_READY) 529 recheck_waiting (queue);
464 defer (entry->op);
465 entry = entry->next;
466 }
467
468 entry = queue->head;
469 while ((NULL != entry) && (queue->active < queue->max_active))
470 {
471 if (OP_STATE_WAITING == entry->op->state)
472 check_readiness (entry->op);
473 entry = entry->next;
474 }
475} 530}
476 531
477 532
@@ -491,14 +546,9 @@ GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue,
491 struct GNUNET_TESTBED_Operation *op, 546 struct GNUNET_TESTBED_Operation *op,
492 unsigned int nres) 547 unsigned int nres)
493{ 548{
494 struct QueueEntry *entry;
495 unsigned int qsize; 549 unsigned int qsize;
496 550
497 GNUNET_assert (0 < nres); 551 GNUNET_assert (0 < nres);
498 entry = GNUNET_malloc (sizeof (struct QueueEntry));
499 entry->op = op;
500 entry->nres = nres;
501 GNUNET_CONTAINER_DLL_insert_tail (queue->head, queue->tail, entry);
502 qsize = op->nqueues; 552 qsize = op->nqueues;
503 GNUNET_array_append (op->queues, op->nqueues, queue); 553 GNUNET_array_append (op->queues, op->nqueues, queue);
504 GNUNET_array_append (op->nres, qsize, nres); 554 GNUNET_array_append (op->nres, qsize, nres);
@@ -538,55 +588,12 @@ void
538GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op) 588GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op)
539{ 589{
540 GNUNET_assert (NULL == op->rq_entry); 590 GNUNET_assert (NULL == op->rq_entry);
541 op->state = OP_STATE_WAITING; 591 change_state (op, OP_STATE_WAITING);
542 wq_add (op);
543 check_readiness (op); 592 check_readiness (op);
544} 593}
545 594
546 595
547/** 596/**
548 * Remove an operation from a queue. This can be because the
549 * oeration was active and has completed (and the resources have
550 * been released), or because the operation was cancelled and
551 * thus scheduling the operation is no longer required.
552 *
553 * @param queue queue to add the operation to
554 * @param op operation to add to the queue
555 */
556void
557GNUNET_TESTBED_operation_queue_remove_ (struct OperationQueue *queue,
558 struct GNUNET_TESTBED_Operation
559 *op)
560{
561 struct QueueEntry *entry;
562
563 for (entry = queue->head; NULL != entry; entry = entry->next)
564 if (entry->op == op)
565 break;
566 GNUNET_assert (NULL != entry);
567 GNUNET_assert (0 < entry->nres);
568 switch (op->state)
569 {
570 case OP_STATE_INIT:
571 case OP_STATE_WAITING:
572 break;
573 case OP_STATE_READY:
574 case OP_STATE_STARTED:
575 GNUNET_assert (0 != queue->active);
576 GNUNET_assert (queue->active >= entry->nres);
577 queue->active -= entry->nres;
578 break;
579 }
580 GNUNET_CONTAINER_DLL_remove (queue->head, queue->tail, entry);
581 GNUNET_free (entry);
582 entry = queue->wq_head;
583 if (NULL == entry)
584 return;
585 check_readiness (entry->op);
586}
587
588
589/**
590 * An operation is 'done' (was cancelled or finished); remove 597 * An operation is 'done' (was cancelled or finished); remove
591 * it from the queues and release associated resources. 598 * it from the queues and release associated resources.
592 * 599 *
@@ -595,22 +602,40 @@ GNUNET_TESTBED_operation_queue_remove_ (struct OperationQueue *queue,
595void 602void
596GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op) 603GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
597{ 604{
605 struct QueueEntry *entry;
606 struct OperationQueue *opq;
598 unsigned int i; 607 unsigned int i;
599 608
600 switch (op->state) 609 if (OP_STATE_INIT == op->state)
601 { 610 {
602 case OP_STATE_READY: 611 GNUNET_free (op);
603 rq_remove (op); 612 return;
604 break;
605 case OP_STATE_WAITING:
606 wq_remove (op);
607 break;
608 case OP_STATE_STARTED:
609 case OP_STATE_INIT:
610 break;
611 } 613 }
614 if (OP_STATE_READY == op->state)
615 rq_remove (op);
616 GNUNET_assert (NULL != op->queues);
617 GNUNET_assert (NULL != op->qentries);
612 for (i = 0; i < op->nqueues; i++) 618 for (i = 0; i < op->nqueues; i++)
613 GNUNET_TESTBED_operation_queue_remove_ (op->queues[i], op); 619 {
620 entry = op->qentries[i];
621 remove_queue_entry (op, i);
622 opq = op->queues[i];
623 switch (op->state)
624 {
625 case OP_STATE_INIT:
626 case OP_STATE_WAITING:
627 break;
628 case OP_STATE_READY:
629 case OP_STATE_STARTED:
630 GNUNET_assert (0 != opq->active);
631 GNUNET_assert (opq->active >= entry->nres);
632 opq->active -= entry->nres;
633 recheck_waiting (opq);
634 break;
635 }
636 GNUNET_free (entry);
637 }
638 GNUNET_free_non_null (op->qentries);
614 GNUNET_free (op->queues); 639 GNUNET_free (op->queues);
615 GNUNET_free (op->nres); 640 GNUNET_free (op->nres);
616 if (NULL != op->release) 641 if (NULL != op->release)
diff --git a/src/testbed/testbed_api_operations.h b/src/testbed/testbed_api_operations.h
index 1950cd7d1..346fe5b75 100644
--- a/src/testbed/testbed_api_operations.h
+++ b/src/testbed/testbed_api_operations.h
@@ -127,21 +127,6 @@ GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op);
127 127
128 128
129/** 129/**
130 * Remove an operation from a queue. This can be because the
131 * oeration was active and has completed (and the resources have
132 * been released), or because the operation was cancelled and
133 * thus scheduling the operation is no longer required.
134 *
135 * @param queue queue to add the operation to
136 * @param op operation to add to the queue
137 */
138void
139GNUNET_TESTBED_operation_queue_remove_ (struct OperationQueue *queue,
140 struct GNUNET_TESTBED_Operation *op);
141
142
143
144/**
145 * Function to call to start an operation once all 130 * Function to call to start an operation once all
146 * queues the operation is part of declare that the 131 * queues the operation is part of declare that the
147 * operation can be activated. 132 * operation can be activated.