aboutsummaryrefslogtreecommitdiff
path: root/src/testbed/testbed_api_operations.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/testbed/testbed_api_operations.c')
-rw-r--r--src/testbed/testbed_api_operations.c794
1 files changed, 397 insertions, 397 deletions
diff --git a/src/testbed/testbed_api_operations.c b/src/testbed/testbed_api_operations.c
index 167e1b833..194dc2655 100644
--- a/src/testbed/testbed_api_operations.c
+++ b/src/testbed/testbed_api_operations.c
@@ -11,7 +11,7 @@
11 WITHOUT ANY WARRANTY; without even the implied warranty of 11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details. 13 Affero General Public License for more details.
14 14
15 You should have received a copy of the GNU Affero General Public License 15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>. 16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 17
@@ -44,8 +44,7 @@
44/** 44/**
45 * An entry in the operation queue 45 * An entry in the operation queue
46 */ 46 */
47struct QueueEntry 47struct QueueEntry {
48{
49 /** 48 /**
50 * The next DLL pointer 49 * The next DLL pointer
51 */ 50 */
@@ -78,8 +77,7 @@ struct OperationQueue;
78/** 77/**
79 * A slot to record time taken by an operation 78 * A slot to record time taken by an operation
80 */ 79 */
81struct TimeSlot 80struct TimeSlot {
82{
83 /** 81 /**
84 * DLL next pointer 82 * DLL next pointer
85 */ 83 */
@@ -115,8 +113,7 @@ struct TimeSlot
115/** 113/**
116 * Context for operation queues of type OPERATION_QUEUE_TYPE_ADAPTIVE 114 * Context for operation queues of type OPERATION_QUEUE_TYPE_ADAPTIVE
117 */ 115 */
118struct FeedbackCtx 116struct FeedbackCtx {
119{
120 /** 117 /**
121 * Handle for calculating standard deviation 118 * Handle for calculating standard deviation
122 */ 119 */
@@ -159,8 +156,7 @@ struct FeedbackCtx
159 * Queue of operations where we can only support a certain 156 * Queue of operations where we can only support a certain
160 * number of concurrent operations of a particular type. 157 * number of concurrent operations of a particular type.
161 */ 158 */
162struct OperationQueue 159struct OperationQueue {
163{
164 /** 160 /**
165 * DLL head for the wait queue. Operations which are waiting for this 161 * DLL head for the wait queue. Operations which are waiting for this
166 * operation queue are put here 162 * operation queue are put here
@@ -247,8 +243,7 @@ struct OperationQueue
247/** 243/**
248 * Operation state 244 * Operation state
249 */ 245 */
250enum OperationState 246enum OperationState {
251{
252 /** 247 /**
253 * The operation is just created and is in initial state 248 * The operation is just created and is in initial state
254 */ 249 */
@@ -282,8 +277,7 @@ enum OperationState
282/** 277/**
283 * An entry in the ready queue (implemented as DLL) 278 * An entry in the ready queue (implemented as DLL)
284 */ 279 */
285struct ReadyQueueEntry 280struct ReadyQueueEntry {
286{
287 /** 281 /**
288 * next ptr for DLL 282 * next ptr for DLL
289 */ 283 */
@@ -304,8 +298,7 @@ struct ReadyQueueEntry
304/** 298/**
305 * Opaque handle to an abstract operation to be executed by the testing framework. 299 * Opaque handle to an abstract operation to be executed by the testing framework.
306 */ 300 */
307struct GNUNET_TESTBED_Operation 301struct GNUNET_TESTBED_Operation {
308{
309 /** 302 /**
310 * Function to call when we have the resources to begin the operation. 303 * Function to call when we have the resources to begin the operation.
311 */ 304 */
@@ -374,7 +367,6 @@ struct GNUNET_TESTBED_Operation
374 * Is this a failed operation? 367 * Is this a failed operation?
375 */ 368 */
376 int failed; 369 int failed;
377
378}; 370};
379 371
380/** 372/**
@@ -411,17 +403,17 @@ struct GNUNET_SCHEDULER_Task *process_rq_task_id;
411 * @return the timeslot 403 * @return the timeslot
412 */ 404 */
413static void 405static void
414assign_timeslot (struct GNUNET_TESTBED_Operation *op, 406assign_timeslot(struct GNUNET_TESTBED_Operation *op,
415 struct OperationQueue *queue) 407 struct OperationQueue *queue)
416{ 408{
417 struct FeedbackCtx *fctx = queue->fctx; 409 struct FeedbackCtx *fctx = queue->fctx;
418 struct TimeSlot *tslot; 410 struct TimeSlot *tslot;
419 411
420 GNUNET_assert (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type); 412 GNUNET_assert(OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type);
421 tslot = fctx->alloc_head; 413 tslot = fctx->alloc_head;
422 GNUNET_assert (NULL != tslot); 414 GNUNET_assert(NULL != tslot);
423 GNUNET_CONTAINER_DLL_remove (fctx->alloc_head, fctx->alloc_tail, tslot); 415 GNUNET_CONTAINER_DLL_remove(fctx->alloc_head, fctx->alloc_tail, tslot);
424 GNUNET_CONTAINER_DLL_insert_tail (op->tslots_head, op->tslots_tail, tslot); 416 GNUNET_CONTAINER_DLL_insert_tail(op->tslots_head, op->tslots_tail, tslot);
425 tslot->op = op; 417 tslot->op = op;
426} 418}
427 419
@@ -434,7 +426,7 @@ assign_timeslot (struct GNUNET_TESTBED_Operation *op,
434 * @param index the index of the entry in the operation's array of queue entries 426 * @param index the index of the entry in the operation's array of queue entries
435 */ 427 */
436static void 428static void
437remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index) 429remove_queue_entry(struct GNUNET_TESTBED_Operation *op, unsigned int index)
438{ 430{
439 struct OperationQueue *opq; 431 struct OperationQueue *opq;
440 struct QueueEntry *entry; 432 struct QueueEntry *entry;
@@ -442,23 +434,27 @@ remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index)
442 opq = op->queues[index]; 434 opq = op->queues[index];
443 entry = op->qentries[index]; 435 entry = op->qentries[index];
444 switch (op->state) 436 switch (op->state)
445 { 437 {
446 case OP_STATE_INIT: 438 case OP_STATE_INIT:
447 GNUNET_assert (0); 439 GNUNET_assert(0);
448 break; 440 break;
449 case OP_STATE_WAITING: 441
450 GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry); 442 case OP_STATE_WAITING:
451 break; 443 GNUNET_CONTAINER_DLL_remove(opq->wq_head, opq->wq_tail, entry);
452 case OP_STATE_READY: 444 break;
453 GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry); 445
454 break; 446 case OP_STATE_READY:
455 case OP_STATE_ACTIVE: 447 GNUNET_CONTAINER_DLL_remove(opq->rq_head, opq->rq_tail, entry);
456 GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry); 448 break;
457 break; 449
458 case OP_STATE_INACTIVE: 450 case OP_STATE_ACTIVE:
459 GNUNET_CONTAINER_DLL_remove (opq->nq_head, opq->nq_tail, entry); 451 GNUNET_CONTAINER_DLL_remove(opq->aq_head, opq->aq_tail, entry);
460 break; 452 break;
461 } 453
454 case OP_STATE_INACTIVE:
455 GNUNET_CONTAINER_DLL_remove(opq->nq_head, opq->nq_tail, entry);
456 break;
457 }
462} 458}
463 459
464 460
@@ -470,53 +466,57 @@ remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index)
470 * @param state the state the operation should have. It cannot be OP_STATE_INIT 466 * @param state the state the operation should have. It cannot be OP_STATE_INIT
471 */ 467 */
472static void 468static void
473change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state) 469change_state(struct GNUNET_TESTBED_Operation *op, enum OperationState state)
474{ 470{
475 struct QueueEntry *entry; 471 struct QueueEntry *entry;
476 struct OperationQueue *opq; 472 struct OperationQueue *opq;
477 unsigned int cnt; 473 unsigned int cnt;
478 unsigned int s; 474 unsigned int s;
479 475
480 GNUNET_assert (OP_STATE_INIT != state); 476 GNUNET_assert(OP_STATE_INIT != state);
481 GNUNET_assert (NULL != op->queues); 477 GNUNET_assert(NULL != op->queues);
482 GNUNET_assert (NULL != op->nres); 478 GNUNET_assert(NULL != op->nres);
483 GNUNET_assert ((OP_STATE_INIT == op->state) || (NULL != op->qentries)); 479 GNUNET_assert((OP_STATE_INIT == op->state) || (NULL != op->qentries));
484 GNUNET_assert (op->state != state); 480 GNUNET_assert(op->state != state);
485 for (cnt = 0; cnt < op->nqueues; cnt++) 481 for (cnt = 0; cnt < op->nqueues; cnt++)
486 {
487 if (OP_STATE_INIT == op->state)
488 {
489 entry = GNUNET_new (struct QueueEntry);
490 entry->op = op;
491 entry->nres = op->nres[cnt];
492 s = cnt;
493 GNUNET_array_append (op->qentries, s, entry);
494 }
495 else
496 { 482 {
497 entry = op->qentries[cnt]; 483 if (OP_STATE_INIT == op->state)
498 remove_queue_entry (op, cnt); 484 {
485 entry = GNUNET_new(struct QueueEntry);
486 entry->op = op;
487 entry->nres = op->nres[cnt];
488 s = cnt;
489 GNUNET_array_append(op->qentries, s, entry);
490 }
491 else
492 {
493 entry = op->qentries[cnt];
494 remove_queue_entry(op, cnt);
495 }
496 opq = op->queues[cnt];
497 switch (state)
498 {
499 case OP_STATE_INIT:
500 GNUNET_assert(0);
501 break;
502
503 case OP_STATE_WAITING:
504 GNUNET_CONTAINER_DLL_insert_tail(opq->wq_head, opq->wq_tail, entry);
505 break;
506
507 case OP_STATE_READY:
508 GNUNET_CONTAINER_DLL_insert_tail(opq->rq_head, opq->rq_tail, entry);
509 break;
510
511 case OP_STATE_ACTIVE:
512 GNUNET_CONTAINER_DLL_insert_tail(opq->aq_head, opq->aq_tail, entry);
513 break;
514
515 case OP_STATE_INACTIVE:
516 GNUNET_CONTAINER_DLL_insert_tail(opq->nq_head, opq->nq_tail, entry);
517 break;
518 }
499 } 519 }
500 opq = op->queues[cnt];
501 switch (state)
502 {
503 case OP_STATE_INIT:
504 GNUNET_assert (0);
505 break;
506 case OP_STATE_WAITING:
507 GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry);
508 break;
509 case OP_STATE_READY:
510 GNUNET_CONTAINER_DLL_insert_tail (opq->rq_head, opq->rq_tail, entry);
511 break;
512 case OP_STATE_ACTIVE:
513 GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry);
514 break;
515 case OP_STATE_INACTIVE:
516 GNUNET_CONTAINER_DLL_insert_tail (opq->nq_head, opq->nq_tail, entry);
517 break;
518 }
519 }
520 op->state = state; 520 op->state = state;
521} 521}
522 522
@@ -528,17 +528,17 @@ change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state)
528 * @param op the operation to be removed 528 * @param op the operation to be removed
529 */ 529 */
530static void 530static void
531rq_remove (struct GNUNET_TESTBED_Operation *op) 531rq_remove(struct GNUNET_TESTBED_Operation *op)
532{ 532{
533 GNUNET_assert (NULL != op->rq_entry); 533 GNUNET_assert(NULL != op->rq_entry);
534 GNUNET_CONTAINER_DLL_remove (rq_head, rq_tail, op->rq_entry); 534 GNUNET_CONTAINER_DLL_remove(rq_head, rq_tail, op->rq_entry);
535 GNUNET_free (op->rq_entry); 535 GNUNET_free(op->rq_entry);
536 op->rq_entry = NULL; 536 op->rq_entry = NULL;
537 if ( (NULL == rq_head) && (NULL != process_rq_task_id) ) 537 if ((NULL == rq_head) && (NULL != process_rq_task_id))
538 { 538 {
539 GNUNET_SCHEDULER_cancel (process_rq_task_id); 539 GNUNET_SCHEDULER_cancel(process_rq_task_id);
540 process_rq_task_id = NULL; 540 process_rq_task_id = NULL;
541 } 541 }
542} 542}
543 543
544 544
@@ -551,28 +551,28 @@ rq_remove (struct GNUNET_TESTBED_Operation *op)
551 * @param cls NULL 551 * @param cls NULL
552 */ 552 */
553static void 553static void
554process_rq_task (void *cls) 554process_rq_task(void *cls)
555{ 555{
556 struct GNUNET_TESTBED_Operation *op; 556 struct GNUNET_TESTBED_Operation *op;
557 struct OperationQueue *queue; 557 struct OperationQueue *queue;
558 unsigned int cnt; 558 unsigned int cnt;
559 559
560 process_rq_task_id = NULL; 560 process_rq_task_id = NULL;
561 GNUNET_assert (NULL != rq_head); 561 GNUNET_assert(NULL != rq_head);
562 GNUNET_assert (NULL != (op = rq_head->op)); 562 GNUNET_assert(NULL != (op = rq_head->op));
563 rq_remove (op); 563 rq_remove(op);
564 if (NULL != rq_head) 564 if (NULL != rq_head)
565 process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL); 565 process_rq_task_id = GNUNET_SCHEDULER_add_now(&process_rq_task, NULL);
566 change_state (op, OP_STATE_ACTIVE); 566 change_state(op, OP_STATE_ACTIVE);
567 for (cnt = 0; cnt < op->nqueues; cnt++) 567 for (cnt = 0; cnt < op->nqueues; cnt++)
568 { 568 {
569 queue = op->queues[cnt]; 569 queue = op->queues[cnt];
570 if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type) 570 if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
571 assign_timeslot (op, queue); 571 assign_timeslot(op, queue);
572 } 572 }
573 op->tstart = GNUNET_TIME_absolute_get (); 573 op->tstart = GNUNET_TIME_absolute_get();
574 if (NULL != op->start) 574 if (NULL != op->start)
575 op->start (op->cb_cls); 575 op->start(op->cb_cls);
576} 576}
577 577
578 578
@@ -582,17 +582,17 @@ process_rq_task (void *cls)
582 * @param op the operation to be queued 582 * @param op the operation to be queued
583 */ 583 */
584static void 584static void
585rq_add (struct GNUNET_TESTBED_Operation *op) 585rq_add(struct GNUNET_TESTBED_Operation *op)
586{ 586{
587 struct ReadyQueueEntry *rq_entry; 587 struct ReadyQueueEntry *rq_entry;
588 588
589 GNUNET_assert (NULL == op->rq_entry); 589 GNUNET_assert(NULL == op->rq_entry);
590 rq_entry = GNUNET_new (struct ReadyQueueEntry); 590 rq_entry = GNUNET_new(struct ReadyQueueEntry);
591 rq_entry->op = op; 591 rq_entry->op = op;
592 GNUNET_CONTAINER_DLL_insert_tail (rq_head, rq_tail, rq_entry); 592 GNUNET_CONTAINER_DLL_insert_tail(rq_head, rq_tail, rq_entry);
593 op->rq_entry = rq_entry; 593 op->rq_entry = rq_entry;
594 if (NULL == process_rq_task_id) 594 if (NULL == process_rq_task_id)
595 process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL); 595 process_rq_task_id = GNUNET_SCHEDULER_add_now(&process_rq_task, NULL);
596} 596}
597 597
598 598
@@ -604,12 +604,12 @@ rq_add (struct GNUNET_TESTBED_Operation *op)
604 * otherwise 604 * otherwise
605 */ 605 */
606static int 606static int
607is_queue_empty (struct OperationQueue *opq) 607is_queue_empty(struct OperationQueue *opq)
608{ 608{
609 if ( (NULL != opq->wq_head) 609 if ((NULL != opq->wq_head)
610 || (NULL != opq->rq_head) 610 || (NULL != opq->rq_head)
611 || (NULL != opq->aq_head) 611 || (NULL != opq->aq_head)
612 || (NULL != opq->nq_head) ) 612 || (NULL != opq->nq_head))
613 return GNUNET_NO; 613 return GNUNET_NO;
614 return GNUNET_YES; 614 return GNUNET_YES;
615} 615}
@@ -632,10 +632,10 @@ is_queue_empty (struct OperationQueue *opq)
632 * be set to NULL and 0 respectively. 632 * be set to NULL and 0 respectively.
633 */ 633 */
634static int 634static int
635decide_capacity (struct OperationQueue *opq, 635decide_capacity(struct OperationQueue *opq,
636 struct QueueEntry *entry, 636 struct QueueEntry *entry,
637 struct GNUNET_TESTBED_Operation ***ops_, 637 struct GNUNET_TESTBED_Operation ***ops_,
638 unsigned int *n_ops_) 638 unsigned int *n_ops_)
639{ 639{
640 struct QueueEntry **evict_entries; 640 struct QueueEntry **evict_entries;
641 struct GNUNET_TESTBED_Operation **ops; 641 struct GNUNET_TESTBED_Operation **ops;
@@ -647,53 +647,53 @@ decide_capacity (struct OperationQueue *opq,
647 int deficit; 647 int deficit;
648 int rval; 648 int rval;
649 649
650 GNUNET_assert (NULL != (op = entry->op)); 650 GNUNET_assert(NULL != (op = entry->op));
651 GNUNET_assert (0 < (need = entry->nres)); 651 GNUNET_assert(0 < (need = entry->nres));
652 ops = NULL; 652 ops = NULL;
653 n_ops = 0; 653 n_ops = 0;
654 evict_entries = NULL; 654 evict_entries = NULL;
655 n_evict_entries = 0; 655 n_evict_entries = 0;
656 rval = GNUNET_YES; 656 rval = GNUNET_YES;
657 if (OPERATION_QUEUE_TYPE_ADAPTIVE == opq->type) 657 if (OPERATION_QUEUE_TYPE_ADAPTIVE == opq->type)
658 { 658 {
659 GNUNET_assert (NULL != opq->fctx); 659 GNUNET_assert(NULL != opq->fctx);
660 GNUNET_assert (opq->max_active >= opq->overload); 660 GNUNET_assert(opq->max_active >= opq->overload);
661 max = opq->max_active - opq->overload; 661 max = opq->max_active - opq->overload;
662 } 662 }
663 else 663 else
664 max = opq->max_active; 664 max = opq->max_active;
665 if (opq->active > max) 665 if (opq->active > max)
666 { 666 {
667 rval = GNUNET_NO; 667 rval = GNUNET_NO;
668 goto ret; 668 goto ret;
669 } 669 }
670 if ((opq->active + need) <= max) 670 if ((opq->active + need) <= max)
671 goto ret; 671 goto ret;
672 deficit = need - (max - opq->active); 672 deficit = need - (max - opq->active);
673 for (entry = opq->nq_head; 673 for (entry = opq->nq_head;
674 (0 < deficit) && (NULL != entry); 674 (0 < deficit) && (NULL != entry);
675 entry = entry->next) 675 entry = entry->next)
676 { 676 {
677 GNUNET_array_append (evict_entries, n_evict_entries, entry); 677 GNUNET_array_append(evict_entries, n_evict_entries, entry);
678 deficit -= entry->nres; 678 deficit -= entry->nres;
679 } 679 }
680 if (0 < deficit) 680 if (0 < deficit)
681 { 681 {
682 rval = GNUNET_NO; 682 rval = GNUNET_NO;
683 goto ret; 683 goto ret;
684 } 684 }
685 for (n_ops = 0; n_ops < n_evict_entries;) 685 for (n_ops = 0; n_ops < n_evict_entries;)
686 { 686 {
687 op = evict_entries[n_ops]->op; 687 op = evict_entries[n_ops]->op;
688 GNUNET_array_append (ops, n_ops, op); /* increments n-ops */ 688 GNUNET_array_append(ops, n_ops, op); /* increments n-ops */
689 } 689 }
690 690
691 ret: 691ret:
692 GNUNET_free_non_null (evict_entries); 692 GNUNET_free_non_null(evict_entries);
693 if (NULL != ops_) 693 if (NULL != ops_)
694 *ops_ = ops; 694 *ops_ = ops;
695 else 695 else
696 GNUNET_free (ops); 696 GNUNET_free(ops);
697 if (NULL != n_ops_) 697 if (NULL != n_ops_)
698 *n_ops_ = n_ops; 698 *n_ops_ = n_ops;
699 return rval; 699 return rval;
@@ -710,30 +710,30 @@ decide_capacity (struct OperationQueue *opq,
710 * @param n_new the number of operations in new array 710 * @param n_new the number of operations in new array
711 */ 711 */
712static void 712static void
713merge_ops (struct GNUNET_TESTBED_Operation ***old, 713merge_ops(struct GNUNET_TESTBED_Operation ***old,
714 unsigned int *n_old, 714 unsigned int *n_old,
715 struct GNUNET_TESTBED_Operation **new, 715 struct GNUNET_TESTBED_Operation **new,
716 unsigned int n_new) 716 unsigned int n_new)
717{ 717{
718 struct GNUNET_TESTBED_Operation **cur; 718 struct GNUNET_TESTBED_Operation **cur;
719 unsigned int i; 719 unsigned int i;
720 unsigned int j; 720 unsigned int j;
721 unsigned int n_cur; 721 unsigned int n_cur;
722 722
723 GNUNET_assert (NULL != old); 723 GNUNET_assert(NULL != old);
724 n_cur = *n_old; 724 n_cur = *n_old;
725 cur = *old; 725 cur = *old;
726 for (i = 0; i < n_new; i++) 726 for (i = 0; i < n_new; i++)
727 {
728 for (j = 0; j < *n_old; j++)
729 { 727 {
730 if (new[i] == cur[j]) 728 for (j = 0; j < *n_old; j++)
731 break; 729 {
730 if (new[i] == cur[j])
731 break;
732 }
733 if (j < *n_old)
734 continue;
735 GNUNET_array_append(cur, n_cur, new[j]);
732 } 736 }
733 if (j < *n_old)
734 continue;
735 GNUNET_array_append (cur, n_cur, new[j]);
736 }
737 *old = cur; 737 *old = cur;
738 *n_old = n_cur; 738 *n_old = n_cur;
739} 739}
@@ -746,7 +746,7 @@ merge_ops (struct GNUNET_TESTBED_Operation ***old,
746 * @param op the operation 746 * @param op the operation
747 */ 747 */
748static int 748static int
749check_readiness (struct GNUNET_TESTBED_Operation *op) 749check_readiness(struct GNUNET_TESTBED_Operation *op)
750{ 750{
751 struct GNUNET_TESTBED_Operation **evict_ops; 751 struct GNUNET_TESTBED_Operation **evict_ops;
752 struct GNUNET_TESTBED_Operation **ops; 752 struct GNUNET_TESTBED_Operation **ops;
@@ -754,39 +754,39 @@ check_readiness (struct GNUNET_TESTBED_Operation *op)
754 unsigned int n_evict_ops; 754 unsigned int n_evict_ops;
755 unsigned int i; 755 unsigned int i;
756 756
757 GNUNET_assert (NULL == op->rq_entry); 757 GNUNET_assert(NULL == op->rq_entry);
758 GNUNET_assert (OP_STATE_WAITING == op->state); 758 GNUNET_assert(OP_STATE_WAITING == op->state);
759 evict_ops = NULL; 759 evict_ops = NULL;
760 n_evict_ops = 0; 760 n_evict_ops = 0;
761 for (i = 0; i < op->nqueues; i++) 761 for (i = 0; i < op->nqueues; i++)
762 {
763 ops = NULL;
764 n_ops = 0;
765 if (GNUNET_NO == decide_capacity (op->queues[i], op->qentries[i],
766 &ops, &n_ops))
767 { 762 {
768 GNUNET_free_non_null (evict_ops); 763 ops = NULL;
769 return GNUNET_NO; 764 n_ops = 0;
765 if (GNUNET_NO == decide_capacity(op->queues[i], op->qentries[i],
766 &ops, &n_ops))
767 {
768 GNUNET_free_non_null(evict_ops);
769 return GNUNET_NO;
770 }
771 if (NULL == ops)
772 continue;
773 merge_ops(&evict_ops, &n_evict_ops, ops, n_ops);
774 GNUNET_free(ops);
770 } 775 }
771 if (NULL == ops)
772 continue;
773 merge_ops (&evict_ops, &n_evict_ops, ops, n_ops);
774 GNUNET_free (ops);
775 }
776 if (NULL != evict_ops) 776 if (NULL != evict_ops)
777 { 777 {
778 for (i = 0; i < n_evict_ops; i++) 778 for (i = 0; i < n_evict_ops; i++)
779 GNUNET_TESTBED_operation_release_ (evict_ops[i]); 779 GNUNET_TESTBED_operation_release_(evict_ops[i]);
780 GNUNET_free (evict_ops); 780 GNUNET_free(evict_ops);
781 evict_ops = NULL; 781 evict_ops = NULL;
782 /* Evicting the operations should schedule this operation */ 782 /* Evicting the operations should schedule this operation */
783 GNUNET_assert (OP_STATE_READY == op->state); 783 GNUNET_assert(OP_STATE_READY == op->state);
784 return GNUNET_YES; 784 return GNUNET_YES;
785 } 785 }
786 for (i = 0; i < op->nqueues; i++) 786 for (i = 0; i < op->nqueues; i++)
787 op->queues[i]->active += op->nres[i]; 787 op->queues[i]->active += op->nres[i];
788 change_state (op, OP_STATE_READY); 788 change_state(op, OP_STATE_READY);
789 rq_add (op); 789 rq_add(op);
790 return GNUNET_YES; 790 return GNUNET_YES;
791} 791}
792 792
@@ -797,18 +797,18 @@ check_readiness (struct GNUNET_TESTBED_Operation *op)
797 * @param op the operation to defer 797 * @param op the operation to defer
798 */ 798 */
799static void 799static void
800defer (struct GNUNET_TESTBED_Operation *op) 800defer(struct GNUNET_TESTBED_Operation *op)
801{ 801{
802 unsigned int i; 802 unsigned int i;
803 803
804 GNUNET_assert (OP_STATE_READY == op->state); 804 GNUNET_assert(OP_STATE_READY == op->state);
805 rq_remove (op); 805 rq_remove(op);
806 for (i = 0; i < op->nqueues; i++) 806 for (i = 0; i < op->nqueues; i++)
807 { 807 {
808 GNUNET_assert (op->queues[i]->active >= op->nres[i]); 808 GNUNET_assert(op->queues[i]->active >= op->nres[i]);
809 op->queues[i]->active -= op->nres[i]; 809 op->queues[i]->active -= op->nres[i];
810 } 810 }
811 change_state (op, OP_STATE_WAITING); 811 change_state(op, OP_STATE_WAITING);
812} 812}
813 813
814 814
@@ -820,23 +820,23 @@ defer (struct GNUNET_TESTBED_Operation *op)
820 * @param queue the operation queue 820 * @param queue the operation queue
821 */ 821 */
822static void 822static void
823cleanup_tslots (struct OperationQueue *queue) 823cleanup_tslots(struct OperationQueue *queue)
824{ 824{
825 struct FeedbackCtx *fctx = queue->fctx; 825 struct FeedbackCtx *fctx = queue->fctx;
826 struct TimeSlot *tslot; 826 struct TimeSlot *tslot;
827 struct GNUNET_TESTBED_Operation *op; 827 struct GNUNET_TESTBED_Operation *op;
828 unsigned int cnt; 828 unsigned int cnt;
829 829
830 GNUNET_assert (NULL != fctx); 830 GNUNET_assert(NULL != fctx);
831 for (cnt = 0; cnt < queue->max_active; cnt++) 831 for (cnt = 0; cnt < queue->max_active; cnt++)
832 { 832 {
833 tslot = &fctx->tslots_freeptr[cnt]; 833 tslot = &fctx->tslots_freeptr[cnt];
834 op = tslot->op; 834 op = tslot->op;
835 if (NULL == op) 835 if (NULL == op)
836 continue; 836 continue;
837 GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot); 837 GNUNET_CONTAINER_DLL_remove(op->tslots_head, op->tslots_tail, tslot);
838 } 838 }
839 GNUNET_free_non_null (fctx->tslots_freeptr); 839 GNUNET_free_non_null(fctx->tslots_freeptr);
840 fctx->tslots_freeptr = NULL; 840 fctx->tslots_freeptr = NULL;
841 fctx->alloc_head = NULL; 841 fctx->alloc_head = NULL;
842 fctx->alloc_tail = NULL; 842 fctx->alloc_tail = NULL;
@@ -854,23 +854,23 @@ cleanup_tslots (struct OperationQueue *queue)
854 * will be selected as n 854 * will be selected as n
855 */ 855 */
856static void 856static void
857adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n) 857adaptive_queue_set_max_active(struct OperationQueue *queue, unsigned int n)
858{ 858{
859 struct FeedbackCtx *fctx = queue->fctx; 859 struct FeedbackCtx *fctx = queue->fctx;
860 struct TimeSlot *tslot; 860 struct TimeSlot *tslot;
861 unsigned int cnt; 861 unsigned int cnt;
862 862
863 cleanup_tslots (queue); 863 cleanup_tslots(queue);
864 n = GNUNET_MIN (n ,fctx->max_active_bound); 864 n = GNUNET_MIN(n, fctx->max_active_bound);
865 fctx->tslots_freeptr = GNUNET_malloc (n * sizeof (struct TimeSlot)); 865 fctx->tslots_freeptr = GNUNET_malloc(n * sizeof(struct TimeSlot));
866 fctx->nfailed = 0; 866 fctx->nfailed = 0;
867 for (cnt = 0; cnt < n; cnt++) 867 for (cnt = 0; cnt < n; cnt++)
868 { 868 {
869 tslot = &fctx->tslots_freeptr[cnt]; 869 tslot = &fctx->tslots_freeptr[cnt];
870 tslot->queue = queue; 870 tslot->queue = queue;
871 GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail, tslot); 871 GNUNET_CONTAINER_DLL_insert_tail(fctx->alloc_head, fctx->alloc_tail, tslot);
872 } 872 }
873 GNUNET_TESTBED_operation_queue_reset_max_active_ (queue, n); 873 GNUNET_TESTBED_operation_queue_reset_max_active_(queue, n);
874} 874}
875 875
876 876
@@ -881,7 +881,7 @@ adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n)
881 * @param queue the queue 881 * @param queue the queue
882 */ 882 */
883static void 883static void
884adapt_parallelism (struct OperationQueue *queue) 884adapt_parallelism(struct OperationQueue *queue)
885{ 885{
886 struct GNUNET_TIME_Relative avg; 886 struct GNUNET_TIME_Relative avg;
887 struct FeedbackCtx *fctx; 887 struct FeedbackCtx *fctx;
@@ -895,32 +895,32 @@ adapt_parallelism (struct OperationQueue *queue)
895 nvals = 0; 895 nvals = 0;
896 fctx = queue->fctx; 896 fctx = queue->fctx;
897 for (cnt = 0; cnt < queue->max_active; cnt++) 897 for (cnt = 0; cnt < queue->max_active; cnt++)
898 { 898 {
899 tslot = &fctx->tslots_freeptr[cnt]; 899 tslot = &fctx->tslots_freeptr[cnt];
900 avg = GNUNET_TIME_relative_add (avg, tslot->tsum); 900 avg = GNUNET_TIME_relative_add(avg, tslot->tsum);
901 nvals += tslot->nvals; 901 nvals += tslot->nvals;
902 } 902 }
903 GNUNET_assert (nvals >= queue->max_active); 903 GNUNET_assert(nvals >= queue->max_active);
904 GNUNET_assert (fctx->nfailed <= nvals); 904 GNUNET_assert(fctx->nfailed <= nvals);
905 nvals -= fctx->nfailed; 905 nvals -= fctx->nfailed;
906 if (0 == nvals) 906 if (0 == nvals)
907 { 907 {
908 if (1 == queue->max_active) 908 if (1 == queue->max_active)
909 adaptive_queue_set_max_active (queue, 1); 909 adaptive_queue_set_max_active(queue, 1);
910 else 910 else
911 adaptive_queue_set_max_active (queue, queue->max_active / 2); 911 adaptive_queue_set_max_active(queue, queue->max_active / 2);
912 return; 912 return;
913 } 913 }
914 avg = GNUNET_TIME_relative_divide (avg, nvals); 914 avg = GNUNET_TIME_relative_divide(avg, nvals);
915 GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us); 915 GNUNET_TESTBED_SD_add_data_(fctx->sd, (unsigned int)avg.rel_value_us);
916 if (GNUNET_SYSERR == 916 if (GNUNET_SYSERR ==
917 GNUNET_TESTBED_SD_deviation_factor_ (fctx->sd, 917 GNUNET_TESTBED_SD_deviation_factor_(fctx->sd,
918 (unsigned int) avg.rel_value_us, 918 (unsigned int)avg.rel_value_us,
919 &sd)) 919 &sd))
920 { 920 {
921 adaptive_queue_set_max_active (queue, queue->max_active); /* no change */ 921 adaptive_queue_set_max_active(queue, queue->max_active); /* no change */
922 return; 922 return;
923 } 923 }
924 924
925 parallelism = 0; 925 parallelism = 0;
926 if (-1 == sd) 926 if (-1 == sd)
@@ -931,36 +931,36 @@ adapt_parallelism (struct OperationQueue *queue)
931 parallelism = queue->max_active - 1; 931 parallelism = queue->max_active - 1;
932 if (2 <= sd) 932 if (2 <= sd)
933 parallelism = queue->max_active / 2; 933 parallelism = queue->max_active / 2;
934 parallelism = GNUNET_MAX (parallelism, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE); 934 parallelism = GNUNET_MAX(parallelism, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE);
935 adaptive_queue_set_max_active (queue, parallelism); 935 adaptive_queue_set_max_active(queue, parallelism);
936 936
937#if 0 937#if 0
938 /* old algorithm */ 938 /* old algorithm */
939 if (sd < 0) 939 if (sd < 0)
940 sd = 0; 940 sd = 0;
941 GNUNET_assert (0 <= sd); 941 GNUNET_assert(0 <= sd);
942 //GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us); 942 //GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
943 if (0 == sd) 943 if (0 == sd)
944 { 944 {
945 adaptive_queue_set_max_active (queue, queue->max_active * 2); 945 adaptive_queue_set_max_active(queue, queue->max_active * 2);
946 return; 946 return;
947 } 947 }
948 if (1 == sd) 948 if (1 == sd)
949 { 949 {
950 adaptive_queue_set_max_active (queue, queue->max_active + 1); 950 adaptive_queue_set_max_active(queue, queue->max_active + 1);
951 return; 951 return;
952 } 952 }
953 if (1 == queue->max_active) 953 if (1 == queue->max_active)
954 { 954 {
955 adaptive_queue_set_max_active (queue, 1); 955 adaptive_queue_set_max_active(queue, 1);
956 return; 956 return;
957 } 957 }
958 if (2 == sd) 958 if (2 == sd)
959 { 959 {
960 adaptive_queue_set_max_active (queue, queue->max_active - 1); 960 adaptive_queue_set_max_active(queue, queue->max_active - 1);
961 return; 961 return;
962 } 962 }
963 adaptive_queue_set_max_active (queue, queue->max_active / 2); 963 adaptive_queue_set_max_active(queue, queue->max_active / 2);
964#endif 964#endif
965} 965}
966 966
@@ -973,7 +973,7 @@ adapt_parallelism (struct OperationQueue *queue)
973 * @param op the operation 973 * @param op the operation
974 */ 974 */
975static void 975static void
976update_tslots (struct GNUNET_TESTBED_Operation *op) 976update_tslots(struct GNUNET_TESTBED_Operation *op)
977{ 977{
978 struct OperationQueue *queue; 978 struct OperationQueue *queue;
979 struct GNUNET_TIME_Relative t; 979 struct GNUNET_TIME_Relative t;
@@ -981,31 +981,31 @@ update_tslots (struct GNUNET_TESTBED_Operation *op)
981 struct FeedbackCtx *fctx; 981 struct FeedbackCtx *fctx;
982 unsigned int i; 982 unsigned int i;
983 983
984 t = GNUNET_TIME_absolute_get_duration (op->tstart); 984 t = GNUNET_TIME_absolute_get_duration(op->tstart);
985 while (NULL != (tslot = op->tslots_head)) /* update time slots */ 985 while (NULL != (tslot = op->tslots_head)) /* update time slots */
986 {
987 queue = tslot->queue;
988 fctx = queue->fctx;
989 GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
990 tslot->op = NULL;
991 GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
992 tslot);
993 if (op->failed)
994 { 986 {
995 fctx->nfailed++; 987 queue = tslot->queue;
996 for (i = 0; i < op->nqueues; i++) 988 fctx = queue->fctx;
997 if (queue == op->queues[i]) 989 GNUNET_CONTAINER_DLL_remove(op->tslots_head, op->tslots_tail, tslot);
998 break; 990 tslot->op = NULL;
999 GNUNET_assert (i != op->nqueues); 991 GNUNET_CONTAINER_DLL_insert_tail(fctx->alloc_head, fctx->alloc_tail,
1000 op->queues[i]->overload += op->nres[i]; 992 tslot);
993 if (op->failed)
994 {
995 fctx->nfailed++;
996 for (i = 0; i < op->nqueues; i++)
997 if (queue == op->queues[i])
998 break;
999 GNUNET_assert(i != op->nqueues);
1000 op->queues[i]->overload += op->nres[i];
1001 }
1002 tslot->tsum = GNUNET_TIME_relative_add(tslot->tsum, t);
1003 if (0 != tslot->nvals++)
1004 continue;
1005 fctx->tslots_filled++;
1006 if (queue->max_active == fctx->tslots_filled)
1007 adapt_parallelism(queue);
1001 } 1008 }
1002 tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t);
1003 if (0 != tslot->nvals++)
1004 continue;
1005 fctx->tslots_filled++;
1006 if (queue->max_active == fctx->tslots_filled)
1007 adapt_parallelism (queue);
1008 }
1009} 1009}
1010 1010
1011 1011
@@ -1018,12 +1018,12 @@ update_tslots (struct GNUNET_TESTBED_Operation *op)
1018 * @return handle to the operation 1018 * @return handle to the operation
1019 */ 1019 */
1020struct GNUNET_TESTBED_Operation * 1020struct GNUNET_TESTBED_Operation *
1021GNUNET_TESTBED_operation_create_ (void *cls, OperationStart start, 1021GNUNET_TESTBED_operation_create_(void *cls, OperationStart start,
1022 OperationRelease release) 1022 OperationRelease release)
1023{ 1023{
1024 struct GNUNET_TESTBED_Operation *op; 1024 struct GNUNET_TESTBED_Operation *op;
1025 1025
1026 op = GNUNET_new (struct GNUNET_TESTBED_Operation); 1026 op = GNUNET_new(struct GNUNET_TESTBED_Operation);
1027 op->start = start; 1027 op->start = start;
1028 op->state = OP_STATE_INIT; 1028 op->state = OP_STATE_INIT;
1029 op->release = release; 1029 op->release = release;
@@ -1041,26 +1041,26 @@ GNUNET_TESTBED_operation_create_ (void *cls, OperationStart start,
1041 * @return handle to the queue 1041 * @return handle to the queue
1042 */ 1042 */
1043struct OperationQueue * 1043struct OperationQueue *
1044GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type, 1044GNUNET_TESTBED_operation_queue_create_(enum OperationQueueType type,
1045 unsigned int max_active) 1045 unsigned int max_active)
1046{ 1046{
1047 struct OperationQueue *queue; 1047 struct OperationQueue *queue;
1048 struct FeedbackCtx *fctx; 1048 struct FeedbackCtx *fctx;
1049 1049
1050 queue = GNUNET_new (struct OperationQueue); 1050 queue = GNUNET_new(struct OperationQueue);
1051 queue->type = type; 1051 queue->type = type;
1052 if (OPERATION_QUEUE_TYPE_FIXED == type) 1052 if (OPERATION_QUEUE_TYPE_FIXED == type)
1053 { 1053 {
1054 queue->max_active = max_active; 1054 queue->max_active = max_active;
1055 } 1055 }
1056 else 1056 else
1057 { 1057 {
1058 fctx = GNUNET_new (struct FeedbackCtx); 1058 fctx = GNUNET_new(struct FeedbackCtx);
1059 fctx->max_active_bound = max_active; 1059 fctx->max_active_bound = max_active;
1060 fctx->sd = GNUNET_TESTBED_SD_init_ (ADAPTIVE_QUEUE_DEFAULT_HISTORY); 1060 fctx->sd = GNUNET_TESTBED_SD_init_(ADAPTIVE_QUEUE_DEFAULT_HISTORY);
1061 queue->fctx = fctx; 1061 queue->fctx = fctx;
1062 adaptive_queue_set_max_active (queue, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE); 1062 adaptive_queue_set_max_active(queue, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE);
1063 } 1063 }
1064 return queue; 1064 return queue;
1065} 1065}
1066 1066
@@ -1071,18 +1071,18 @@ GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type,
1071 * @param queue the operation queue to destroy 1071 * @param queue the operation queue to destroy
1072 */ 1072 */
1073static void 1073static void
1074queue_destroy (struct OperationQueue *queue) 1074queue_destroy(struct OperationQueue *queue)
1075{ 1075{
1076 struct FeedbackCtx *fctx; 1076 struct FeedbackCtx *fctx;
1077 1077
1078 if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type) 1078 if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
1079 { 1079 {
1080 cleanup_tslots (queue); 1080 cleanup_tslots(queue);
1081 fctx = queue->fctx; 1081 fctx = queue->fctx;
1082 GNUNET_TESTBED_SD_destroy_ (fctx->sd); 1082 GNUNET_TESTBED_SD_destroy_(fctx->sd);
1083 GNUNET_free (fctx); 1083 GNUNET_free(fctx);
1084 } 1084 }
1085 GNUNET_free (queue); 1085 GNUNET_free(queue);
1086} 1086}
1087 1087
1088 1088
@@ -1094,16 +1094,16 @@ queue_destroy (struct OperationQueue *queue)
1094 * @param queue queue to destroy 1094 * @param queue queue to destroy
1095 */ 1095 */
1096void 1096void
1097GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue) 1097GNUNET_TESTBED_operation_queue_destroy_(struct OperationQueue *queue)
1098{ 1098{
1099 if (GNUNET_YES != is_queue_empty (queue)) 1099 if (GNUNET_YES != is_queue_empty(queue))
1100 { 1100 {
1101 GNUNET_assert (0 == queue->expired); /* Are you calling twice on same queue? */ 1101 GNUNET_assert(0 == queue->expired); /* Are you calling twice on same queue? */
1102 queue->expired = 1; 1102 queue->expired = 1;
1103 GNUNET_array_append (expired_opqs, n_expired_opqs, queue); 1103 GNUNET_array_append(expired_opqs, n_expired_opqs, queue);
1104 return; 1104 return;
1105 } 1105 }
1106 queue_destroy (queue); 1106 queue_destroy(queue);
1107} 1107}
1108 1108
1109 1109
@@ -1115,11 +1115,11 @@ GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
1115 * is not empty) 1115 * is not empty)
1116 */ 1116 */
1117int 1117int
1118GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue) 1118GNUNET_TESTBED_operation_queue_destroy_empty_(struct OperationQueue *queue)
1119{ 1119{
1120 if (GNUNET_NO == is_queue_empty (queue)) 1120 if (GNUNET_NO == is_queue_empty(queue))
1121 return GNUNET_NO; 1121 return GNUNET_NO;
1122 GNUNET_TESTBED_operation_queue_destroy_ (queue); 1122 GNUNET_TESTBED_operation_queue_destroy_(queue);
1123 return GNUNET_YES; 1123 return GNUNET_YES;
1124} 1124}
1125 1125
@@ -1131,19 +1131,19 @@ GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue)
1131 * @param opq the operation queue 1131 * @param opq the operation queue
1132 */ 1132 */
1133static void 1133static void
1134recheck_waiting (struct OperationQueue *opq) 1134recheck_waiting(struct OperationQueue *opq)
1135{ 1135{
1136 struct QueueEntry *entry; 1136 struct QueueEntry *entry;
1137 struct QueueEntry *entry2; 1137 struct QueueEntry *entry2;
1138 1138
1139 entry = opq->wq_head; 1139 entry = opq->wq_head;
1140 while (NULL != entry) 1140 while (NULL != entry)
1141 { 1141 {
1142 entry2 = entry->next; 1142 entry2 = entry->next;
1143 if (GNUNET_NO == check_readiness (entry->op)) 1143 if (GNUNET_NO == check_readiness(entry->op))
1144 break; 1144 break;
1145 entry = entry2; 1145 entry = entry2;
1146 } 1146 }
1147} 1147}
1148 1148
1149 1149
@@ -1156,17 +1156,17 @@ recheck_waiting (struct OperationQueue *opq)
1156 * @param max_active the new maximum number of active operations 1156 * @param max_active the new maximum number of active operations
1157 */ 1157 */
1158void 1158void
1159GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue, 1159GNUNET_TESTBED_operation_queue_reset_max_active_(struct OperationQueue *queue,
1160 unsigned int max_active) 1160 unsigned int max_active)
1161{ 1161{
1162 struct QueueEntry *entry; 1162 struct QueueEntry *entry;
1163 1163
1164 queue->max_active = max_active; 1164 queue->max_active = max_active;
1165 queue->overload = 0; 1165 queue->overload = 0;
1166 while ( (queue->active > queue->max_active) 1166 while ((queue->active > queue->max_active)
1167 && (NULL != (entry = queue->rq_head)) ) 1167 && (NULL != (entry = queue->rq_head)))
1168 defer (entry->op); 1168 defer(entry->op);
1169 recheck_waiting (queue); 1169 recheck_waiting(queue);
1170} 1170}
1171 1171
1172 1172
@@ -1182,17 +1182,17 @@ GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue,
1182 * operation. Should be greater than 0. 1182 * operation. Should be greater than 0.
1183 */ 1183 */
1184void 1184void
1185GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue, 1185GNUNET_TESTBED_operation_queue_insert2_(struct OperationQueue *queue,
1186 struct GNUNET_TESTBED_Operation *op, 1186 struct GNUNET_TESTBED_Operation *op,
1187 unsigned int nres) 1187 unsigned int nres)
1188{ 1188{
1189 unsigned int qsize; 1189 unsigned int qsize;
1190 1190
1191 GNUNET_assert (0 < nres); 1191 GNUNET_assert(0 < nres);
1192 qsize = op->nqueues; 1192 qsize = op->nqueues;
1193 GNUNET_array_append (op->queues, op->nqueues, queue); 1193 GNUNET_array_append(op->queues, op->nqueues, queue);
1194 GNUNET_array_append (op->nres, qsize, nres); 1194 GNUNET_array_append(op->nres, qsize, nres);
1195 GNUNET_assert (qsize == op->nqueues); 1195 GNUNET_assert(qsize == op->nqueues);
1196} 1196}
1197 1197
1198 1198
@@ -1208,10 +1208,10 @@ GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue,
1208 * @param op operation to add to the queue 1208 * @param op operation to add to the queue
1209 */ 1209 */
1210void 1210void
1211GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue, 1211GNUNET_TESTBED_operation_queue_insert_(struct OperationQueue *queue,
1212 struct GNUNET_TESTBED_Operation *op) 1212 struct GNUNET_TESTBED_Operation *op)
1213{ 1213{
1214 return GNUNET_TESTBED_operation_queue_insert2_ (queue, op, 1); 1214 return GNUNET_TESTBED_operation_queue_insert2_(queue, op, 1);
1215} 1215}
1216 1216
1217 1217
@@ -1225,11 +1225,11 @@ GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue,
1225 * @param op the operation to marks as waiting 1225 * @param op the operation to marks as waiting
1226 */ 1226 */
1227void 1227void
1228GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op) 1228GNUNET_TESTBED_operation_begin_wait_(struct GNUNET_TESTBED_Operation *op)
1229{ 1229{
1230 GNUNET_assert (NULL == op->rq_entry); 1230 GNUNET_assert(NULL == op->rq_entry);
1231 change_state (op, OP_STATE_WAITING); 1231 change_state(op, OP_STATE_WAITING);
1232 (void) check_readiness (op); 1232 (void)check_readiness(op);
1233} 1233}
1234 1234
1235 1235
@@ -1243,24 +1243,24 @@ GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op)
1243 * it as inactive. 1243 * it as inactive.
1244 */ 1244 */
1245void 1245void
1246GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op) 1246GNUNET_TESTBED_operation_inactivate_(struct GNUNET_TESTBED_Operation *op)
1247{ 1247{
1248 struct OperationQueue **queues; 1248 struct OperationQueue **queues;
1249 size_t ms; 1249 size_t ms;
1250 unsigned int nqueues; 1250 unsigned int nqueues;
1251 unsigned int i; 1251 unsigned int i;
1252 1252
1253 GNUNET_assert (OP_STATE_ACTIVE == op->state); 1253 GNUNET_assert(OP_STATE_ACTIVE == op->state);
1254 change_state (op, OP_STATE_INACTIVE); 1254 change_state(op, OP_STATE_INACTIVE);
1255 nqueues = op->nqueues; 1255 nqueues = op->nqueues;
1256 ms = sizeof (struct OperationQueue *) * nqueues; 1256 ms = sizeof(struct OperationQueue *) * nqueues;
1257 queues = GNUNET_malloc (ms); 1257 queues = GNUNET_malloc(ms);
1258 /* Cloning is needed as the operation be released by waiting operations and 1258 /* Cloning is needed as the operation be released by waiting operations and
1259 hence its nqueues memory ptr will be freed */ 1259 hence its nqueues memory ptr will be freed */
1260 GNUNET_memcpy (queues, op->queues, ms); 1260 GNUNET_memcpy(queues, op->queues, ms);
1261 for (i = 0; i < nqueues; i++) 1261 for (i = 0; i < nqueues; i++)
1262 recheck_waiting (queues[i]); 1262 recheck_waiting(queues[i]);
1263 GNUNET_free (queues); 1263 GNUNET_free(queues);
1264} 1264}
1265 1265
1266 1266
@@ -1272,11 +1272,10 @@ GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op)
1272 * @param op the operation to be marked as active 1272 * @param op the operation to be marked as active
1273 */ 1273 */
1274void 1274void
1275GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op) 1275GNUNET_TESTBED_operation_activate_(struct GNUNET_TESTBED_Operation *op)
1276{ 1276{
1277 1277 GNUNET_assert(OP_STATE_INACTIVE == op->state);
1278 GNUNET_assert (OP_STATE_INACTIVE == op->state); 1278 change_state(op, OP_STATE_ACTIVE);
1279 change_state (op, OP_STATE_ACTIVE);
1280} 1279}
1281 1280
1282 1281
@@ -1287,54 +1286,56 @@ GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op)
1287 * @param op operation that finished 1286 * @param op operation that finished
1288 */ 1287 */
1289void 1288void
1290GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op) 1289GNUNET_TESTBED_operation_release_(struct GNUNET_TESTBED_Operation *op)
1291{ 1290{
1292 struct QueueEntry *entry; 1291 struct QueueEntry *entry;
1293 struct OperationQueue *opq; 1292 struct OperationQueue *opq;
1294 unsigned int i; 1293 unsigned int i;
1295 1294
1296 if (OP_STATE_INIT == op->state) 1295 if (OP_STATE_INIT == op->state)
1297 { 1296 {
1298 GNUNET_free (op); 1297 GNUNET_free(op);
1299 return; 1298 return;
1300 } 1299 }
1301 if (OP_STATE_READY == op->state) 1300 if (OP_STATE_READY == op->state)
1302 rq_remove (op); 1301 rq_remove(op);
1303 if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */ 1302 if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */
1304 GNUNET_TESTBED_operation_activate_ (op); 1303 GNUNET_TESTBED_operation_activate_(op);
1305 if (OP_STATE_ACTIVE == op->state) 1304 if (OP_STATE_ACTIVE == op->state)
1306 update_tslots (op); 1305 update_tslots(op);
1307 GNUNET_assert (NULL != op->queues); 1306 GNUNET_assert(NULL != op->queues);
1308 GNUNET_assert (NULL != op->qentries); 1307 GNUNET_assert(NULL != op->qentries);
1309 for (i = 0; i < op->nqueues; i++) 1308 for (i = 0; i < op->nqueues; i++)
1310 {
1311 entry = op->qentries[i];
1312 remove_queue_entry (op, i);
1313 opq = op->queues[i];
1314 switch (op->state)
1315 { 1309 {
1316 case OP_STATE_INIT: 1310 entry = op->qentries[i];
1317 case OP_STATE_INACTIVE: 1311 remove_queue_entry(op, i);
1318 GNUNET_assert (0); 1312 opq = op->queues[i];
1319 break; 1313 switch (op->state)
1320 case OP_STATE_WAITING: 1314 {
1321 break; 1315 case OP_STATE_INIT:
1322 case OP_STATE_ACTIVE: 1316 case OP_STATE_INACTIVE:
1323 case OP_STATE_READY: 1317 GNUNET_assert(0);
1324 GNUNET_assert (0 != opq->active); 1318 break;
1325 GNUNET_assert (opq->active >= entry->nres); 1319
1326 opq->active -= entry->nres; 1320 case OP_STATE_WAITING:
1327 recheck_waiting (opq); 1321 break;
1328 break; 1322
1323 case OP_STATE_ACTIVE:
1324 case OP_STATE_READY:
1325 GNUNET_assert(0 != opq->active);
1326 GNUNET_assert(opq->active >= entry->nres);
1327 opq->active -= entry->nres;
1328 recheck_waiting(opq);
1329 break;
1330 }
1331 GNUNET_free(entry);
1329 } 1332 }
1330 GNUNET_free (entry); 1333 GNUNET_free_non_null(op->qentries);
1331 } 1334 GNUNET_free(op->queues);
1332 GNUNET_free_non_null (op->qentries); 1335 GNUNET_free(op->nres);
1333 GNUNET_free (op->queues);
1334 GNUNET_free (op->nres);
1335 if (NULL != op->release) 1336 if (NULL != op->release)
1336 op->release (op->cb_cls); 1337 op->release(op->cb_cls);
1337 GNUNET_free (op); 1338 GNUNET_free(op);
1338} 1339}
1339 1340
1340 1341
@@ -1344,7 +1345,7 @@ GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
1344 * @param op the operation to be marked as failed 1345 * @param op the operation to be marked as failed
1345 */ 1346 */
1346void 1347void
1347GNUNET_TESTBED_operation_mark_failed (struct GNUNET_TESTBED_Operation *op) 1348GNUNET_TESTBED_operation_mark_failed(struct GNUNET_TESTBED_Operation *op)
1348{ 1349{
1349 op->failed = GNUNET_YES; 1350 op->failed = GNUNET_YES;
1350} 1351}
@@ -1355,24 +1356,23 @@ GNUNET_TESTBED_operation_mark_failed (struct GNUNET_TESTBED_Operation *op)
1355 * operations which are not completed and warn about them. 1356 * operations which are not completed and warn about them.
1356 */ 1357 */
1357void __attribute__ ((destructor)) 1358void __attribute__ ((destructor))
1358GNUNET_TESTBED_operations_fini () 1359GNUNET_TESTBED_operations_fini()
1359{ 1360{
1360 struct OperationQueue *queue; 1361 struct OperationQueue *queue;
1361 unsigned int i; 1362 unsigned int i;
1362 int warn = 0; 1363 int warn = 0;
1363 1364
1364 for (i=0; i < n_expired_opqs; i++) 1365 for (i = 0; i < n_expired_opqs; i++)
1365 { 1366 {
1366 queue = expired_opqs[i]; 1367 queue = expired_opqs[i];
1367 if (GNUNET_NO == is_queue_empty (queue)) 1368 if (GNUNET_NO == is_queue_empty(queue))
1368 warn = 1; 1369 warn = 1;
1369 queue_destroy (queue); 1370 queue_destroy(queue);
1370 } 1371 }
1371 GNUNET_free_non_null (expired_opqs); 1372 GNUNET_free_non_null(expired_opqs);
1372 n_expired_opqs = 0; 1373 n_expired_opqs = 0;
1373 if (warn) 1374 if (warn)
1374 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1375 GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
1375 "Be disciplined. Some operations were not marked as done.\n"); 1376 "Be disciplined. Some operations were not marked as done.\n");
1376
1377} 1377}
1378/* end of testbed_api_operations.c */ 1378/* end of testbed_api_operations.c */