aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSree Harsha Totakura <totakura@in.tum.de>2013-04-09 09:40:00 +0000
committerSree Harsha Totakura <totakura@in.tum.de>2013-04-09 09:40:00 +0000
commit8c29ba8ee17d8c8d98548b8b64cd6a6605b7b295 (patch)
treee1f46cc9add2b4795b2d4afee34021a80507302c
parent02373929074ae77e5cfc895963da2aec5d6abb04 (diff)
downloadgnunet-8c29ba8ee17d8c8d98548b8b64cd6a6605b7b295.tar.gz
gnunet-8c29ba8ee17d8c8d98548b8b64cd6a6605b7b295.zip
- no starving
-rw-r--r--src/testbed/testbed_api_operations.c149
-rw-r--r--src/testbed/testbed_api_operations.h25
2 files changed, 119 insertions, 55 deletions
diff --git a/src/testbed/testbed_api_operations.c b/src/testbed/testbed_api_operations.c
index 515367d8e..5a62cac4e 100644
--- a/src/testbed/testbed_api_operations.c
+++ b/src/testbed/testbed_api_operations.c
@@ -71,6 +71,17 @@ struct OperationQueue
71 struct QueueEntry *tail; 71 struct QueueEntry *tail;
72 72
73 /** 73 /**
74 * DLL head for the wait queue. Operations which are waiting for this
75 * operation queue are put here
76 */
77 struct QueueEntry *wq_head;
78
79 /**
80 * DLL tail for the wait queue.
81 */
82 struct QueueEntry *wq_tail;
83
84 /**
74 * Number of operations that are currently active in this queue. 85 * Number of operations that are currently active in this queue.
75 */ 86 */
76 unsigned int active; 87 unsigned int active;
@@ -159,6 +170,12 @@ struct GNUNET_TESTBED_Operation
159 struct OperationQueue **queues; 170 struct OperationQueue **queues;
160 171
161 /** 172 /**
173 * Array of operation queue entries corresponding to this operation in
174 * operation queues for this operation
175 */
176 struct QueueEntry **qentries;
177
178 /**
162 * Array of number of resources an operation need from each queue. The numbers 179 * Array of number of resources an operation need from each queue. The numbers
163 * in this array should correspond to the queues array 180 * in this array should correspond to the queues array
164 */ 181 */
@@ -265,6 +282,48 @@ rq_add (struct GNUNET_TESTBED_Operation *op)
265} 282}
266 283
267 284
285void
286wq_add (struct GNUNET_TESTBED_Operation *op)
287{
288 struct QueueEntry *entry;
289 struct OperationQueue *opq;
290 unsigned int cnt;
291
292 GNUNET_assert (OP_STATE_WAITING == op->state);
293 GNUNET_assert (NULL == op->qentries);
294 for (cnt = 0; cnt < op->nqueues;)
295 {
296 opq = op->queues[cnt];
297 entry = GNUNET_malloc (sizeof (struct QueueEntry));
298 entry->op = op;
299 entry->nres = op->nres[cnt];
300 GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry);
301 GNUNET_array_append (op->qentries, cnt, entry); /* increments cnt */
302 }
303}
304
305
306void
307wq_remove (struct GNUNET_TESTBED_Operation *op)
308{
309 struct QueueEntry *entry;
310 struct OperationQueue *opq;
311 unsigned int cnt;
312
313 GNUNET_assert (OP_STATE_WAITING == op->state);
314 GNUNET_assert (NULL != op->qentries);
315 for (cnt = 0; cnt < op->nqueues; cnt ++)
316 {
317 opq = op->queues[cnt];
318 entry = op->qentries[cnt];
319 GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry);
320 GNUNET_free (entry);
321 }
322 GNUNET_free (op->qentries);
323 op->qentries = NULL;
324}
325
326
268/** 327/**
269 * Checks for the readiness of an operation and schedules a operation start task 328 * Checks for the readiness of an operation and schedules a operation start task
270 * 329 *
@@ -276,15 +335,17 @@ check_readiness (struct GNUNET_TESTBED_Operation *op)
276 unsigned int i; 335 unsigned int i;
277 336
278 GNUNET_assert (NULL == op->rq_entry); 337 GNUNET_assert (NULL == op->rq_entry);
338 GNUNET_assert (OP_STATE_WAITING == op->state);
279 for (i = 0; i < op->nqueues; i++) 339 for (i = 0; i < op->nqueues; i++)
280 { 340 {
281 GNUNET_assert (0 < op->nres[i]); 341 GNUNET_assert (0 < op->nres[i]);
282 if ((op->queues[i]->active + op->nres[i]) > op->queues[i]->max_active) 342 if ((op->queues[i]->active + op->nres[i]) > op->queues[i]->max_active)
283 return; 343 return;
284 } 344 }
345 wq_remove (op);
285 for (i = 0; i < op->nqueues; i++) 346 for (i = 0; i < op->nqueues; i++)
286 op->queues[i]->active += op->nres[i]; 347 op->queues[i]->active += op->nres[i];
287 op->state = OP_STATE_READY; 348 op->state = OP_STATE_READY;
288 rq_add (op); 349 rq_add (op);
289} 350}
290 351
@@ -304,6 +365,7 @@ defer (struct GNUNET_TESTBED_Operation *op)
304 for (i = 0; i < op->nqueues; i++) 365 for (i = 0; i < op->nqueues; i++)
305 op->queues[i]->active--; 366 op->queues[i]->active--;
306 op->state = OP_STATE_WAITING; 367 op->state = OP_STATE_WAITING;
368 wq_add (op);
307} 369}
308 370
309 371
@@ -420,27 +482,27 @@ GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue,
420 * waiting for the operation to become active. 482 * waiting for the operation to become active.
421 * 483 *
422 * @param queue queue to add the operation to 484 * @param queue queue to add the operation to
423 * @param operation operation to add to the queue 485 * @param op operation to add to the queue
424 * @param nres the number of units of the resources of queue needed by the 486 * @param nres the number of units of the resources of queue needed by the
425 * operation. Should be greater than 0. 487 * operation. Should be greater than 0.
426 */ 488 */
427void 489void
428GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue, 490GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue,
429 struct GNUNET_TESTBED_Operation 491 struct GNUNET_TESTBED_Operation *op,
430 *operation, unsigned int nres) 492 unsigned int nres)
431{ 493{
432 struct QueueEntry *entry; 494 struct QueueEntry *entry;
433 unsigned int qsize; 495 unsigned int qsize;
434 496
435 GNUNET_assert (0 < nres); 497 GNUNET_assert (0 < nres);
436 entry = GNUNET_malloc (sizeof (struct QueueEntry)); 498 entry = GNUNET_malloc (sizeof (struct QueueEntry));
437 entry->op = operation; 499 entry->op = op;
438 entry->nres = nres; 500 entry->nres = nres;
439 GNUNET_CONTAINER_DLL_insert_tail (queue->head, queue->tail, entry); 501 GNUNET_CONTAINER_DLL_insert_tail (queue->head, queue->tail, entry);
440 qsize = operation->nqueues; 502 qsize = op->nqueues;
441 GNUNET_array_append (operation->queues, operation->nqueues, queue); 503 GNUNET_array_append (op->queues, op->nqueues, queue);
442 GNUNET_array_append (operation->nres, qsize, nres); 504 GNUNET_array_append (op->nres, qsize, nres);
443 GNUNET_assert (qsize == operation->nqueues); 505 GNUNET_assert (qsize == op->nqueues);
444} 506}
445 507
446 508
@@ -453,14 +515,13 @@ GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue,
453 * requires more than 1 515 * requires more than 1
454 * 516 *
455 * @param queue queue to add the operation to 517 * @param queue queue to add the operation to
456 * @param operation operation to add to the queue 518 * @param op operation to add to the queue
457 */ 519 */
458void 520void
459GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue, 521GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue,
460 struct GNUNET_TESTBED_Operation 522 struct GNUNET_TESTBED_Operation *op)
461 *operation)
462{ 523{
463 return GNUNET_TESTBED_operation_queue_insert2_ (queue, operation, 1); 524 return GNUNET_TESTBED_operation_queue_insert2_ (queue, op, 1);
464} 525}
465 526
466 527
@@ -471,15 +532,15 @@ GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue,
471 * insertions to be made without having the first one instantly trigger the 532 * insertions to be made without having the first one instantly trigger the
472 * operation if the first queue has sufficient resources). 533 * operation if the first queue has sufficient resources).
473 * 534 *
474 * @param operation the operation to marks as waiting 535 * @param op the operation to marks as waiting
475 */ 536 */
476void 537void
477GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation 538GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op)
478 *operation)
479{ 539{
480 GNUNET_assert (NULL == operation->rq_entry); 540 GNUNET_assert (NULL == op->rq_entry);
481 operation->state = OP_STATE_WAITING; 541 op->state = OP_STATE_WAITING;
482 check_readiness (operation); 542 wq_add (op);
543 check_readiness (op);
483} 544}
484 545
485 546
@@ -490,22 +551,21 @@ GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation
490 * thus scheduling the operation is no longer required. 551 * thus scheduling the operation is no longer required.
491 * 552 *
492 * @param queue queue to add the operation to 553 * @param queue queue to add the operation to
493 * @param operation operation to add to the queue 554 * @param op operation to add to the queue
494 */ 555 */
495void 556void
496GNUNET_TESTBED_operation_queue_remove_ (struct OperationQueue *queue, 557GNUNET_TESTBED_operation_queue_remove_ (struct OperationQueue *queue,
497 struct GNUNET_TESTBED_Operation 558 struct GNUNET_TESTBED_Operation
498 *operation) 559 *op)
499{ 560{
500 struct QueueEntry *entry; 561 struct QueueEntry *entry;
501 struct QueueEntry *entry2;
502 562
503 for (entry = queue->head; NULL != entry; entry = entry->next) 563 for (entry = queue->head; NULL != entry; entry = entry->next)
504 if (entry->op == operation) 564 if (entry->op == op)
505 break; 565 break;
506 GNUNET_assert (NULL != entry); 566 GNUNET_assert (NULL != entry);
507 GNUNET_assert (0 < entry->nres); 567 GNUNET_assert (0 < entry->nres);
508 switch (operation->state) 568 switch (op->state)
509 { 569 {
510 case OP_STATE_INIT: 570 case OP_STATE_INIT:
511 case OP_STATE_WAITING: 571 case OP_STATE_WAITING:
@@ -517,15 +577,12 @@ GNUNET_TESTBED_operation_queue_remove_ (struct OperationQueue *queue,
517 queue->active -= entry->nres; 577 queue->active -= entry->nres;
518 break; 578 break;
519 } 579 }
520 entry2 = entry->next;
521 GNUNET_CONTAINER_DLL_remove (queue->head, queue->tail, entry); 580 GNUNET_CONTAINER_DLL_remove (queue->head, queue->tail, entry);
522 GNUNET_free (entry); 581 GNUNET_free (entry);
523 for (; NULL != entry2; entry2 = entry2->next) 582 entry = queue->wq_head;
524 if (OP_STATE_WAITING == entry2->op->state) 583 if (NULL == entry)
525 break;
526 if (NULL == entry2)
527 return; 584 return;
528 check_readiness (entry2->op); 585 check_readiness (entry->op);
529} 586}
530 587
531 588
@@ -533,22 +590,32 @@ GNUNET_TESTBED_operation_queue_remove_ (struct OperationQueue *queue,
533 * An operation is 'done' (was cancelled or finished); remove 590 * An operation is 'done' (was cancelled or finished); remove
534 * it from the queues and release associated resources. 591 * it from the queues and release associated resources.
535 * 592 *
536 * @param operation operation that finished 593 * @param op operation that finished
537 */ 594 */
538void 595void
539GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *operation) 596GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
540{ 597{
541 unsigned int i; 598 unsigned int i;
542 599
543 if (OP_STATE_READY == operation->state) 600 switch (op->state)
544 rq_remove (operation); 601 {
545 for (i = 0; i < operation->nqueues; i++) 602 case OP_STATE_READY:
546 GNUNET_TESTBED_operation_queue_remove_ (operation->queues[i], operation); 603 rq_remove (op);
547 GNUNET_free (operation->queues); 604 break;
548 GNUNET_free (operation->nres); 605 case OP_STATE_WAITING:
549 if (NULL != operation->release) 606 wq_remove (op);
550 operation->release (operation->cb_cls); 607 break;
551 GNUNET_free (operation); 608 case OP_STATE_STARTED:
609 case OP_STATE_INIT:
610 break;
611 }
612 for (i = 0; i < op->nqueues; i++)
613 GNUNET_TESTBED_operation_queue_remove_ (op->queues[i], op);
614 GNUNET_free (op->queues);
615 GNUNET_free (op->nres);
616 if (NULL != op->release)
617 op->release (op->cb_cls);
618 GNUNET_free (op);
552} 619}
553 620
554 621
diff --git a/src/testbed/testbed_api_operations.h b/src/testbed/testbed_api_operations.h
index 8ab6944f2..1950cd7d1 100644
--- a/src/testbed/testbed_api_operations.h
+++ b/src/testbed/testbed_api_operations.h
@@ -89,14 +89,14 @@ GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue,
89 * waiting for the operation to become active. 89 * waiting for the operation to become active.
90 * 90 *
91 * @param queue queue to add the operation to 91 * @param queue queue to add the operation to
92 * @param operation operation to add to the queue 92 * @param op operation to add to the queue
93 * @param nres the number of units of the resources of queue needed by the 93 * @param nres the number of units of the resources of queue needed by the
94 * operation. Should be greater than 0. 94 * operation. Should be greater than 0.
95 */ 95 */
96void 96void
97GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue, 97GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue,
98 struct GNUNET_TESTBED_Operation 98 struct GNUNET_TESTBED_Operation *op,
99 *operation, unsigned int nres); 99 unsigned int nres);
100 100
101 101
102/** 102/**
@@ -106,12 +106,11 @@ GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue,
106 * waiting for the operation to become active. 106 * waiting for the operation to become active.
107 * 107 *
108 * @param queue queue to add the operation to 108 * @param queue queue to add the operation to
109 * @param operation operation to add to the queue 109 * @param op operation to add to the queue
110 */ 110 */
111void 111void
112GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue, 112GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue,
113 struct GNUNET_TESTBED_Operation 113 struct GNUNET_TESTBED_Operation *op);
114 *operation);
115 114
116 115
117/** 116/**
@@ -121,11 +120,10 @@ GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue,
121 * insertions to be made without having the first one instantly trigger the 120 * insertions to be made without having the first one instantly trigger the
122 * operation if the first queue has sufficient resources). 121 * operation if the first queue has sufficient resources).
123 * 122 *
124 * @param operation the operation to marks as waiting 123 * @param op the operation to marks as waiting
125 */ 124 */
126void 125void
127GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation 126GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op);
128 *operation);
129 127
130 128
131/** 129/**
@@ -135,12 +133,11 @@ GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation
135 * thus scheduling the operation is no longer required. 133 * thus scheduling the operation is no longer required.
136 * 134 *
137 * @param queue queue to add the operation to 135 * @param queue queue to add the operation to
138 * @param operation operation to add to the queue 136 * @param op operation to add to the queue
139 */ 137 */
140void 138void
141GNUNET_TESTBED_operation_queue_remove_ (struct OperationQueue *queue, 139GNUNET_TESTBED_operation_queue_remove_ (struct OperationQueue *queue,
142 struct GNUNET_TESTBED_Operation 140 struct GNUNET_TESTBED_Operation *op);
143 *operation);
144 141
145 142
146 143
@@ -187,10 +184,10 @@ GNUNET_TESTBED_operation_create_ (void *cls, OperationStart start,
187 * An operation is 'done' (was cancelled or finished); remove 184 * An operation is 'done' (was cancelled or finished); remove
188 * it from the queues and release associated resources. 185 * it from the queues and release associated resources.
189 * 186 *
190 * @param operation operation that finished 187 * @param op operation that finished
191 */ 188 */
192void 189void
193GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *operation); 190GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op);
194 191
195 192
196#endif 193#endif