aboutsummaryrefslogtreecommitdiff
path: root/src/testbed/testbed_api_operations.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/testbed/testbed_api_operations.c')
-rw-r--r--src/testbed/testbed_api_operations.c770
1 files changed, 389 insertions, 381 deletions
diff --git a/src/testbed/testbed_api_operations.c b/src/testbed/testbed_api_operations.c
index 194dc2655..185f62aa8 100644
--- a/src/testbed/testbed_api_operations.c
+++ b/src/testbed/testbed_api_operations.c
@@ -44,7 +44,8 @@
44/** 44/**
45 * An entry in the operation queue 45 * An entry in the operation queue
46 */ 46 */
47struct QueueEntry { 47struct QueueEntry
48{
48 /** 49 /**
49 * The next DLL pointer 50 * The next DLL pointer
50 */ 51 */
@@ -77,7 +78,8 @@ struct OperationQueue;
77/** 78/**
78 * A slot to record time taken by an operation 79 * A slot to record time taken by an operation
79 */ 80 */
80struct TimeSlot { 81struct TimeSlot
82{
81 /** 83 /**
82 * DLL next pointer 84 * DLL next pointer
83 */ 85 */
@@ -113,7 +115,8 @@ struct TimeSlot {
113/** 115/**
114 * Context for operation queues of type OPERATION_QUEUE_TYPE_ADAPTIVE 116 * Context for operation queues of type OPERATION_QUEUE_TYPE_ADAPTIVE
115 */ 117 */
116struct FeedbackCtx { 118struct FeedbackCtx
119{
117 /** 120 /**
118 * Handle for calculating standard deviation 121 * Handle for calculating standard deviation
119 */ 122 */
@@ -156,7 +159,8 @@ struct FeedbackCtx {
156 * Queue of operations where we can only support a certain 159 * Queue of operations where we can only support a certain
157 * number of concurrent operations of a particular type. 160 * number of concurrent operations of a particular type.
158 */ 161 */
159struct OperationQueue { 162struct OperationQueue
163{
160 /** 164 /**
161 * DLL head for the wait queue. Operations which are waiting for this 165 * DLL head for the wait queue. Operations which are waiting for this
162 * operation queue are put here 166 * operation queue are put here
@@ -243,7 +247,8 @@ struct OperationQueue {
243/** 247/**
244 * Operation state 248 * Operation state
245 */ 249 */
246enum OperationState { 250enum OperationState
251{
247 /** 252 /**
248 * The operation is just created and is in initial state 253 * The operation is just created and is in initial state
249 */ 254 */
@@ -277,7 +282,8 @@ enum OperationState {
277/** 282/**
278 * An entry in the ready queue (implemented as DLL) 283 * An entry in the ready queue (implemented as DLL)
279 */ 284 */
280struct ReadyQueueEntry { 285struct ReadyQueueEntry
286{
281 /** 287 /**
282 * next ptr for DLL 288 * next ptr for DLL
283 */ 289 */
@@ -298,7 +304,8 @@ struct ReadyQueueEntry {
298/** 304/**
299 * Opaque handle to an abstract operation to be executed by the testing framework. 305 * Opaque handle to an abstract operation to be executed by the testing framework.
300 */ 306 */
301struct GNUNET_TESTBED_Operation { 307struct GNUNET_TESTBED_Operation
308{
302 /** 309 /**
303 * Function to call when we have the resources to begin the operation. 310 * Function to call when we have the resources to begin the operation.
304 */ 311 */
@@ -403,17 +410,17 @@ struct GNUNET_SCHEDULER_Task *process_rq_task_id;
403 * @return the timeslot 410 * @return the timeslot
404 */ 411 */
405static void 412static void
406assign_timeslot(struct GNUNET_TESTBED_Operation *op, 413assign_timeslot (struct GNUNET_TESTBED_Operation *op,
407 struct OperationQueue *queue) 414 struct OperationQueue *queue)
408{ 415{
409 struct FeedbackCtx *fctx = queue->fctx; 416 struct FeedbackCtx *fctx = queue->fctx;
410 struct TimeSlot *tslot; 417 struct TimeSlot *tslot;
411 418
412 GNUNET_assert(OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type); 419 GNUNET_assert (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type);
413 tslot = fctx->alloc_head; 420 tslot = fctx->alloc_head;
414 GNUNET_assert(NULL != tslot); 421 GNUNET_assert (NULL != tslot);
415 GNUNET_CONTAINER_DLL_remove(fctx->alloc_head, fctx->alloc_tail, tslot); 422 GNUNET_CONTAINER_DLL_remove (fctx->alloc_head, fctx->alloc_tail, tslot);
416 GNUNET_CONTAINER_DLL_insert_tail(op->tslots_head, op->tslots_tail, tslot); 423 GNUNET_CONTAINER_DLL_insert_tail (op->tslots_head, op->tslots_tail, tslot);
417 tslot->op = op; 424 tslot->op = op;
418} 425}
419 426
@@ -426,7 +433,7 @@ assign_timeslot(struct GNUNET_TESTBED_Operation *op,
426 * @param index the index of the entry in the operation's array of queue entries 433 * @param index the index of the entry in the operation's array of queue entries
427 */ 434 */
428static void 435static void
429remove_queue_entry(struct GNUNET_TESTBED_Operation *op, unsigned int index) 436remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index)
430{ 437{
431 struct OperationQueue *opq; 438 struct OperationQueue *opq;
432 struct QueueEntry *entry; 439 struct QueueEntry *entry;
@@ -434,27 +441,27 @@ remove_queue_entry(struct GNUNET_TESTBED_Operation *op, unsigned int index)
434 opq = op->queues[index]; 441 opq = op->queues[index];
435 entry = op->qentries[index]; 442 entry = op->qentries[index];
436 switch (op->state) 443 switch (op->state)
437 { 444 {
438 case OP_STATE_INIT: 445 case OP_STATE_INIT:
439 GNUNET_assert(0); 446 GNUNET_assert (0);
440 break; 447 break;
441 448
442 case OP_STATE_WAITING: 449 case OP_STATE_WAITING:
443 GNUNET_CONTAINER_DLL_remove(opq->wq_head, opq->wq_tail, entry); 450 GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry);
444 break; 451 break;
445 452
446 case OP_STATE_READY: 453 case OP_STATE_READY:
447 GNUNET_CONTAINER_DLL_remove(opq->rq_head, opq->rq_tail, entry); 454 GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry);
448 break; 455 break;
449 456
450 case OP_STATE_ACTIVE: 457 case OP_STATE_ACTIVE:
451 GNUNET_CONTAINER_DLL_remove(opq->aq_head, opq->aq_tail, entry); 458 GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry);
452 break; 459 break;
453 460
454 case OP_STATE_INACTIVE: 461 case OP_STATE_INACTIVE:
455 GNUNET_CONTAINER_DLL_remove(opq->nq_head, opq->nq_tail, entry); 462 GNUNET_CONTAINER_DLL_remove (opq->nq_head, opq->nq_tail, entry);
456 break; 463 break;
457 } 464 }
458} 465}
459 466
460 467
@@ -466,57 +473,57 @@ remove_queue_entry(struct GNUNET_TESTBED_Operation *op, unsigned int index)
466 * @param state the state the operation should have. It cannot be OP_STATE_INIT 473 * @param state the state the operation should have. It cannot be OP_STATE_INIT
467 */ 474 */
468static void 475static void
469change_state(struct GNUNET_TESTBED_Operation *op, enum OperationState state) 476change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state)
470{ 477{
471 struct QueueEntry *entry; 478 struct QueueEntry *entry;
472 struct OperationQueue *opq; 479 struct OperationQueue *opq;
473 unsigned int cnt; 480 unsigned int cnt;
474 unsigned int s; 481 unsigned int s;
475 482
476 GNUNET_assert(OP_STATE_INIT != state); 483 GNUNET_assert (OP_STATE_INIT != state);
477 GNUNET_assert(NULL != op->queues); 484 GNUNET_assert (NULL != op->queues);
478 GNUNET_assert(NULL != op->nres); 485 GNUNET_assert (NULL != op->nres);
479 GNUNET_assert((OP_STATE_INIT == op->state) || (NULL != op->qentries)); 486 GNUNET_assert ((OP_STATE_INIT == op->state) || (NULL != op->qentries));
480 GNUNET_assert(op->state != state); 487 GNUNET_assert (op->state != state);
481 for (cnt = 0; cnt < op->nqueues; cnt++) 488 for (cnt = 0; cnt < op->nqueues; cnt++)
489 {
490 if (OP_STATE_INIT == op->state)
482 { 491 {
483 if (OP_STATE_INIT == op->state) 492 entry = GNUNET_new (struct QueueEntry);
484 { 493 entry->op = op;
485 entry = GNUNET_new(struct QueueEntry); 494 entry->nres = op->nres[cnt];
486 entry->op = op; 495 s = cnt;
487 entry->nres = op->nres[cnt]; 496 GNUNET_array_append (op->qentries, s, entry);
488 s = cnt; 497 }
489 GNUNET_array_append(op->qentries, s, entry); 498 else
490 } 499 {
491 else 500 entry = op->qentries[cnt];
492 { 501 remove_queue_entry (op, cnt);
493 entry = op->qentries[cnt]; 502 }
494 remove_queue_entry(op, cnt); 503 opq = op->queues[cnt];
495 } 504 switch (state)
496 opq = op->queues[cnt]; 505 {
497 switch (state) 506 case OP_STATE_INIT:
498 { 507 GNUNET_assert (0);
499 case OP_STATE_INIT: 508 break;
500 GNUNET_assert(0);
501 break;
502 509
503 case OP_STATE_WAITING: 510 case OP_STATE_WAITING:
504 GNUNET_CONTAINER_DLL_insert_tail(opq->wq_head, opq->wq_tail, entry); 511 GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry);
505 break; 512 break;
506 513
507 case OP_STATE_READY: 514 case OP_STATE_READY:
508 GNUNET_CONTAINER_DLL_insert_tail(opq->rq_head, opq->rq_tail, entry); 515 GNUNET_CONTAINER_DLL_insert_tail (opq->rq_head, opq->rq_tail, entry);
509 break; 516 break;
510 517
511 case OP_STATE_ACTIVE: 518 case OP_STATE_ACTIVE:
512 GNUNET_CONTAINER_DLL_insert_tail(opq->aq_head, opq->aq_tail, entry); 519 GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry);
513 break; 520 break;
514 521
515 case OP_STATE_INACTIVE: 522 case OP_STATE_INACTIVE:
516 GNUNET_CONTAINER_DLL_insert_tail(opq->nq_head, opq->nq_tail, entry); 523 GNUNET_CONTAINER_DLL_insert_tail (opq->nq_head, opq->nq_tail, entry);
517 break; 524 break;
518 }
519 } 525 }
526 }
520 op->state = state; 527 op->state = state;
521} 528}
522 529
@@ -528,17 +535,17 @@ change_state(struct GNUNET_TESTBED_Operation *op, enum OperationState state)
528 * @param op the operation to be removed 535 * @param op the operation to be removed
529 */ 536 */
530static void 537static void
531rq_remove(struct GNUNET_TESTBED_Operation *op) 538rq_remove (struct GNUNET_TESTBED_Operation *op)
532{ 539{
533 GNUNET_assert(NULL != op->rq_entry); 540 GNUNET_assert (NULL != op->rq_entry);
534 GNUNET_CONTAINER_DLL_remove(rq_head, rq_tail, op->rq_entry); 541 GNUNET_CONTAINER_DLL_remove (rq_head, rq_tail, op->rq_entry);
535 GNUNET_free(op->rq_entry); 542 GNUNET_free (op->rq_entry);
536 op->rq_entry = NULL; 543 op->rq_entry = NULL;
537 if ((NULL == rq_head) && (NULL != process_rq_task_id)) 544 if ((NULL == rq_head) && (NULL != process_rq_task_id))
538 { 545 {
539 GNUNET_SCHEDULER_cancel(process_rq_task_id); 546 GNUNET_SCHEDULER_cancel (process_rq_task_id);
540 process_rq_task_id = NULL; 547 process_rq_task_id = NULL;
541 } 548 }
542} 549}
543 550
544 551
@@ -551,28 +558,28 @@ rq_remove(struct GNUNET_TESTBED_Operation *op)
551 * @param cls NULL 558 * @param cls NULL
552 */ 559 */
553static void 560static void
554process_rq_task(void *cls) 561process_rq_task (void *cls)
555{ 562{
556 struct GNUNET_TESTBED_Operation *op; 563 struct GNUNET_TESTBED_Operation *op;
557 struct OperationQueue *queue; 564 struct OperationQueue *queue;
558 unsigned int cnt; 565 unsigned int cnt;
559 566
560 process_rq_task_id = NULL; 567 process_rq_task_id = NULL;
561 GNUNET_assert(NULL != rq_head); 568 GNUNET_assert (NULL != rq_head);
562 GNUNET_assert(NULL != (op = rq_head->op)); 569 GNUNET_assert (NULL != (op = rq_head->op));
563 rq_remove(op); 570 rq_remove (op);
564 if (NULL != rq_head) 571 if (NULL != rq_head)
565 process_rq_task_id = GNUNET_SCHEDULER_add_now(&process_rq_task, NULL); 572 process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
566 change_state(op, OP_STATE_ACTIVE); 573 change_state (op, OP_STATE_ACTIVE);
567 for (cnt = 0; cnt < op->nqueues; cnt++) 574 for (cnt = 0; cnt < op->nqueues; cnt++)
568 { 575 {
569 queue = op->queues[cnt]; 576 queue = op->queues[cnt];
570 if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type) 577 if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
571 assign_timeslot(op, queue); 578 assign_timeslot (op, queue);
572 } 579 }
573 op->tstart = GNUNET_TIME_absolute_get(); 580 op->tstart = GNUNET_TIME_absolute_get ();
574 if (NULL != op->start) 581 if (NULL != op->start)
575 op->start(op->cb_cls); 582 op->start (op->cb_cls);
576} 583}
577 584
578 585
@@ -582,17 +589,17 @@ process_rq_task(void *cls)
582 * @param op the operation to be queued 589 * @param op the operation to be queued
583 */ 590 */
584static void 591static void
585rq_add(struct GNUNET_TESTBED_Operation *op) 592rq_add (struct GNUNET_TESTBED_Operation *op)
586{ 593{
587 struct ReadyQueueEntry *rq_entry; 594 struct ReadyQueueEntry *rq_entry;
588 595
589 GNUNET_assert(NULL == op->rq_entry); 596 GNUNET_assert (NULL == op->rq_entry);
590 rq_entry = GNUNET_new(struct ReadyQueueEntry); 597 rq_entry = GNUNET_new (struct ReadyQueueEntry);
591 rq_entry->op = op; 598 rq_entry->op = op;
592 GNUNET_CONTAINER_DLL_insert_tail(rq_head, rq_tail, rq_entry); 599 GNUNET_CONTAINER_DLL_insert_tail (rq_head, rq_tail, rq_entry);
593 op->rq_entry = rq_entry; 600 op->rq_entry = rq_entry;
594 if (NULL == process_rq_task_id) 601 if (NULL == process_rq_task_id)
595 process_rq_task_id = GNUNET_SCHEDULER_add_now(&process_rq_task, NULL); 602 process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
596} 603}
597 604
598 605
@@ -604,7 +611,7 @@ rq_add(struct GNUNET_TESTBED_Operation *op)
604 * otherwise 611 * otherwise
605 */ 612 */
606static int 613static int
607is_queue_empty(struct OperationQueue *opq) 614is_queue_empty (struct OperationQueue *opq)
608{ 615{
609 if ((NULL != opq->wq_head) 616 if ((NULL != opq->wq_head)
610 || (NULL != opq->rq_head) 617 || (NULL != opq->rq_head)
@@ -632,10 +639,10 @@ is_queue_empty(struct OperationQueue *opq)
632 * be set to NULL and 0 respectively. 639 * be set to NULL and 0 respectively.
633 */ 640 */
634static int 641static int
635decide_capacity(struct OperationQueue *opq, 642decide_capacity (struct OperationQueue *opq,
636 struct QueueEntry *entry, 643 struct QueueEntry *entry,
637 struct GNUNET_TESTBED_Operation ***ops_, 644 struct GNUNET_TESTBED_Operation ***ops_,
638 unsigned int *n_ops_) 645 unsigned int *n_ops_)
639{ 646{
640 struct QueueEntry **evict_entries; 647 struct QueueEntry **evict_entries;
641 struct GNUNET_TESTBED_Operation **ops; 648 struct GNUNET_TESTBED_Operation **ops;
@@ -647,53 +654,53 @@ decide_capacity(struct OperationQueue *opq,
647 int deficit; 654 int deficit;
648 int rval; 655 int rval;
649 656
650 GNUNET_assert(NULL != (op = entry->op)); 657 GNUNET_assert (NULL != (op = entry->op));
651 GNUNET_assert(0 < (need = entry->nres)); 658 GNUNET_assert (0 < (need = entry->nres));
652 ops = NULL; 659 ops = NULL;
653 n_ops = 0; 660 n_ops = 0;
654 evict_entries = NULL; 661 evict_entries = NULL;
655 n_evict_entries = 0; 662 n_evict_entries = 0;
656 rval = GNUNET_YES; 663 rval = GNUNET_YES;
657 if (OPERATION_QUEUE_TYPE_ADAPTIVE == opq->type) 664 if (OPERATION_QUEUE_TYPE_ADAPTIVE == opq->type)
658 { 665 {
659 GNUNET_assert(NULL != opq->fctx); 666 GNUNET_assert (NULL != opq->fctx);
660 GNUNET_assert(opq->max_active >= opq->overload); 667 GNUNET_assert (opq->max_active >= opq->overload);
661 max = opq->max_active - opq->overload; 668 max = opq->max_active - opq->overload;
662 } 669 }
663 else 670 else
664 max = opq->max_active; 671 max = opq->max_active;
665 if (opq->active > max) 672 if (opq->active > max)
666 { 673 {
667 rval = GNUNET_NO; 674 rval = GNUNET_NO;
668 goto ret; 675 goto ret;
669 } 676 }
670 if ((opq->active + need) <= max) 677 if ((opq->active + need) <= max)
671 goto ret; 678 goto ret;
672 deficit = need - (max - opq->active); 679 deficit = need - (max - opq->active);
673 for (entry = opq->nq_head; 680 for (entry = opq->nq_head;
674 (0 < deficit) && (NULL != entry); 681 (0 < deficit) && (NULL != entry);
675 entry = entry->next) 682 entry = entry->next)
676 { 683 {
677 GNUNET_array_append(evict_entries, n_evict_entries, entry); 684 GNUNET_array_append (evict_entries, n_evict_entries, entry);
678 deficit -= entry->nres; 685 deficit -= entry->nres;
679 } 686 }
680 if (0 < deficit) 687 if (0 < deficit)
681 { 688 {
682 rval = GNUNET_NO; 689 rval = GNUNET_NO;
683 goto ret; 690 goto ret;
684 } 691 }
685 for (n_ops = 0; n_ops < n_evict_entries;) 692 for (n_ops = 0; n_ops < n_evict_entries;)
686 { 693 {
687 op = evict_entries[n_ops]->op; 694 op = evict_entries[n_ops]->op;
688 GNUNET_array_append(ops, n_ops, op); /* increments n-ops */ 695 GNUNET_array_append (ops, n_ops, op); /* increments n-ops */
689 } 696 }
690 697
691ret: 698ret:
692 GNUNET_free_non_null(evict_entries); 699 GNUNET_free_non_null (evict_entries);
693 if (NULL != ops_) 700 if (NULL != ops_)
694 *ops_ = ops; 701 *ops_ = ops;
695 else 702 else
696 GNUNET_free(ops); 703 GNUNET_free (ops);
697 if (NULL != n_ops_) 704 if (NULL != n_ops_)
698 *n_ops_ = n_ops; 705 *n_ops_ = n_ops;
699 return rval; 706 return rval;
@@ -710,30 +717,30 @@ ret:
710 * @param n_new the number of operations in new array 717 * @param n_new the number of operations in new array
711 */ 718 */
712static void 719static void
713merge_ops(struct GNUNET_TESTBED_Operation ***old, 720merge_ops (struct GNUNET_TESTBED_Operation ***old,
714 unsigned int *n_old, 721 unsigned int *n_old,
715 struct GNUNET_TESTBED_Operation **new, 722 struct GNUNET_TESTBED_Operation **new,
716 unsigned int n_new) 723 unsigned int n_new)
717{ 724{
718 struct GNUNET_TESTBED_Operation **cur; 725 struct GNUNET_TESTBED_Operation **cur;
719 unsigned int i; 726 unsigned int i;
720 unsigned int j; 727 unsigned int j;
721 unsigned int n_cur; 728 unsigned int n_cur;
722 729
723 GNUNET_assert(NULL != old); 730 GNUNET_assert (NULL != old);
724 n_cur = *n_old; 731 n_cur = *n_old;
725 cur = *old; 732 cur = *old;
726 for (i = 0; i < n_new; i++) 733 for (i = 0; i < n_new; i++)
734 {
735 for (j = 0; j < *n_old; j++)
727 { 736 {
728 for (j = 0; j < *n_old; j++) 737 if (new[i] == cur[j])
729 { 738 break;
730 if (new[i] == cur[j])
731 break;
732 }
733 if (j < *n_old)
734 continue;
735 GNUNET_array_append(cur, n_cur, new[j]);
736 } 739 }
740 if (j < *n_old)
741 continue;
742 GNUNET_array_append (cur, n_cur, new[j]);
743 }
737 *old = cur; 744 *old = cur;
738 *n_old = n_cur; 745 *n_old = n_cur;
739} 746}
@@ -746,7 +753,7 @@ merge_ops(struct GNUNET_TESTBED_Operation ***old,
746 * @param op the operation 753 * @param op the operation
747 */ 754 */
748static int 755static int
749check_readiness(struct GNUNET_TESTBED_Operation *op) 756check_readiness (struct GNUNET_TESTBED_Operation *op)
750{ 757{
751 struct GNUNET_TESTBED_Operation **evict_ops; 758 struct GNUNET_TESTBED_Operation **evict_ops;
752 struct GNUNET_TESTBED_Operation **ops; 759 struct GNUNET_TESTBED_Operation **ops;
@@ -754,39 +761,39 @@ check_readiness(struct GNUNET_TESTBED_Operation *op)
754 unsigned int n_evict_ops; 761 unsigned int n_evict_ops;
755 unsigned int i; 762 unsigned int i;
756 763
757 GNUNET_assert(NULL == op->rq_entry); 764 GNUNET_assert (NULL == op->rq_entry);
758 GNUNET_assert(OP_STATE_WAITING == op->state); 765 GNUNET_assert (OP_STATE_WAITING == op->state);
759 evict_ops = NULL; 766 evict_ops = NULL;
760 n_evict_ops = 0; 767 n_evict_ops = 0;
761 for (i = 0; i < op->nqueues; i++) 768 for (i = 0; i < op->nqueues; i++)
769 {
770 ops = NULL;
771 n_ops = 0;
772 if (GNUNET_NO == decide_capacity (op->queues[i], op->qentries[i],
773 &ops, &n_ops))
762 { 774 {
763 ops = NULL; 775 GNUNET_free_non_null (evict_ops);
764 n_ops = 0; 776 return GNUNET_NO;
765 if (GNUNET_NO == decide_capacity(op->queues[i], op->qentries[i],
766 &ops, &n_ops))
767 {
768 GNUNET_free_non_null(evict_ops);
769 return GNUNET_NO;
770 }
771 if (NULL == ops)
772 continue;
773 merge_ops(&evict_ops, &n_evict_ops, ops, n_ops);
774 GNUNET_free(ops);
775 } 777 }
778 if (NULL == ops)
779 continue;
780 merge_ops (&evict_ops, &n_evict_ops, ops, n_ops);
781 GNUNET_free (ops);
782 }
776 if (NULL != evict_ops) 783 if (NULL != evict_ops)
777 { 784 {
778 for (i = 0; i < n_evict_ops; i++) 785 for (i = 0; i < n_evict_ops; i++)
779 GNUNET_TESTBED_operation_release_(evict_ops[i]); 786 GNUNET_TESTBED_operation_release_ (evict_ops[i]);
780 GNUNET_free(evict_ops); 787 GNUNET_free (evict_ops);
781 evict_ops = NULL; 788 evict_ops = NULL;
782 /* Evicting the operations should schedule this operation */ 789 /* Evicting the operations should schedule this operation */
783 GNUNET_assert(OP_STATE_READY == op->state); 790 GNUNET_assert (OP_STATE_READY == op->state);
784 return GNUNET_YES; 791 return GNUNET_YES;
785 } 792 }
786 for (i = 0; i < op->nqueues; i++) 793 for (i = 0; i < op->nqueues; i++)
787 op->queues[i]->active += op->nres[i]; 794 op->queues[i]->active += op->nres[i];
788 change_state(op, OP_STATE_READY); 795 change_state (op, OP_STATE_READY);
789 rq_add(op); 796 rq_add (op);
790 return GNUNET_YES; 797 return GNUNET_YES;
791} 798}
792 799
@@ -797,18 +804,18 @@ check_readiness(struct GNUNET_TESTBED_Operation *op)
797 * @param op the operation to defer 804 * @param op the operation to defer
798 */ 805 */
799static void 806static void
800defer(struct GNUNET_TESTBED_Operation *op) 807defer (struct GNUNET_TESTBED_Operation *op)
801{ 808{
802 unsigned int i; 809 unsigned int i;
803 810
804 GNUNET_assert(OP_STATE_READY == op->state); 811 GNUNET_assert (OP_STATE_READY == op->state);
805 rq_remove(op); 812 rq_remove (op);
806 for (i = 0; i < op->nqueues; i++) 813 for (i = 0; i < op->nqueues; i++)
807 { 814 {
808 GNUNET_assert(op->queues[i]->active >= op->nres[i]); 815 GNUNET_assert (op->queues[i]->active >= op->nres[i]);
809 op->queues[i]->active -= op->nres[i]; 816 op->queues[i]->active -= op->nres[i];
810 } 817 }
811 change_state(op, OP_STATE_WAITING); 818 change_state (op, OP_STATE_WAITING);
812} 819}
813 820
814 821
@@ -820,23 +827,23 @@ defer(struct GNUNET_TESTBED_Operation *op)
820 * @param queue the operation queue 827 * @param queue the operation queue
821 */ 828 */
822static void 829static void
823cleanup_tslots(struct OperationQueue *queue) 830cleanup_tslots (struct OperationQueue *queue)
824{ 831{
825 struct FeedbackCtx *fctx = queue->fctx; 832 struct FeedbackCtx *fctx = queue->fctx;
826 struct TimeSlot *tslot; 833 struct TimeSlot *tslot;
827 struct GNUNET_TESTBED_Operation *op; 834 struct GNUNET_TESTBED_Operation *op;
828 unsigned int cnt; 835 unsigned int cnt;
829 836
830 GNUNET_assert(NULL != fctx); 837 GNUNET_assert (NULL != fctx);
831 for (cnt = 0; cnt < queue->max_active; cnt++) 838 for (cnt = 0; cnt < queue->max_active; cnt++)
832 { 839 {
833 tslot = &fctx->tslots_freeptr[cnt]; 840 tslot = &fctx->tslots_freeptr[cnt];
834 op = tslot->op; 841 op = tslot->op;
835 if (NULL == op) 842 if (NULL == op)
836 continue; 843 continue;
837 GNUNET_CONTAINER_DLL_remove(op->tslots_head, op->tslots_tail, tslot); 844 GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
838 } 845 }
839 GNUNET_free_non_null(fctx->tslots_freeptr); 846 GNUNET_free_non_null (fctx->tslots_freeptr);
840 fctx->tslots_freeptr = NULL; 847 fctx->tslots_freeptr = NULL;
841 fctx->alloc_head = NULL; 848 fctx->alloc_head = NULL;
842 fctx->alloc_tail = NULL; 849 fctx->alloc_tail = NULL;
@@ -854,23 +861,24 @@ cleanup_tslots(struct OperationQueue *queue)
854 * will be selected as n 861 * will be selected as n
855 */ 862 */
856static void 863static void
857adaptive_queue_set_max_active(struct OperationQueue *queue, unsigned int n) 864adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n)
858{ 865{
859 struct FeedbackCtx *fctx = queue->fctx; 866 struct FeedbackCtx *fctx = queue->fctx;
860 struct TimeSlot *tslot; 867 struct TimeSlot *tslot;
861 unsigned int cnt; 868 unsigned int cnt;
862 869
863 cleanup_tslots(queue); 870 cleanup_tslots (queue);
864 n = GNUNET_MIN(n, fctx->max_active_bound); 871 n = GNUNET_MIN (n, fctx->max_active_bound);
865 fctx->tslots_freeptr = GNUNET_malloc(n * sizeof(struct TimeSlot)); 872 fctx->tslots_freeptr = GNUNET_malloc (n * sizeof(struct TimeSlot));
866 fctx->nfailed = 0; 873 fctx->nfailed = 0;
867 for (cnt = 0; cnt < n; cnt++) 874 for (cnt = 0; cnt < n; cnt++)
868 { 875 {
869 tslot = &fctx->tslots_freeptr[cnt]; 876 tslot = &fctx->tslots_freeptr[cnt];
870 tslot->queue = queue; 877 tslot->queue = queue;
871 GNUNET_CONTAINER_DLL_insert_tail(fctx->alloc_head, fctx->alloc_tail, tslot); 878 GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
872 } 879 tslot);
873 GNUNET_TESTBED_operation_queue_reset_max_active_(queue, n); 880 }
881 GNUNET_TESTBED_operation_queue_reset_max_active_ (queue, n);
874} 882}
875 883
876 884
@@ -881,7 +889,7 @@ adaptive_queue_set_max_active(struct OperationQueue *queue, unsigned int n)
881 * @param queue the queue 889 * @param queue the queue
882 */ 890 */
883static void 891static void
884adapt_parallelism(struct OperationQueue *queue) 892adapt_parallelism (struct OperationQueue *queue)
885{ 893{
886 struct GNUNET_TIME_Relative avg; 894 struct GNUNET_TIME_Relative avg;
887 struct FeedbackCtx *fctx; 895 struct FeedbackCtx *fctx;
@@ -895,32 +903,32 @@ adapt_parallelism(struct OperationQueue *queue)
895 nvals = 0; 903 nvals = 0;
896 fctx = queue->fctx; 904 fctx = queue->fctx;
897 for (cnt = 0; cnt < queue->max_active; cnt++) 905 for (cnt = 0; cnt < queue->max_active; cnt++)
898 { 906 {
899 tslot = &fctx->tslots_freeptr[cnt]; 907 tslot = &fctx->tslots_freeptr[cnt];
900 avg = GNUNET_TIME_relative_add(avg, tslot->tsum); 908 avg = GNUNET_TIME_relative_add (avg, tslot->tsum);
901 nvals += tslot->nvals; 909 nvals += tslot->nvals;
902 } 910 }
903 GNUNET_assert(nvals >= queue->max_active); 911 GNUNET_assert (nvals >= queue->max_active);
904 GNUNET_assert(fctx->nfailed <= nvals); 912 GNUNET_assert (fctx->nfailed <= nvals);
905 nvals -= fctx->nfailed; 913 nvals -= fctx->nfailed;
906 if (0 == nvals) 914 if (0 == nvals)
907 { 915 {
908 if (1 == queue->max_active) 916 if (1 == queue->max_active)
909 adaptive_queue_set_max_active(queue, 1); 917 adaptive_queue_set_max_active (queue, 1);
910 else 918 else
911 adaptive_queue_set_max_active(queue, queue->max_active / 2); 919 adaptive_queue_set_max_active (queue, queue->max_active / 2);
912 return; 920 return;
913 } 921 }
914 avg = GNUNET_TIME_relative_divide(avg, nvals); 922 avg = GNUNET_TIME_relative_divide (avg, nvals);
915 GNUNET_TESTBED_SD_add_data_(fctx->sd, (unsigned int)avg.rel_value_us); 923 GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
916 if (GNUNET_SYSERR == 924 if (GNUNET_SYSERR ==
917 GNUNET_TESTBED_SD_deviation_factor_(fctx->sd, 925 GNUNET_TESTBED_SD_deviation_factor_ (fctx->sd,
918 (unsigned int)avg.rel_value_us, 926 (unsigned int) avg.rel_value_us,
919 &sd)) 927 &sd))
920 { 928 {
921 adaptive_queue_set_max_active(queue, queue->max_active); /* no change */ 929 adaptive_queue_set_max_active (queue, queue->max_active); /* no change */
922 return; 930 return;
923 } 931 }
924 932
925 parallelism = 0; 933 parallelism = 0;
926 if (-1 == sd) 934 if (-1 == sd)
@@ -931,36 +939,36 @@ adapt_parallelism(struct OperationQueue *queue)
931 parallelism = queue->max_active - 1; 939 parallelism = queue->max_active - 1;
932 if (2 <= sd) 940 if (2 <= sd)
933 parallelism = queue->max_active / 2; 941 parallelism = queue->max_active / 2;
934 parallelism = GNUNET_MAX(parallelism, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE); 942 parallelism = GNUNET_MAX (parallelism, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE);
935 adaptive_queue_set_max_active(queue, parallelism); 943 adaptive_queue_set_max_active (queue, parallelism);
936 944
937#if 0 945#if 0
938 /* old algorithm */ 946 /* old algorithm */
939 if (sd < 0) 947 if (sd < 0)
940 sd = 0; 948 sd = 0;
941 GNUNET_assert(0 <= sd); 949 GNUNET_assert (0 <= sd);
942 //GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us); 950 // GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
943 if (0 == sd) 951 if (0 == sd)
944 { 952 {
945 adaptive_queue_set_max_active(queue, queue->max_active * 2); 953 adaptive_queue_set_max_active (queue, queue->max_active * 2);
946 return; 954 return;
947 } 955 }
948 if (1 == sd) 956 if (1 == sd)
949 { 957 {
950 adaptive_queue_set_max_active(queue, queue->max_active + 1); 958 adaptive_queue_set_max_active (queue, queue->max_active + 1);
951 return; 959 return;
952 } 960 }
953 if (1 == queue->max_active) 961 if (1 == queue->max_active)
954 { 962 {
955 adaptive_queue_set_max_active(queue, 1); 963 adaptive_queue_set_max_active (queue, 1);
956 return; 964 return;
957 } 965 }
958 if (2 == sd) 966 if (2 == sd)
959 { 967 {
960 adaptive_queue_set_max_active(queue, queue->max_active - 1); 968 adaptive_queue_set_max_active (queue, queue->max_active - 1);
961 return; 969 return;
962 } 970 }
963 adaptive_queue_set_max_active(queue, queue->max_active / 2); 971 adaptive_queue_set_max_active (queue, queue->max_active / 2);
964#endif 972#endif
965} 973}
966 974
@@ -973,7 +981,7 @@ adapt_parallelism(struct OperationQueue *queue)
973 * @param op the operation 981 * @param op the operation
974 */ 982 */
975static void 983static void
976update_tslots(struct GNUNET_TESTBED_Operation *op) 984update_tslots (struct GNUNET_TESTBED_Operation *op)
977{ 985{
978 struct OperationQueue *queue; 986 struct OperationQueue *queue;
979 struct GNUNET_TIME_Relative t; 987 struct GNUNET_TIME_Relative t;
@@ -981,31 +989,31 @@ update_tslots(struct GNUNET_TESTBED_Operation *op)
981 struct FeedbackCtx *fctx; 989 struct FeedbackCtx *fctx;
982 unsigned int i; 990 unsigned int i;
983 991
984 t = GNUNET_TIME_absolute_get_duration(op->tstart); 992 t = GNUNET_TIME_absolute_get_duration (op->tstart);
985 while (NULL != (tslot = op->tslots_head)) /* update time slots */ 993 while (NULL != (tslot = op->tslots_head)) /* update time slots */
994 {
995 queue = tslot->queue;
996 fctx = queue->fctx;
997 GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
998 tslot->op = NULL;
999 GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
1000 tslot);
1001 if (op->failed)
986 { 1002 {
987 queue = tslot->queue; 1003 fctx->nfailed++;
988 fctx = queue->fctx; 1004 for (i = 0; i < op->nqueues; i++)
989 GNUNET_CONTAINER_DLL_remove(op->tslots_head, op->tslots_tail, tslot); 1005 if (queue == op->queues[i])
990 tslot->op = NULL; 1006 break;
991 GNUNET_CONTAINER_DLL_insert_tail(fctx->alloc_head, fctx->alloc_tail, 1007 GNUNET_assert (i != op->nqueues);
992 tslot); 1008 op->queues[i]->overload += op->nres[i];
993 if (op->failed)
994 {
995 fctx->nfailed++;
996 for (i = 0; i < op->nqueues; i++)
997 if (queue == op->queues[i])
998 break;
999 GNUNET_assert(i != op->nqueues);
1000 op->queues[i]->overload += op->nres[i];
1001 }
1002 tslot->tsum = GNUNET_TIME_relative_add(tslot->tsum, t);
1003 if (0 != tslot->nvals++)
1004 continue;
1005 fctx->tslots_filled++;
1006 if (queue->max_active == fctx->tslots_filled)
1007 adapt_parallelism(queue);
1008 } 1009 }
1010 tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t);
1011 if (0 != tslot->nvals++)
1012 continue;
1013 fctx->tslots_filled++;
1014 if (queue->max_active == fctx->tslots_filled)
1015 adapt_parallelism (queue);
1016 }
1009} 1017}
1010 1018
1011 1019
@@ -1018,12 +1026,12 @@ update_tslots(struct GNUNET_TESTBED_Operation *op)
1018 * @return handle to the operation 1026 * @return handle to the operation
1019 */ 1027 */
1020struct GNUNET_TESTBED_Operation * 1028struct GNUNET_TESTBED_Operation *
1021GNUNET_TESTBED_operation_create_(void *cls, OperationStart start, 1029GNUNET_TESTBED_operation_create_ (void *cls, OperationStart start,
1022 OperationRelease release) 1030 OperationRelease release)
1023{ 1031{
1024 struct GNUNET_TESTBED_Operation *op; 1032 struct GNUNET_TESTBED_Operation *op;
1025 1033
1026 op = GNUNET_new(struct GNUNET_TESTBED_Operation); 1034 op = GNUNET_new (struct GNUNET_TESTBED_Operation);
1027 op->start = start; 1035 op->start = start;
1028 op->state = OP_STATE_INIT; 1036 op->state = OP_STATE_INIT;
1029 op->release = release; 1037 op->release = release;
@@ -1041,26 +1049,26 @@ GNUNET_TESTBED_operation_create_(void *cls, OperationStart start,
1041 * @return handle to the queue 1049 * @return handle to the queue
1042 */ 1050 */
1043struct OperationQueue * 1051struct OperationQueue *
1044GNUNET_TESTBED_operation_queue_create_(enum OperationQueueType type, 1052GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type,
1045 unsigned int max_active) 1053 unsigned int max_active)
1046{ 1054{
1047 struct OperationQueue *queue; 1055 struct OperationQueue *queue;
1048 struct FeedbackCtx *fctx; 1056 struct FeedbackCtx *fctx;
1049 1057
1050 queue = GNUNET_new(struct OperationQueue); 1058 queue = GNUNET_new (struct OperationQueue);
1051 queue->type = type; 1059 queue->type = type;
1052 if (OPERATION_QUEUE_TYPE_FIXED == type) 1060 if (OPERATION_QUEUE_TYPE_FIXED == type)
1053 { 1061 {
1054 queue->max_active = max_active; 1062 queue->max_active = max_active;
1055 } 1063 }
1056 else 1064 else
1057 { 1065 {
1058 fctx = GNUNET_new(struct FeedbackCtx); 1066 fctx = GNUNET_new (struct FeedbackCtx);
1059 fctx->max_active_bound = max_active; 1067 fctx->max_active_bound = max_active;
1060 fctx->sd = GNUNET_TESTBED_SD_init_(ADAPTIVE_QUEUE_DEFAULT_HISTORY); 1068 fctx->sd = GNUNET_TESTBED_SD_init_ (ADAPTIVE_QUEUE_DEFAULT_HISTORY);
1061 queue->fctx = fctx; 1069 queue->fctx = fctx;
1062 adaptive_queue_set_max_active(queue, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE); 1070 adaptive_queue_set_max_active (queue, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE);
1063 } 1071 }
1064 return queue; 1072 return queue;
1065} 1073}
1066 1074
@@ -1071,18 +1079,18 @@ GNUNET_TESTBED_operation_queue_create_(enum OperationQueueType type,
1071 * @param queue the operation queue to destroy 1079 * @param queue the operation queue to destroy
1072 */ 1080 */
1073static void 1081static void
1074queue_destroy(struct OperationQueue *queue) 1082queue_destroy (struct OperationQueue *queue)
1075{ 1083{
1076 struct FeedbackCtx *fctx; 1084 struct FeedbackCtx *fctx;
1077 1085
1078 if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type) 1086 if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
1079 { 1087 {
1080 cleanup_tslots(queue); 1088 cleanup_tslots (queue);
1081 fctx = queue->fctx; 1089 fctx = queue->fctx;
1082 GNUNET_TESTBED_SD_destroy_(fctx->sd); 1090 GNUNET_TESTBED_SD_destroy_ (fctx->sd);
1083 GNUNET_free(fctx); 1091 GNUNET_free (fctx);
1084 } 1092 }
1085 GNUNET_free(queue); 1093 GNUNET_free (queue);
1086} 1094}
1087 1095
1088 1096
@@ -1094,16 +1102,16 @@ queue_destroy(struct OperationQueue *queue)
1094 * @param queue queue to destroy 1102 * @param queue queue to destroy
1095 */ 1103 */
1096void 1104void
1097GNUNET_TESTBED_operation_queue_destroy_(struct OperationQueue *queue) 1105GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
1098{ 1106{
1099 if (GNUNET_YES != is_queue_empty(queue)) 1107 if (GNUNET_YES != is_queue_empty (queue))
1100 { 1108 {
1101 GNUNET_assert(0 == queue->expired); /* Are you calling twice on same queue? */ 1109 GNUNET_assert (0 == queue->expired); /* Are you calling twice on same queue? */
1102 queue->expired = 1; 1110 queue->expired = 1;
1103 GNUNET_array_append(expired_opqs, n_expired_opqs, queue); 1111 GNUNET_array_append (expired_opqs, n_expired_opqs, queue);
1104 return; 1112 return;
1105 } 1113 }
1106 queue_destroy(queue); 1114 queue_destroy (queue);
1107} 1115}
1108 1116
1109 1117
@@ -1115,11 +1123,11 @@ GNUNET_TESTBED_operation_queue_destroy_(struct OperationQueue *queue)
1115 * is not empty) 1123 * is not empty)
1116 */ 1124 */
1117int 1125int
1118GNUNET_TESTBED_operation_queue_destroy_empty_(struct OperationQueue *queue) 1126GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue)
1119{ 1127{
1120 if (GNUNET_NO == is_queue_empty(queue)) 1128 if (GNUNET_NO == is_queue_empty (queue))
1121 return GNUNET_NO; 1129 return GNUNET_NO;
1122 GNUNET_TESTBED_operation_queue_destroy_(queue); 1130 GNUNET_TESTBED_operation_queue_destroy_ (queue);
1123 return GNUNET_YES; 1131 return GNUNET_YES;
1124} 1132}
1125 1133
@@ -1131,19 +1139,19 @@ GNUNET_TESTBED_operation_queue_destroy_empty_(struct OperationQueue *queue)
1131 * @param opq the operation queue 1139 * @param opq the operation queue
1132 */ 1140 */
1133static void 1141static void
1134recheck_waiting(struct OperationQueue *opq) 1142recheck_waiting (struct OperationQueue *opq)
1135{ 1143{
1136 struct QueueEntry *entry; 1144 struct QueueEntry *entry;
1137 struct QueueEntry *entry2; 1145 struct QueueEntry *entry2;
1138 1146
1139 entry = opq->wq_head; 1147 entry = opq->wq_head;
1140 while (NULL != entry) 1148 while (NULL != entry)
1141 { 1149 {
1142 entry2 = entry->next; 1150 entry2 = entry->next;
1143 if (GNUNET_NO == check_readiness(entry->op)) 1151 if (GNUNET_NO == check_readiness (entry->op))
1144 break; 1152 break;
1145 entry = entry2; 1153 entry = entry2;
1146 } 1154 }
1147} 1155}
1148 1156
1149 1157
@@ -1156,8 +1164,8 @@ recheck_waiting(struct OperationQueue *opq)
1156 * @param max_active the new maximum number of active operations 1164 * @param max_active the new maximum number of active operations
1157 */ 1165 */
1158void 1166void
1159GNUNET_TESTBED_operation_queue_reset_max_active_(struct OperationQueue *queue, 1167GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue,
1160 unsigned int max_active) 1168 unsigned int max_active)
1161{ 1169{
1162 struct QueueEntry *entry; 1170 struct QueueEntry *entry;
1163 1171
@@ -1165,8 +1173,8 @@ GNUNET_TESTBED_operation_queue_reset_max_active_(struct OperationQueue *queue,
1165 queue->overload = 0; 1173 queue->overload = 0;
1166 while ((queue->active > queue->max_active) 1174 while ((queue->active > queue->max_active)
1167 && (NULL != (entry = queue->rq_head))) 1175 && (NULL != (entry = queue->rq_head)))
1168 defer(entry->op); 1176 defer (entry->op);
1169 recheck_waiting(queue); 1177 recheck_waiting (queue);
1170} 1178}
1171 1179
1172 1180
@@ -1182,17 +1190,17 @@ GNUNET_TESTBED_operation_queue_reset_max_active_(struct OperationQueue *queue,
1182 * operation. Should be greater than 0. 1190 * operation. Should be greater than 0.
1183 */ 1191 */
1184void 1192void
1185GNUNET_TESTBED_operation_queue_insert2_(struct OperationQueue *queue, 1193GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue,
1186 struct GNUNET_TESTBED_Operation *op, 1194 struct GNUNET_TESTBED_Operation *op,
1187 unsigned int nres) 1195 unsigned int nres)
1188{ 1196{
1189 unsigned int qsize; 1197 unsigned int qsize;
1190 1198
1191 GNUNET_assert(0 < nres); 1199 GNUNET_assert (0 < nres);
1192 qsize = op->nqueues; 1200 qsize = op->nqueues;
1193 GNUNET_array_append(op->queues, op->nqueues, queue); 1201 GNUNET_array_append (op->queues, op->nqueues, queue);
1194 GNUNET_array_append(op->nres, qsize, nres); 1202 GNUNET_array_append (op->nres, qsize, nres);
1195 GNUNET_assert(qsize == op->nqueues); 1203 GNUNET_assert (qsize == op->nqueues);
1196} 1204}
1197 1205
1198 1206
@@ -1208,10 +1216,10 @@ GNUNET_TESTBED_operation_queue_insert2_(struct OperationQueue *queue,
1208 * @param op operation to add to the queue 1216 * @param op operation to add to the queue
1209 */ 1217 */
1210void 1218void
1211GNUNET_TESTBED_operation_queue_insert_(struct OperationQueue *queue, 1219GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue,
1212 struct GNUNET_TESTBED_Operation *op) 1220 struct GNUNET_TESTBED_Operation *op)
1213{ 1221{
1214 return GNUNET_TESTBED_operation_queue_insert2_(queue, op, 1); 1222 return GNUNET_TESTBED_operation_queue_insert2_ (queue, op, 1);
1215} 1223}
1216 1224
1217 1225
@@ -1225,11 +1233,11 @@ GNUNET_TESTBED_operation_queue_insert_(struct OperationQueue *queue,
1225 * @param op the operation to marks as waiting 1233 * @param op the operation to marks as waiting
1226 */ 1234 */
1227void 1235void
1228GNUNET_TESTBED_operation_begin_wait_(struct GNUNET_TESTBED_Operation *op) 1236GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op)
1229{ 1237{
1230 GNUNET_assert(NULL == op->rq_entry); 1238 GNUNET_assert (NULL == op->rq_entry);
1231 change_state(op, OP_STATE_WAITING); 1239 change_state (op, OP_STATE_WAITING);
1232 (void)check_readiness(op); 1240 (void) check_readiness (op);
1233} 1241}
1234 1242
1235 1243
@@ -1243,24 +1251,24 @@ GNUNET_TESTBED_operation_begin_wait_(struct GNUNET_TESTBED_Operation *op)
1243 * it as inactive. 1251 * it as inactive.
1244 */ 1252 */
1245void 1253void
1246GNUNET_TESTBED_operation_inactivate_(struct GNUNET_TESTBED_Operation *op) 1254GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op)
1247{ 1255{
1248 struct OperationQueue **queues; 1256 struct OperationQueue **queues;
1249 size_t ms; 1257 size_t ms;
1250 unsigned int nqueues; 1258 unsigned int nqueues;
1251 unsigned int i; 1259 unsigned int i;
1252 1260
1253 GNUNET_assert(OP_STATE_ACTIVE == op->state); 1261 GNUNET_assert (OP_STATE_ACTIVE == op->state);
1254 change_state(op, OP_STATE_INACTIVE); 1262 change_state (op, OP_STATE_INACTIVE);
1255 nqueues = op->nqueues; 1263 nqueues = op->nqueues;
1256 ms = sizeof(struct OperationQueue *) * nqueues; 1264 ms = sizeof(struct OperationQueue *) * nqueues;
1257 queues = GNUNET_malloc(ms); 1265 queues = GNUNET_malloc (ms);
1258 /* Cloning is needed as the operation be released by waiting operations and 1266 /* Cloning is needed as the operation be released by waiting operations and
1259 hence its nqueues memory ptr will be freed */ 1267 hence its nqueues memory ptr will be freed */
1260 GNUNET_memcpy(queues, op->queues, ms); 1268 GNUNET_memcpy (queues, op->queues, ms);
1261 for (i = 0; i < nqueues; i++) 1269 for (i = 0; i < nqueues; i++)
1262 recheck_waiting(queues[i]); 1270 recheck_waiting (queues[i]);
1263 GNUNET_free(queues); 1271 GNUNET_free (queues);
1264} 1272}
1265 1273
1266 1274
@@ -1272,10 +1280,10 @@ GNUNET_TESTBED_operation_inactivate_(struct GNUNET_TESTBED_Operation *op)
1272 * @param op the operation to be marked as active 1280 * @param op the operation to be marked as active
1273 */ 1281 */
1274void 1282void
1275GNUNET_TESTBED_operation_activate_(struct GNUNET_TESTBED_Operation *op) 1283GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op)
1276{ 1284{
1277 GNUNET_assert(OP_STATE_INACTIVE == op->state); 1285 GNUNET_assert (OP_STATE_INACTIVE == op->state);
1278 change_state(op, OP_STATE_ACTIVE); 1286 change_state (op, OP_STATE_ACTIVE);
1279} 1287}
1280 1288
1281 1289
@@ -1286,56 +1294,56 @@ GNUNET_TESTBED_operation_activate_(struct GNUNET_TESTBED_Operation *op)
1286 * @param op operation that finished 1294 * @param op operation that finished
1287 */ 1295 */
1288void 1296void
1289GNUNET_TESTBED_operation_release_(struct GNUNET_TESTBED_Operation *op) 1297GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
1290{ 1298{
1291 struct QueueEntry *entry; 1299 struct QueueEntry *entry;
1292 struct OperationQueue *opq; 1300 struct OperationQueue *opq;
1293 unsigned int i; 1301 unsigned int i;
1294 1302
1295 if (OP_STATE_INIT == op->state) 1303 if (OP_STATE_INIT == op->state)
1296 { 1304 {
1297 GNUNET_free(op); 1305 GNUNET_free (op);
1298 return; 1306 return;
1299 } 1307 }
1300 if (OP_STATE_READY == op->state) 1308 if (OP_STATE_READY == op->state)
1301 rq_remove(op); 1309 rq_remove (op);
1302 if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */ 1310 if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */
1303 GNUNET_TESTBED_operation_activate_(op); 1311 GNUNET_TESTBED_operation_activate_ (op);
1304 if (OP_STATE_ACTIVE == op->state) 1312 if (OP_STATE_ACTIVE == op->state)
1305 update_tslots(op); 1313 update_tslots (op);
1306 GNUNET_assert(NULL != op->queues); 1314 GNUNET_assert (NULL != op->queues);
1307 GNUNET_assert(NULL != op->qentries); 1315 GNUNET_assert (NULL != op->qentries);
1308 for (i = 0; i < op->nqueues; i++) 1316 for (i = 0; i < op->nqueues; i++)
1317 {
1318 entry = op->qentries[i];
1319 remove_queue_entry (op, i);
1320 opq = op->queues[i];
1321 switch (op->state)
1309 { 1322 {
1310 entry = op->qentries[i]; 1323 case OP_STATE_INIT:
1311 remove_queue_entry(op, i); 1324 case OP_STATE_INACTIVE:
1312 opq = op->queues[i]; 1325 GNUNET_assert (0);
1313 switch (op->state) 1326 break;
1314 {
1315 case OP_STATE_INIT:
1316 case OP_STATE_INACTIVE:
1317 GNUNET_assert(0);
1318 break;
1319 1327
1320 case OP_STATE_WAITING: 1328 case OP_STATE_WAITING:
1321 break; 1329 break;
1322 1330
1323 case OP_STATE_ACTIVE: 1331 case OP_STATE_ACTIVE:
1324 case OP_STATE_READY: 1332 case OP_STATE_READY:
1325 GNUNET_assert(0 != opq->active); 1333 GNUNET_assert (0 != opq->active);
1326 GNUNET_assert(opq->active >= entry->nres); 1334 GNUNET_assert (opq->active >= entry->nres);
1327 opq->active -= entry->nres; 1335 opq->active -= entry->nres;
1328 recheck_waiting(opq); 1336 recheck_waiting (opq);
1329 break; 1337 break;
1330 }
1331 GNUNET_free(entry);
1332 } 1338 }
1333 GNUNET_free_non_null(op->qentries); 1339 GNUNET_free (entry);
1334 GNUNET_free(op->queues); 1340 }
1335 GNUNET_free(op->nres); 1341 GNUNET_free_non_null (op->qentries);
1342 GNUNET_free (op->queues);
1343 GNUNET_free (op->nres);
1336 if (NULL != op->release) 1344 if (NULL != op->release)
1337 op->release(op->cb_cls); 1345 op->release (op->cb_cls);
1338 GNUNET_free(op); 1346 GNUNET_free (op);
1339} 1347}
1340 1348
1341 1349
@@ -1345,7 +1353,7 @@ GNUNET_TESTBED_operation_release_(struct GNUNET_TESTBED_Operation *op)
1345 * @param op the operation to be marked as failed 1353 * @param op the operation to be marked as failed
1346 */ 1354 */
1347void 1355void
1348GNUNET_TESTBED_operation_mark_failed(struct GNUNET_TESTBED_Operation *op) 1356GNUNET_TESTBED_operation_mark_failed (struct GNUNET_TESTBED_Operation *op)
1349{ 1357{
1350 op->failed = GNUNET_YES; 1358 op->failed = GNUNET_YES;
1351} 1359}
@@ -1356,23 +1364,23 @@ GNUNET_TESTBED_operation_mark_failed(struct GNUNET_TESTBED_Operation *op)
1356 * operations which are not completed and warn about them. 1364 * operations which are not completed and warn about them.
1357 */ 1365 */
1358void __attribute__ ((destructor)) 1366void __attribute__ ((destructor))
1359GNUNET_TESTBED_operations_fini() 1367GNUNET_TESTBED_operations_fini ()
1360{ 1368{
1361 struct OperationQueue *queue; 1369 struct OperationQueue *queue;
1362 unsigned int i; 1370 unsigned int i;
1363 int warn = 0; 1371 int warn = 0;
1364 1372
1365 for (i = 0; i < n_expired_opqs; i++) 1373 for (i = 0; i < n_expired_opqs; i++)
1366 { 1374 {
1367 queue = expired_opqs[i]; 1375 queue = expired_opqs[i];
1368 if (GNUNET_NO == is_queue_empty(queue)) 1376 if (GNUNET_NO == is_queue_empty (queue))
1369 warn = 1; 1377 warn = 1;
1370 queue_destroy(queue); 1378 queue_destroy (queue);
1371 } 1379 }
1372 GNUNET_free_non_null(expired_opqs); 1380 GNUNET_free_non_null (expired_opqs);
1373 n_expired_opqs = 0; 1381 n_expired_opqs = 0;
1374 if (warn) 1382 if (warn)
1375 GNUNET_log(GNUNET_ERROR_TYPE_WARNING, 1383 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1376 "Be disciplined. Some operations were not marked as done.\n"); 1384 "Be disciplined. Some operations were not marked as done.\n");
1377} 1385}
1378/* end of testbed_api_operations.c */ 1386/* end of testbed_api_operations.c */