diff options
Diffstat (limited to 'src/testbed/testbed_api_operations.c')
-rw-r--r-- | src/testbed/testbed_api_operations.c | 770 |
1 files changed, 389 insertions, 381 deletions
diff --git a/src/testbed/testbed_api_operations.c b/src/testbed/testbed_api_operations.c index 194dc2655..185f62aa8 100644 --- a/src/testbed/testbed_api_operations.c +++ b/src/testbed/testbed_api_operations.c | |||
@@ -44,7 +44,8 @@ | |||
44 | /** | 44 | /** |
45 | * An entry in the operation queue | 45 | * An entry in the operation queue |
46 | */ | 46 | */ |
47 | struct QueueEntry { | 47 | struct QueueEntry |
48 | { | ||
48 | /** | 49 | /** |
49 | * The next DLL pointer | 50 | * The next DLL pointer |
50 | */ | 51 | */ |
@@ -77,7 +78,8 @@ struct OperationQueue; | |||
77 | /** | 78 | /** |
78 | * A slot to record time taken by an operation | 79 | * A slot to record time taken by an operation |
79 | */ | 80 | */ |
80 | struct TimeSlot { | 81 | struct TimeSlot |
82 | { | ||
81 | /** | 83 | /** |
82 | * DLL next pointer | 84 | * DLL next pointer |
83 | */ | 85 | */ |
@@ -113,7 +115,8 @@ struct TimeSlot { | |||
113 | /** | 115 | /** |
114 | * Context for operation queues of type OPERATION_QUEUE_TYPE_ADAPTIVE | 116 | * Context for operation queues of type OPERATION_QUEUE_TYPE_ADAPTIVE |
115 | */ | 117 | */ |
116 | struct FeedbackCtx { | 118 | struct FeedbackCtx |
119 | { | ||
117 | /** | 120 | /** |
118 | * Handle for calculating standard deviation | 121 | * Handle for calculating standard deviation |
119 | */ | 122 | */ |
@@ -156,7 +159,8 @@ struct FeedbackCtx { | |||
156 | * Queue of operations where we can only support a certain | 159 | * Queue of operations where we can only support a certain |
157 | * number of concurrent operations of a particular type. | 160 | * number of concurrent operations of a particular type. |
158 | */ | 161 | */ |
159 | struct OperationQueue { | 162 | struct OperationQueue |
163 | { | ||
160 | /** | 164 | /** |
161 | * DLL head for the wait queue. Operations which are waiting for this | 165 | * DLL head for the wait queue. Operations which are waiting for this |
162 | * operation queue are put here | 166 | * operation queue are put here |
@@ -243,7 +247,8 @@ struct OperationQueue { | |||
243 | /** | 247 | /** |
244 | * Operation state | 248 | * Operation state |
245 | */ | 249 | */ |
246 | enum OperationState { | 250 | enum OperationState |
251 | { | ||
247 | /** | 252 | /** |
248 | * The operation is just created and is in initial state | 253 | * The operation is just created and is in initial state |
249 | */ | 254 | */ |
@@ -277,7 +282,8 @@ enum OperationState { | |||
277 | /** | 282 | /** |
278 | * An entry in the ready queue (implemented as DLL) | 283 | * An entry in the ready queue (implemented as DLL) |
279 | */ | 284 | */ |
280 | struct ReadyQueueEntry { | 285 | struct ReadyQueueEntry |
286 | { | ||
281 | /** | 287 | /** |
282 | * next ptr for DLL | 288 | * next ptr for DLL |
283 | */ | 289 | */ |
@@ -298,7 +304,8 @@ struct ReadyQueueEntry { | |||
298 | /** | 304 | /** |
299 | * Opaque handle to an abstract operation to be executed by the testing framework. | 305 | * Opaque handle to an abstract operation to be executed by the testing framework. |
300 | */ | 306 | */ |
301 | struct GNUNET_TESTBED_Operation { | 307 | struct GNUNET_TESTBED_Operation |
308 | { | ||
302 | /** | 309 | /** |
303 | * Function to call when we have the resources to begin the operation. | 310 | * Function to call when we have the resources to begin the operation. |
304 | */ | 311 | */ |
@@ -403,17 +410,17 @@ struct GNUNET_SCHEDULER_Task *process_rq_task_id; | |||
403 | * @return the timeslot | 410 | * @return the timeslot |
404 | */ | 411 | */ |
405 | static void | 412 | static void |
406 | assign_timeslot(struct GNUNET_TESTBED_Operation *op, | 413 | assign_timeslot (struct GNUNET_TESTBED_Operation *op, |
407 | struct OperationQueue *queue) | 414 | struct OperationQueue *queue) |
408 | { | 415 | { |
409 | struct FeedbackCtx *fctx = queue->fctx; | 416 | struct FeedbackCtx *fctx = queue->fctx; |
410 | struct TimeSlot *tslot; | 417 | struct TimeSlot *tslot; |
411 | 418 | ||
412 | GNUNET_assert(OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type); | 419 | GNUNET_assert (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type); |
413 | tslot = fctx->alloc_head; | 420 | tslot = fctx->alloc_head; |
414 | GNUNET_assert(NULL != tslot); | 421 | GNUNET_assert (NULL != tslot); |
415 | GNUNET_CONTAINER_DLL_remove(fctx->alloc_head, fctx->alloc_tail, tslot); | 422 | GNUNET_CONTAINER_DLL_remove (fctx->alloc_head, fctx->alloc_tail, tslot); |
416 | GNUNET_CONTAINER_DLL_insert_tail(op->tslots_head, op->tslots_tail, tslot); | 423 | GNUNET_CONTAINER_DLL_insert_tail (op->tslots_head, op->tslots_tail, tslot); |
417 | tslot->op = op; | 424 | tslot->op = op; |
418 | } | 425 | } |
419 | 426 | ||
@@ -426,7 +433,7 @@ assign_timeslot(struct GNUNET_TESTBED_Operation *op, | |||
426 | * @param index the index of the entry in the operation's array of queue entries | 433 | * @param index the index of the entry in the operation's array of queue entries |
427 | */ | 434 | */ |
428 | static void | 435 | static void |
429 | remove_queue_entry(struct GNUNET_TESTBED_Operation *op, unsigned int index) | 436 | remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index) |
430 | { | 437 | { |
431 | struct OperationQueue *opq; | 438 | struct OperationQueue *opq; |
432 | struct QueueEntry *entry; | 439 | struct QueueEntry *entry; |
@@ -434,27 +441,27 @@ remove_queue_entry(struct GNUNET_TESTBED_Operation *op, unsigned int index) | |||
434 | opq = op->queues[index]; | 441 | opq = op->queues[index]; |
435 | entry = op->qentries[index]; | 442 | entry = op->qentries[index]; |
436 | switch (op->state) | 443 | switch (op->state) |
437 | { | 444 | { |
438 | case OP_STATE_INIT: | 445 | case OP_STATE_INIT: |
439 | GNUNET_assert(0); | 446 | GNUNET_assert (0); |
440 | break; | 447 | break; |
441 | 448 | ||
442 | case OP_STATE_WAITING: | 449 | case OP_STATE_WAITING: |
443 | GNUNET_CONTAINER_DLL_remove(opq->wq_head, opq->wq_tail, entry); | 450 | GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry); |
444 | break; | 451 | break; |
445 | 452 | ||
446 | case OP_STATE_READY: | 453 | case OP_STATE_READY: |
447 | GNUNET_CONTAINER_DLL_remove(opq->rq_head, opq->rq_tail, entry); | 454 | GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry); |
448 | break; | 455 | break; |
449 | 456 | ||
450 | case OP_STATE_ACTIVE: | 457 | case OP_STATE_ACTIVE: |
451 | GNUNET_CONTAINER_DLL_remove(opq->aq_head, opq->aq_tail, entry); | 458 | GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry); |
452 | break; | 459 | break; |
453 | 460 | ||
454 | case OP_STATE_INACTIVE: | 461 | case OP_STATE_INACTIVE: |
455 | GNUNET_CONTAINER_DLL_remove(opq->nq_head, opq->nq_tail, entry); | 462 | GNUNET_CONTAINER_DLL_remove (opq->nq_head, opq->nq_tail, entry); |
456 | break; | 463 | break; |
457 | } | 464 | } |
458 | } | 465 | } |
459 | 466 | ||
460 | 467 | ||
@@ -466,57 +473,57 @@ remove_queue_entry(struct GNUNET_TESTBED_Operation *op, unsigned int index) | |||
466 | * @param state the state the operation should have. It cannot be OP_STATE_INIT | 473 | * @param state the state the operation should have. It cannot be OP_STATE_INIT |
467 | */ | 474 | */ |
468 | static void | 475 | static void |
469 | change_state(struct GNUNET_TESTBED_Operation *op, enum OperationState state) | 476 | change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state) |
470 | { | 477 | { |
471 | struct QueueEntry *entry; | 478 | struct QueueEntry *entry; |
472 | struct OperationQueue *opq; | 479 | struct OperationQueue *opq; |
473 | unsigned int cnt; | 480 | unsigned int cnt; |
474 | unsigned int s; | 481 | unsigned int s; |
475 | 482 | ||
476 | GNUNET_assert(OP_STATE_INIT != state); | 483 | GNUNET_assert (OP_STATE_INIT != state); |
477 | GNUNET_assert(NULL != op->queues); | 484 | GNUNET_assert (NULL != op->queues); |
478 | GNUNET_assert(NULL != op->nres); | 485 | GNUNET_assert (NULL != op->nres); |
479 | GNUNET_assert((OP_STATE_INIT == op->state) || (NULL != op->qentries)); | 486 | GNUNET_assert ((OP_STATE_INIT == op->state) || (NULL != op->qentries)); |
480 | GNUNET_assert(op->state != state); | 487 | GNUNET_assert (op->state != state); |
481 | for (cnt = 0; cnt < op->nqueues; cnt++) | 488 | for (cnt = 0; cnt < op->nqueues; cnt++) |
489 | { | ||
490 | if (OP_STATE_INIT == op->state) | ||
482 | { | 491 | { |
483 | if (OP_STATE_INIT == op->state) | 492 | entry = GNUNET_new (struct QueueEntry); |
484 | { | 493 | entry->op = op; |
485 | entry = GNUNET_new(struct QueueEntry); | 494 | entry->nres = op->nres[cnt]; |
486 | entry->op = op; | 495 | s = cnt; |
487 | entry->nres = op->nres[cnt]; | 496 | GNUNET_array_append (op->qentries, s, entry); |
488 | s = cnt; | 497 | } |
489 | GNUNET_array_append(op->qentries, s, entry); | 498 | else |
490 | } | 499 | { |
491 | else | 500 | entry = op->qentries[cnt]; |
492 | { | 501 | remove_queue_entry (op, cnt); |
493 | entry = op->qentries[cnt]; | 502 | } |
494 | remove_queue_entry(op, cnt); | 503 | opq = op->queues[cnt]; |
495 | } | 504 | switch (state) |
496 | opq = op->queues[cnt]; | 505 | { |
497 | switch (state) | 506 | case OP_STATE_INIT: |
498 | { | 507 | GNUNET_assert (0); |
499 | case OP_STATE_INIT: | 508 | break; |
500 | GNUNET_assert(0); | ||
501 | break; | ||
502 | 509 | ||
503 | case OP_STATE_WAITING: | 510 | case OP_STATE_WAITING: |
504 | GNUNET_CONTAINER_DLL_insert_tail(opq->wq_head, opq->wq_tail, entry); | 511 | GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry); |
505 | break; | 512 | break; |
506 | 513 | ||
507 | case OP_STATE_READY: | 514 | case OP_STATE_READY: |
508 | GNUNET_CONTAINER_DLL_insert_tail(opq->rq_head, opq->rq_tail, entry); | 515 | GNUNET_CONTAINER_DLL_insert_tail (opq->rq_head, opq->rq_tail, entry); |
509 | break; | 516 | break; |
510 | 517 | ||
511 | case OP_STATE_ACTIVE: | 518 | case OP_STATE_ACTIVE: |
512 | GNUNET_CONTAINER_DLL_insert_tail(opq->aq_head, opq->aq_tail, entry); | 519 | GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry); |
513 | break; | 520 | break; |
514 | 521 | ||
515 | case OP_STATE_INACTIVE: | 522 | case OP_STATE_INACTIVE: |
516 | GNUNET_CONTAINER_DLL_insert_tail(opq->nq_head, opq->nq_tail, entry); | 523 | GNUNET_CONTAINER_DLL_insert_tail (opq->nq_head, opq->nq_tail, entry); |
517 | break; | 524 | break; |
518 | } | ||
519 | } | 525 | } |
526 | } | ||
520 | op->state = state; | 527 | op->state = state; |
521 | } | 528 | } |
522 | 529 | ||
@@ -528,17 +535,17 @@ change_state(struct GNUNET_TESTBED_Operation *op, enum OperationState state) | |||
528 | * @param op the operation to be removed | 535 | * @param op the operation to be removed |
529 | */ | 536 | */ |
530 | static void | 537 | static void |
531 | rq_remove(struct GNUNET_TESTBED_Operation *op) | 538 | rq_remove (struct GNUNET_TESTBED_Operation *op) |
532 | { | 539 | { |
533 | GNUNET_assert(NULL != op->rq_entry); | 540 | GNUNET_assert (NULL != op->rq_entry); |
534 | GNUNET_CONTAINER_DLL_remove(rq_head, rq_tail, op->rq_entry); | 541 | GNUNET_CONTAINER_DLL_remove (rq_head, rq_tail, op->rq_entry); |
535 | GNUNET_free(op->rq_entry); | 542 | GNUNET_free (op->rq_entry); |
536 | op->rq_entry = NULL; | 543 | op->rq_entry = NULL; |
537 | if ((NULL == rq_head) && (NULL != process_rq_task_id)) | 544 | if ((NULL == rq_head) && (NULL != process_rq_task_id)) |
538 | { | 545 | { |
539 | GNUNET_SCHEDULER_cancel(process_rq_task_id); | 546 | GNUNET_SCHEDULER_cancel (process_rq_task_id); |
540 | process_rq_task_id = NULL; | 547 | process_rq_task_id = NULL; |
541 | } | 548 | } |
542 | } | 549 | } |
543 | 550 | ||
544 | 551 | ||
@@ -551,28 +558,28 @@ rq_remove(struct GNUNET_TESTBED_Operation *op) | |||
551 | * @param cls NULL | 558 | * @param cls NULL |
552 | */ | 559 | */ |
553 | static void | 560 | static void |
554 | process_rq_task(void *cls) | 561 | process_rq_task (void *cls) |
555 | { | 562 | { |
556 | struct GNUNET_TESTBED_Operation *op; | 563 | struct GNUNET_TESTBED_Operation *op; |
557 | struct OperationQueue *queue; | 564 | struct OperationQueue *queue; |
558 | unsigned int cnt; | 565 | unsigned int cnt; |
559 | 566 | ||
560 | process_rq_task_id = NULL; | 567 | process_rq_task_id = NULL; |
561 | GNUNET_assert(NULL != rq_head); | 568 | GNUNET_assert (NULL != rq_head); |
562 | GNUNET_assert(NULL != (op = rq_head->op)); | 569 | GNUNET_assert (NULL != (op = rq_head->op)); |
563 | rq_remove(op); | 570 | rq_remove (op); |
564 | if (NULL != rq_head) | 571 | if (NULL != rq_head) |
565 | process_rq_task_id = GNUNET_SCHEDULER_add_now(&process_rq_task, NULL); | 572 | process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL); |
566 | change_state(op, OP_STATE_ACTIVE); | 573 | change_state (op, OP_STATE_ACTIVE); |
567 | for (cnt = 0; cnt < op->nqueues; cnt++) | 574 | for (cnt = 0; cnt < op->nqueues; cnt++) |
568 | { | 575 | { |
569 | queue = op->queues[cnt]; | 576 | queue = op->queues[cnt]; |
570 | if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type) | 577 | if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type) |
571 | assign_timeslot(op, queue); | 578 | assign_timeslot (op, queue); |
572 | } | 579 | } |
573 | op->tstart = GNUNET_TIME_absolute_get(); | 580 | op->tstart = GNUNET_TIME_absolute_get (); |
574 | if (NULL != op->start) | 581 | if (NULL != op->start) |
575 | op->start(op->cb_cls); | 582 | op->start (op->cb_cls); |
576 | } | 583 | } |
577 | 584 | ||
578 | 585 | ||
@@ -582,17 +589,17 @@ process_rq_task(void *cls) | |||
582 | * @param op the operation to be queued | 589 | * @param op the operation to be queued |
583 | */ | 590 | */ |
584 | static void | 591 | static void |
585 | rq_add(struct GNUNET_TESTBED_Operation *op) | 592 | rq_add (struct GNUNET_TESTBED_Operation *op) |
586 | { | 593 | { |
587 | struct ReadyQueueEntry *rq_entry; | 594 | struct ReadyQueueEntry *rq_entry; |
588 | 595 | ||
589 | GNUNET_assert(NULL == op->rq_entry); | 596 | GNUNET_assert (NULL == op->rq_entry); |
590 | rq_entry = GNUNET_new(struct ReadyQueueEntry); | 597 | rq_entry = GNUNET_new (struct ReadyQueueEntry); |
591 | rq_entry->op = op; | 598 | rq_entry->op = op; |
592 | GNUNET_CONTAINER_DLL_insert_tail(rq_head, rq_tail, rq_entry); | 599 | GNUNET_CONTAINER_DLL_insert_tail (rq_head, rq_tail, rq_entry); |
593 | op->rq_entry = rq_entry; | 600 | op->rq_entry = rq_entry; |
594 | if (NULL == process_rq_task_id) | 601 | if (NULL == process_rq_task_id) |
595 | process_rq_task_id = GNUNET_SCHEDULER_add_now(&process_rq_task, NULL); | 602 | process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL); |
596 | } | 603 | } |
597 | 604 | ||
598 | 605 | ||
@@ -604,7 +611,7 @@ rq_add(struct GNUNET_TESTBED_Operation *op) | |||
604 | * otherwise | 611 | * otherwise |
605 | */ | 612 | */ |
606 | static int | 613 | static int |
607 | is_queue_empty(struct OperationQueue *opq) | 614 | is_queue_empty (struct OperationQueue *opq) |
608 | { | 615 | { |
609 | if ((NULL != opq->wq_head) | 616 | if ((NULL != opq->wq_head) |
610 | || (NULL != opq->rq_head) | 617 | || (NULL != opq->rq_head) |
@@ -632,10 +639,10 @@ is_queue_empty(struct OperationQueue *opq) | |||
632 | * be set to NULL and 0 respectively. | 639 | * be set to NULL and 0 respectively. |
633 | */ | 640 | */ |
634 | static int | 641 | static int |
635 | decide_capacity(struct OperationQueue *opq, | 642 | decide_capacity (struct OperationQueue *opq, |
636 | struct QueueEntry *entry, | 643 | struct QueueEntry *entry, |
637 | struct GNUNET_TESTBED_Operation ***ops_, | 644 | struct GNUNET_TESTBED_Operation ***ops_, |
638 | unsigned int *n_ops_) | 645 | unsigned int *n_ops_) |
639 | { | 646 | { |
640 | struct QueueEntry **evict_entries; | 647 | struct QueueEntry **evict_entries; |
641 | struct GNUNET_TESTBED_Operation **ops; | 648 | struct GNUNET_TESTBED_Operation **ops; |
@@ -647,53 +654,53 @@ decide_capacity(struct OperationQueue *opq, | |||
647 | int deficit; | 654 | int deficit; |
648 | int rval; | 655 | int rval; |
649 | 656 | ||
650 | GNUNET_assert(NULL != (op = entry->op)); | 657 | GNUNET_assert (NULL != (op = entry->op)); |
651 | GNUNET_assert(0 < (need = entry->nres)); | 658 | GNUNET_assert (0 < (need = entry->nres)); |
652 | ops = NULL; | 659 | ops = NULL; |
653 | n_ops = 0; | 660 | n_ops = 0; |
654 | evict_entries = NULL; | 661 | evict_entries = NULL; |
655 | n_evict_entries = 0; | 662 | n_evict_entries = 0; |
656 | rval = GNUNET_YES; | 663 | rval = GNUNET_YES; |
657 | if (OPERATION_QUEUE_TYPE_ADAPTIVE == opq->type) | 664 | if (OPERATION_QUEUE_TYPE_ADAPTIVE == opq->type) |
658 | { | 665 | { |
659 | GNUNET_assert(NULL != opq->fctx); | 666 | GNUNET_assert (NULL != opq->fctx); |
660 | GNUNET_assert(opq->max_active >= opq->overload); | 667 | GNUNET_assert (opq->max_active >= opq->overload); |
661 | max = opq->max_active - opq->overload; | 668 | max = opq->max_active - opq->overload; |
662 | } | 669 | } |
663 | else | 670 | else |
664 | max = opq->max_active; | 671 | max = opq->max_active; |
665 | if (opq->active > max) | 672 | if (opq->active > max) |
666 | { | 673 | { |
667 | rval = GNUNET_NO; | 674 | rval = GNUNET_NO; |
668 | goto ret; | 675 | goto ret; |
669 | } | 676 | } |
670 | if ((opq->active + need) <= max) | 677 | if ((opq->active + need) <= max) |
671 | goto ret; | 678 | goto ret; |
672 | deficit = need - (max - opq->active); | 679 | deficit = need - (max - opq->active); |
673 | for (entry = opq->nq_head; | 680 | for (entry = opq->nq_head; |
674 | (0 < deficit) && (NULL != entry); | 681 | (0 < deficit) && (NULL != entry); |
675 | entry = entry->next) | 682 | entry = entry->next) |
676 | { | 683 | { |
677 | GNUNET_array_append(evict_entries, n_evict_entries, entry); | 684 | GNUNET_array_append (evict_entries, n_evict_entries, entry); |
678 | deficit -= entry->nres; | 685 | deficit -= entry->nres; |
679 | } | 686 | } |
680 | if (0 < deficit) | 687 | if (0 < deficit) |
681 | { | 688 | { |
682 | rval = GNUNET_NO; | 689 | rval = GNUNET_NO; |
683 | goto ret; | 690 | goto ret; |
684 | } | 691 | } |
685 | for (n_ops = 0; n_ops < n_evict_entries;) | 692 | for (n_ops = 0; n_ops < n_evict_entries;) |
686 | { | 693 | { |
687 | op = evict_entries[n_ops]->op; | 694 | op = evict_entries[n_ops]->op; |
688 | GNUNET_array_append(ops, n_ops, op); /* increments n-ops */ | 695 | GNUNET_array_append (ops, n_ops, op); /* increments n-ops */ |
689 | } | 696 | } |
690 | 697 | ||
691 | ret: | 698 | ret: |
692 | GNUNET_free_non_null(evict_entries); | 699 | GNUNET_free_non_null (evict_entries); |
693 | if (NULL != ops_) | 700 | if (NULL != ops_) |
694 | *ops_ = ops; | 701 | *ops_ = ops; |
695 | else | 702 | else |
696 | GNUNET_free(ops); | 703 | GNUNET_free (ops); |
697 | if (NULL != n_ops_) | 704 | if (NULL != n_ops_) |
698 | *n_ops_ = n_ops; | 705 | *n_ops_ = n_ops; |
699 | return rval; | 706 | return rval; |
@@ -710,30 +717,30 @@ ret: | |||
710 | * @param n_new the number of operations in new array | 717 | * @param n_new the number of operations in new array |
711 | */ | 718 | */ |
712 | static void | 719 | static void |
713 | merge_ops(struct GNUNET_TESTBED_Operation ***old, | 720 | merge_ops (struct GNUNET_TESTBED_Operation ***old, |
714 | unsigned int *n_old, | 721 | unsigned int *n_old, |
715 | struct GNUNET_TESTBED_Operation **new, | 722 | struct GNUNET_TESTBED_Operation **new, |
716 | unsigned int n_new) | 723 | unsigned int n_new) |
717 | { | 724 | { |
718 | struct GNUNET_TESTBED_Operation **cur; | 725 | struct GNUNET_TESTBED_Operation **cur; |
719 | unsigned int i; | 726 | unsigned int i; |
720 | unsigned int j; | 727 | unsigned int j; |
721 | unsigned int n_cur; | 728 | unsigned int n_cur; |
722 | 729 | ||
723 | GNUNET_assert(NULL != old); | 730 | GNUNET_assert (NULL != old); |
724 | n_cur = *n_old; | 731 | n_cur = *n_old; |
725 | cur = *old; | 732 | cur = *old; |
726 | for (i = 0; i < n_new; i++) | 733 | for (i = 0; i < n_new; i++) |
734 | { | ||
735 | for (j = 0; j < *n_old; j++) | ||
727 | { | 736 | { |
728 | for (j = 0; j < *n_old; j++) | 737 | if (new[i] == cur[j]) |
729 | { | 738 | break; |
730 | if (new[i] == cur[j]) | ||
731 | break; | ||
732 | } | ||
733 | if (j < *n_old) | ||
734 | continue; | ||
735 | GNUNET_array_append(cur, n_cur, new[j]); | ||
736 | } | 739 | } |
740 | if (j < *n_old) | ||
741 | continue; | ||
742 | GNUNET_array_append (cur, n_cur, new[j]); | ||
743 | } | ||
737 | *old = cur; | 744 | *old = cur; |
738 | *n_old = n_cur; | 745 | *n_old = n_cur; |
739 | } | 746 | } |
@@ -746,7 +753,7 @@ merge_ops(struct GNUNET_TESTBED_Operation ***old, | |||
746 | * @param op the operation | 753 | * @param op the operation |
747 | */ | 754 | */ |
748 | static int | 755 | static int |
749 | check_readiness(struct GNUNET_TESTBED_Operation *op) | 756 | check_readiness (struct GNUNET_TESTBED_Operation *op) |
750 | { | 757 | { |
751 | struct GNUNET_TESTBED_Operation **evict_ops; | 758 | struct GNUNET_TESTBED_Operation **evict_ops; |
752 | struct GNUNET_TESTBED_Operation **ops; | 759 | struct GNUNET_TESTBED_Operation **ops; |
@@ -754,39 +761,39 @@ check_readiness(struct GNUNET_TESTBED_Operation *op) | |||
754 | unsigned int n_evict_ops; | 761 | unsigned int n_evict_ops; |
755 | unsigned int i; | 762 | unsigned int i; |
756 | 763 | ||
757 | GNUNET_assert(NULL == op->rq_entry); | 764 | GNUNET_assert (NULL == op->rq_entry); |
758 | GNUNET_assert(OP_STATE_WAITING == op->state); | 765 | GNUNET_assert (OP_STATE_WAITING == op->state); |
759 | evict_ops = NULL; | 766 | evict_ops = NULL; |
760 | n_evict_ops = 0; | 767 | n_evict_ops = 0; |
761 | for (i = 0; i < op->nqueues; i++) | 768 | for (i = 0; i < op->nqueues; i++) |
769 | { | ||
770 | ops = NULL; | ||
771 | n_ops = 0; | ||
772 | if (GNUNET_NO == decide_capacity (op->queues[i], op->qentries[i], | ||
773 | &ops, &n_ops)) | ||
762 | { | 774 | { |
763 | ops = NULL; | 775 | GNUNET_free_non_null (evict_ops); |
764 | n_ops = 0; | 776 | return GNUNET_NO; |
765 | if (GNUNET_NO == decide_capacity(op->queues[i], op->qentries[i], | ||
766 | &ops, &n_ops)) | ||
767 | { | ||
768 | GNUNET_free_non_null(evict_ops); | ||
769 | return GNUNET_NO; | ||
770 | } | ||
771 | if (NULL == ops) | ||
772 | continue; | ||
773 | merge_ops(&evict_ops, &n_evict_ops, ops, n_ops); | ||
774 | GNUNET_free(ops); | ||
775 | } | 777 | } |
778 | if (NULL == ops) | ||
779 | continue; | ||
780 | merge_ops (&evict_ops, &n_evict_ops, ops, n_ops); | ||
781 | GNUNET_free (ops); | ||
782 | } | ||
776 | if (NULL != evict_ops) | 783 | if (NULL != evict_ops) |
777 | { | 784 | { |
778 | for (i = 0; i < n_evict_ops; i++) | 785 | for (i = 0; i < n_evict_ops; i++) |
779 | GNUNET_TESTBED_operation_release_(evict_ops[i]); | 786 | GNUNET_TESTBED_operation_release_ (evict_ops[i]); |
780 | GNUNET_free(evict_ops); | 787 | GNUNET_free (evict_ops); |
781 | evict_ops = NULL; | 788 | evict_ops = NULL; |
782 | /* Evicting the operations should schedule this operation */ | 789 | /* Evicting the operations should schedule this operation */ |
783 | GNUNET_assert(OP_STATE_READY == op->state); | 790 | GNUNET_assert (OP_STATE_READY == op->state); |
784 | return GNUNET_YES; | 791 | return GNUNET_YES; |
785 | } | 792 | } |
786 | for (i = 0; i < op->nqueues; i++) | 793 | for (i = 0; i < op->nqueues; i++) |
787 | op->queues[i]->active += op->nres[i]; | 794 | op->queues[i]->active += op->nres[i]; |
788 | change_state(op, OP_STATE_READY); | 795 | change_state (op, OP_STATE_READY); |
789 | rq_add(op); | 796 | rq_add (op); |
790 | return GNUNET_YES; | 797 | return GNUNET_YES; |
791 | } | 798 | } |
792 | 799 | ||
@@ -797,18 +804,18 @@ check_readiness(struct GNUNET_TESTBED_Operation *op) | |||
797 | * @param op the operation to defer | 804 | * @param op the operation to defer |
798 | */ | 805 | */ |
799 | static void | 806 | static void |
800 | defer(struct GNUNET_TESTBED_Operation *op) | 807 | defer (struct GNUNET_TESTBED_Operation *op) |
801 | { | 808 | { |
802 | unsigned int i; | 809 | unsigned int i; |
803 | 810 | ||
804 | GNUNET_assert(OP_STATE_READY == op->state); | 811 | GNUNET_assert (OP_STATE_READY == op->state); |
805 | rq_remove(op); | 812 | rq_remove (op); |
806 | for (i = 0; i < op->nqueues; i++) | 813 | for (i = 0; i < op->nqueues; i++) |
807 | { | 814 | { |
808 | GNUNET_assert(op->queues[i]->active >= op->nres[i]); | 815 | GNUNET_assert (op->queues[i]->active >= op->nres[i]); |
809 | op->queues[i]->active -= op->nres[i]; | 816 | op->queues[i]->active -= op->nres[i]; |
810 | } | 817 | } |
811 | change_state(op, OP_STATE_WAITING); | 818 | change_state (op, OP_STATE_WAITING); |
812 | } | 819 | } |
813 | 820 | ||
814 | 821 | ||
@@ -820,23 +827,23 @@ defer(struct GNUNET_TESTBED_Operation *op) | |||
820 | * @param queue the operation queue | 827 | * @param queue the operation queue |
821 | */ | 828 | */ |
822 | static void | 829 | static void |
823 | cleanup_tslots(struct OperationQueue *queue) | 830 | cleanup_tslots (struct OperationQueue *queue) |
824 | { | 831 | { |
825 | struct FeedbackCtx *fctx = queue->fctx; | 832 | struct FeedbackCtx *fctx = queue->fctx; |
826 | struct TimeSlot *tslot; | 833 | struct TimeSlot *tslot; |
827 | struct GNUNET_TESTBED_Operation *op; | 834 | struct GNUNET_TESTBED_Operation *op; |
828 | unsigned int cnt; | 835 | unsigned int cnt; |
829 | 836 | ||
830 | GNUNET_assert(NULL != fctx); | 837 | GNUNET_assert (NULL != fctx); |
831 | for (cnt = 0; cnt < queue->max_active; cnt++) | 838 | for (cnt = 0; cnt < queue->max_active; cnt++) |
832 | { | 839 | { |
833 | tslot = &fctx->tslots_freeptr[cnt]; | 840 | tslot = &fctx->tslots_freeptr[cnt]; |
834 | op = tslot->op; | 841 | op = tslot->op; |
835 | if (NULL == op) | 842 | if (NULL == op) |
836 | continue; | 843 | continue; |
837 | GNUNET_CONTAINER_DLL_remove(op->tslots_head, op->tslots_tail, tslot); | 844 | GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot); |
838 | } | 845 | } |
839 | GNUNET_free_non_null(fctx->tslots_freeptr); | 846 | GNUNET_free_non_null (fctx->tslots_freeptr); |
840 | fctx->tslots_freeptr = NULL; | 847 | fctx->tslots_freeptr = NULL; |
841 | fctx->alloc_head = NULL; | 848 | fctx->alloc_head = NULL; |
842 | fctx->alloc_tail = NULL; | 849 | fctx->alloc_tail = NULL; |
@@ -854,23 +861,24 @@ cleanup_tslots(struct OperationQueue *queue) | |||
854 | * will be selected as n | 861 | * will be selected as n |
855 | */ | 862 | */ |
856 | static void | 863 | static void |
857 | adaptive_queue_set_max_active(struct OperationQueue *queue, unsigned int n) | 864 | adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n) |
858 | { | 865 | { |
859 | struct FeedbackCtx *fctx = queue->fctx; | 866 | struct FeedbackCtx *fctx = queue->fctx; |
860 | struct TimeSlot *tslot; | 867 | struct TimeSlot *tslot; |
861 | unsigned int cnt; | 868 | unsigned int cnt; |
862 | 869 | ||
863 | cleanup_tslots(queue); | 870 | cleanup_tslots (queue); |
864 | n = GNUNET_MIN(n, fctx->max_active_bound); | 871 | n = GNUNET_MIN (n, fctx->max_active_bound); |
865 | fctx->tslots_freeptr = GNUNET_malloc(n * sizeof(struct TimeSlot)); | 872 | fctx->tslots_freeptr = GNUNET_malloc (n * sizeof(struct TimeSlot)); |
866 | fctx->nfailed = 0; | 873 | fctx->nfailed = 0; |
867 | for (cnt = 0; cnt < n; cnt++) | 874 | for (cnt = 0; cnt < n; cnt++) |
868 | { | 875 | { |
869 | tslot = &fctx->tslots_freeptr[cnt]; | 876 | tslot = &fctx->tslots_freeptr[cnt]; |
870 | tslot->queue = queue; | 877 | tslot->queue = queue; |
871 | GNUNET_CONTAINER_DLL_insert_tail(fctx->alloc_head, fctx->alloc_tail, tslot); | 878 | GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail, |
872 | } | 879 | tslot); |
873 | GNUNET_TESTBED_operation_queue_reset_max_active_(queue, n); | 880 | } |
881 | GNUNET_TESTBED_operation_queue_reset_max_active_ (queue, n); | ||
874 | } | 882 | } |
875 | 883 | ||
876 | 884 | ||
@@ -881,7 +889,7 @@ adaptive_queue_set_max_active(struct OperationQueue *queue, unsigned int n) | |||
881 | * @param queue the queue | 889 | * @param queue the queue |
882 | */ | 890 | */ |
883 | static void | 891 | static void |
884 | adapt_parallelism(struct OperationQueue *queue) | 892 | adapt_parallelism (struct OperationQueue *queue) |
885 | { | 893 | { |
886 | struct GNUNET_TIME_Relative avg; | 894 | struct GNUNET_TIME_Relative avg; |
887 | struct FeedbackCtx *fctx; | 895 | struct FeedbackCtx *fctx; |
@@ -895,32 +903,32 @@ adapt_parallelism(struct OperationQueue *queue) | |||
895 | nvals = 0; | 903 | nvals = 0; |
896 | fctx = queue->fctx; | 904 | fctx = queue->fctx; |
897 | for (cnt = 0; cnt < queue->max_active; cnt++) | 905 | for (cnt = 0; cnt < queue->max_active; cnt++) |
898 | { | 906 | { |
899 | tslot = &fctx->tslots_freeptr[cnt]; | 907 | tslot = &fctx->tslots_freeptr[cnt]; |
900 | avg = GNUNET_TIME_relative_add(avg, tslot->tsum); | 908 | avg = GNUNET_TIME_relative_add (avg, tslot->tsum); |
901 | nvals += tslot->nvals; | 909 | nvals += tslot->nvals; |
902 | } | 910 | } |
903 | GNUNET_assert(nvals >= queue->max_active); | 911 | GNUNET_assert (nvals >= queue->max_active); |
904 | GNUNET_assert(fctx->nfailed <= nvals); | 912 | GNUNET_assert (fctx->nfailed <= nvals); |
905 | nvals -= fctx->nfailed; | 913 | nvals -= fctx->nfailed; |
906 | if (0 == nvals) | 914 | if (0 == nvals) |
907 | { | 915 | { |
908 | if (1 == queue->max_active) | 916 | if (1 == queue->max_active) |
909 | adaptive_queue_set_max_active(queue, 1); | 917 | adaptive_queue_set_max_active (queue, 1); |
910 | else | 918 | else |
911 | adaptive_queue_set_max_active(queue, queue->max_active / 2); | 919 | adaptive_queue_set_max_active (queue, queue->max_active / 2); |
912 | return; | 920 | return; |
913 | } | 921 | } |
914 | avg = GNUNET_TIME_relative_divide(avg, nvals); | 922 | avg = GNUNET_TIME_relative_divide (avg, nvals); |
915 | GNUNET_TESTBED_SD_add_data_(fctx->sd, (unsigned int)avg.rel_value_us); | 923 | GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us); |
916 | if (GNUNET_SYSERR == | 924 | if (GNUNET_SYSERR == |
917 | GNUNET_TESTBED_SD_deviation_factor_(fctx->sd, | 925 | GNUNET_TESTBED_SD_deviation_factor_ (fctx->sd, |
918 | (unsigned int)avg.rel_value_us, | 926 | (unsigned int) avg.rel_value_us, |
919 | &sd)) | 927 | &sd)) |
920 | { | 928 | { |
921 | adaptive_queue_set_max_active(queue, queue->max_active); /* no change */ | 929 | adaptive_queue_set_max_active (queue, queue->max_active); /* no change */ |
922 | return; | 930 | return; |
923 | } | 931 | } |
924 | 932 | ||
925 | parallelism = 0; | 933 | parallelism = 0; |
926 | if (-1 == sd) | 934 | if (-1 == sd) |
@@ -931,36 +939,36 @@ adapt_parallelism(struct OperationQueue *queue) | |||
931 | parallelism = queue->max_active - 1; | 939 | parallelism = queue->max_active - 1; |
932 | if (2 <= sd) | 940 | if (2 <= sd) |
933 | parallelism = queue->max_active / 2; | 941 | parallelism = queue->max_active / 2; |
934 | parallelism = GNUNET_MAX(parallelism, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE); | 942 | parallelism = GNUNET_MAX (parallelism, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE); |
935 | adaptive_queue_set_max_active(queue, parallelism); | 943 | adaptive_queue_set_max_active (queue, parallelism); |
936 | 944 | ||
937 | #if 0 | 945 | #if 0 |
938 | /* old algorithm */ | 946 | /* old algorithm */ |
939 | if (sd < 0) | 947 | if (sd < 0) |
940 | sd = 0; | 948 | sd = 0; |
941 | GNUNET_assert(0 <= sd); | 949 | GNUNET_assert (0 <= sd); |
942 | //GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us); | 950 | // GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us); |
943 | if (0 == sd) | 951 | if (0 == sd) |
944 | { | 952 | { |
945 | adaptive_queue_set_max_active(queue, queue->max_active * 2); | 953 | adaptive_queue_set_max_active (queue, queue->max_active * 2); |
946 | return; | 954 | return; |
947 | } | 955 | } |
948 | if (1 == sd) | 956 | if (1 == sd) |
949 | { | 957 | { |
950 | adaptive_queue_set_max_active(queue, queue->max_active + 1); | 958 | adaptive_queue_set_max_active (queue, queue->max_active + 1); |
951 | return; | 959 | return; |
952 | } | 960 | } |
953 | if (1 == queue->max_active) | 961 | if (1 == queue->max_active) |
954 | { | 962 | { |
955 | adaptive_queue_set_max_active(queue, 1); | 963 | adaptive_queue_set_max_active (queue, 1); |
956 | return; | 964 | return; |
957 | } | 965 | } |
958 | if (2 == sd) | 966 | if (2 == sd) |
959 | { | 967 | { |
960 | adaptive_queue_set_max_active(queue, queue->max_active - 1); | 968 | adaptive_queue_set_max_active (queue, queue->max_active - 1); |
961 | return; | 969 | return; |
962 | } | 970 | } |
963 | adaptive_queue_set_max_active(queue, queue->max_active / 2); | 971 | adaptive_queue_set_max_active (queue, queue->max_active / 2); |
964 | #endif | 972 | #endif |
965 | } | 973 | } |
966 | 974 | ||
@@ -973,7 +981,7 @@ adapt_parallelism(struct OperationQueue *queue) | |||
973 | * @param op the operation | 981 | * @param op the operation |
974 | */ | 982 | */ |
975 | static void | 983 | static void |
976 | update_tslots(struct GNUNET_TESTBED_Operation *op) | 984 | update_tslots (struct GNUNET_TESTBED_Operation *op) |
977 | { | 985 | { |
978 | struct OperationQueue *queue; | 986 | struct OperationQueue *queue; |
979 | struct GNUNET_TIME_Relative t; | 987 | struct GNUNET_TIME_Relative t; |
@@ -981,31 +989,31 @@ update_tslots(struct GNUNET_TESTBED_Operation *op) | |||
981 | struct FeedbackCtx *fctx; | 989 | struct FeedbackCtx *fctx; |
982 | unsigned int i; | 990 | unsigned int i; |
983 | 991 | ||
984 | t = GNUNET_TIME_absolute_get_duration(op->tstart); | 992 | t = GNUNET_TIME_absolute_get_duration (op->tstart); |
985 | while (NULL != (tslot = op->tslots_head)) /* update time slots */ | 993 | while (NULL != (tslot = op->tslots_head)) /* update time slots */ |
994 | { | ||
995 | queue = tslot->queue; | ||
996 | fctx = queue->fctx; | ||
997 | GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot); | ||
998 | tslot->op = NULL; | ||
999 | GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail, | ||
1000 | tslot); | ||
1001 | if (op->failed) | ||
986 | { | 1002 | { |
987 | queue = tslot->queue; | 1003 | fctx->nfailed++; |
988 | fctx = queue->fctx; | 1004 | for (i = 0; i < op->nqueues; i++) |
989 | GNUNET_CONTAINER_DLL_remove(op->tslots_head, op->tslots_tail, tslot); | 1005 | if (queue == op->queues[i]) |
990 | tslot->op = NULL; | 1006 | break; |
991 | GNUNET_CONTAINER_DLL_insert_tail(fctx->alloc_head, fctx->alloc_tail, | 1007 | GNUNET_assert (i != op->nqueues); |
992 | tslot); | 1008 | op->queues[i]->overload += op->nres[i]; |
993 | if (op->failed) | ||
994 | { | ||
995 | fctx->nfailed++; | ||
996 | for (i = 0; i < op->nqueues; i++) | ||
997 | if (queue == op->queues[i]) | ||
998 | break; | ||
999 | GNUNET_assert(i != op->nqueues); | ||
1000 | op->queues[i]->overload += op->nres[i]; | ||
1001 | } | ||
1002 | tslot->tsum = GNUNET_TIME_relative_add(tslot->tsum, t); | ||
1003 | if (0 != tslot->nvals++) | ||
1004 | continue; | ||
1005 | fctx->tslots_filled++; | ||
1006 | if (queue->max_active == fctx->tslots_filled) | ||
1007 | adapt_parallelism(queue); | ||
1008 | } | 1009 | } |
1010 | tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t); | ||
1011 | if (0 != tslot->nvals++) | ||
1012 | continue; | ||
1013 | fctx->tslots_filled++; | ||
1014 | if (queue->max_active == fctx->tslots_filled) | ||
1015 | adapt_parallelism (queue); | ||
1016 | } | ||
1009 | } | 1017 | } |
1010 | 1018 | ||
1011 | 1019 | ||
@@ -1018,12 +1026,12 @@ update_tslots(struct GNUNET_TESTBED_Operation *op) | |||
1018 | * @return handle to the operation | 1026 | * @return handle to the operation |
1019 | */ | 1027 | */ |
1020 | struct GNUNET_TESTBED_Operation * | 1028 | struct GNUNET_TESTBED_Operation * |
1021 | GNUNET_TESTBED_operation_create_(void *cls, OperationStart start, | 1029 | GNUNET_TESTBED_operation_create_ (void *cls, OperationStart start, |
1022 | OperationRelease release) | 1030 | OperationRelease release) |
1023 | { | 1031 | { |
1024 | struct GNUNET_TESTBED_Operation *op; | 1032 | struct GNUNET_TESTBED_Operation *op; |
1025 | 1033 | ||
1026 | op = GNUNET_new(struct GNUNET_TESTBED_Operation); | 1034 | op = GNUNET_new (struct GNUNET_TESTBED_Operation); |
1027 | op->start = start; | 1035 | op->start = start; |
1028 | op->state = OP_STATE_INIT; | 1036 | op->state = OP_STATE_INIT; |
1029 | op->release = release; | 1037 | op->release = release; |
@@ -1041,26 +1049,26 @@ GNUNET_TESTBED_operation_create_(void *cls, OperationStart start, | |||
1041 | * @return handle to the queue | 1049 | * @return handle to the queue |
1042 | */ | 1050 | */ |
1043 | struct OperationQueue * | 1051 | struct OperationQueue * |
1044 | GNUNET_TESTBED_operation_queue_create_(enum OperationQueueType type, | 1052 | GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type, |
1045 | unsigned int max_active) | 1053 | unsigned int max_active) |
1046 | { | 1054 | { |
1047 | struct OperationQueue *queue; | 1055 | struct OperationQueue *queue; |
1048 | struct FeedbackCtx *fctx; | 1056 | struct FeedbackCtx *fctx; |
1049 | 1057 | ||
1050 | queue = GNUNET_new(struct OperationQueue); | 1058 | queue = GNUNET_new (struct OperationQueue); |
1051 | queue->type = type; | 1059 | queue->type = type; |
1052 | if (OPERATION_QUEUE_TYPE_FIXED == type) | 1060 | if (OPERATION_QUEUE_TYPE_FIXED == type) |
1053 | { | 1061 | { |
1054 | queue->max_active = max_active; | 1062 | queue->max_active = max_active; |
1055 | } | 1063 | } |
1056 | else | 1064 | else |
1057 | { | 1065 | { |
1058 | fctx = GNUNET_new(struct FeedbackCtx); | 1066 | fctx = GNUNET_new (struct FeedbackCtx); |
1059 | fctx->max_active_bound = max_active; | 1067 | fctx->max_active_bound = max_active; |
1060 | fctx->sd = GNUNET_TESTBED_SD_init_(ADAPTIVE_QUEUE_DEFAULT_HISTORY); | 1068 | fctx->sd = GNUNET_TESTBED_SD_init_ (ADAPTIVE_QUEUE_DEFAULT_HISTORY); |
1061 | queue->fctx = fctx; | 1069 | queue->fctx = fctx; |
1062 | adaptive_queue_set_max_active(queue, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE); | 1070 | adaptive_queue_set_max_active (queue, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE); |
1063 | } | 1071 | } |
1064 | return queue; | 1072 | return queue; |
1065 | } | 1073 | } |
1066 | 1074 | ||
@@ -1071,18 +1079,18 @@ GNUNET_TESTBED_operation_queue_create_(enum OperationQueueType type, | |||
1071 | * @param queue the operation queue to destroy | 1079 | * @param queue the operation queue to destroy |
1072 | */ | 1080 | */ |
1073 | static void | 1081 | static void |
1074 | queue_destroy(struct OperationQueue *queue) | 1082 | queue_destroy (struct OperationQueue *queue) |
1075 | { | 1083 | { |
1076 | struct FeedbackCtx *fctx; | 1084 | struct FeedbackCtx *fctx; |
1077 | 1085 | ||
1078 | if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type) | 1086 | if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type) |
1079 | { | 1087 | { |
1080 | cleanup_tslots(queue); | 1088 | cleanup_tslots (queue); |
1081 | fctx = queue->fctx; | 1089 | fctx = queue->fctx; |
1082 | GNUNET_TESTBED_SD_destroy_(fctx->sd); | 1090 | GNUNET_TESTBED_SD_destroy_ (fctx->sd); |
1083 | GNUNET_free(fctx); | 1091 | GNUNET_free (fctx); |
1084 | } | 1092 | } |
1085 | GNUNET_free(queue); | 1093 | GNUNET_free (queue); |
1086 | } | 1094 | } |
1087 | 1095 | ||
1088 | 1096 | ||
@@ -1094,16 +1102,16 @@ queue_destroy(struct OperationQueue *queue) | |||
1094 | * @param queue queue to destroy | 1102 | * @param queue queue to destroy |
1095 | */ | 1103 | */ |
1096 | void | 1104 | void |
1097 | GNUNET_TESTBED_operation_queue_destroy_(struct OperationQueue *queue) | 1105 | GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue) |
1098 | { | 1106 | { |
1099 | if (GNUNET_YES != is_queue_empty(queue)) | 1107 | if (GNUNET_YES != is_queue_empty (queue)) |
1100 | { | 1108 | { |
1101 | GNUNET_assert(0 == queue->expired); /* Are you calling twice on same queue? */ | 1109 | GNUNET_assert (0 == queue->expired); /* Are you calling twice on same queue? */ |
1102 | queue->expired = 1; | 1110 | queue->expired = 1; |
1103 | GNUNET_array_append(expired_opqs, n_expired_opqs, queue); | 1111 | GNUNET_array_append (expired_opqs, n_expired_opqs, queue); |
1104 | return; | 1112 | return; |
1105 | } | 1113 | } |
1106 | queue_destroy(queue); | 1114 | queue_destroy (queue); |
1107 | } | 1115 | } |
1108 | 1116 | ||
1109 | 1117 | ||
@@ -1115,11 +1123,11 @@ GNUNET_TESTBED_operation_queue_destroy_(struct OperationQueue *queue) | |||
1115 | * is not empty) | 1123 | * is not empty) |
1116 | */ | 1124 | */ |
1117 | int | 1125 | int |
1118 | GNUNET_TESTBED_operation_queue_destroy_empty_(struct OperationQueue *queue) | 1126 | GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue) |
1119 | { | 1127 | { |
1120 | if (GNUNET_NO == is_queue_empty(queue)) | 1128 | if (GNUNET_NO == is_queue_empty (queue)) |
1121 | return GNUNET_NO; | 1129 | return GNUNET_NO; |
1122 | GNUNET_TESTBED_operation_queue_destroy_(queue); | 1130 | GNUNET_TESTBED_operation_queue_destroy_ (queue); |
1123 | return GNUNET_YES; | 1131 | return GNUNET_YES; |
1124 | } | 1132 | } |
1125 | 1133 | ||
@@ -1131,19 +1139,19 @@ GNUNET_TESTBED_operation_queue_destroy_empty_(struct OperationQueue *queue) | |||
1131 | * @param opq the operation queue | 1139 | * @param opq the operation queue |
1132 | */ | 1140 | */ |
1133 | static void | 1141 | static void |
1134 | recheck_waiting(struct OperationQueue *opq) | 1142 | recheck_waiting (struct OperationQueue *opq) |
1135 | { | 1143 | { |
1136 | struct QueueEntry *entry; | 1144 | struct QueueEntry *entry; |
1137 | struct QueueEntry *entry2; | 1145 | struct QueueEntry *entry2; |
1138 | 1146 | ||
1139 | entry = opq->wq_head; | 1147 | entry = opq->wq_head; |
1140 | while (NULL != entry) | 1148 | while (NULL != entry) |
1141 | { | 1149 | { |
1142 | entry2 = entry->next; | 1150 | entry2 = entry->next; |
1143 | if (GNUNET_NO == check_readiness(entry->op)) | 1151 | if (GNUNET_NO == check_readiness (entry->op)) |
1144 | break; | 1152 | break; |
1145 | entry = entry2; | 1153 | entry = entry2; |
1146 | } | 1154 | } |
1147 | } | 1155 | } |
1148 | 1156 | ||
1149 | 1157 | ||
@@ -1156,8 +1164,8 @@ recheck_waiting(struct OperationQueue *opq) | |||
1156 | * @param max_active the new maximum number of active operations | 1164 | * @param max_active the new maximum number of active operations |
1157 | */ | 1165 | */ |
1158 | void | 1166 | void |
1159 | GNUNET_TESTBED_operation_queue_reset_max_active_(struct OperationQueue *queue, | 1167 | GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue, |
1160 | unsigned int max_active) | 1168 | unsigned int max_active) |
1161 | { | 1169 | { |
1162 | struct QueueEntry *entry; | 1170 | struct QueueEntry *entry; |
1163 | 1171 | ||
@@ -1165,8 +1173,8 @@ GNUNET_TESTBED_operation_queue_reset_max_active_(struct OperationQueue *queue, | |||
1165 | queue->overload = 0; | 1173 | queue->overload = 0; |
1166 | while ((queue->active > queue->max_active) | 1174 | while ((queue->active > queue->max_active) |
1167 | && (NULL != (entry = queue->rq_head))) | 1175 | && (NULL != (entry = queue->rq_head))) |
1168 | defer(entry->op); | 1176 | defer (entry->op); |
1169 | recheck_waiting(queue); | 1177 | recheck_waiting (queue); |
1170 | } | 1178 | } |
1171 | 1179 | ||
1172 | 1180 | ||
@@ -1182,17 +1190,17 @@ GNUNET_TESTBED_operation_queue_reset_max_active_(struct OperationQueue *queue, | |||
1182 | * operation. Should be greater than 0. | 1190 | * operation. Should be greater than 0. |
1183 | */ | 1191 | */ |
1184 | void | 1192 | void |
1185 | GNUNET_TESTBED_operation_queue_insert2_(struct OperationQueue *queue, | 1193 | GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue, |
1186 | struct GNUNET_TESTBED_Operation *op, | 1194 | struct GNUNET_TESTBED_Operation *op, |
1187 | unsigned int nres) | 1195 | unsigned int nres) |
1188 | { | 1196 | { |
1189 | unsigned int qsize; | 1197 | unsigned int qsize; |
1190 | 1198 | ||
1191 | GNUNET_assert(0 < nres); | 1199 | GNUNET_assert (0 < nres); |
1192 | qsize = op->nqueues; | 1200 | qsize = op->nqueues; |
1193 | GNUNET_array_append(op->queues, op->nqueues, queue); | 1201 | GNUNET_array_append (op->queues, op->nqueues, queue); |
1194 | GNUNET_array_append(op->nres, qsize, nres); | 1202 | GNUNET_array_append (op->nres, qsize, nres); |
1195 | GNUNET_assert(qsize == op->nqueues); | 1203 | GNUNET_assert (qsize == op->nqueues); |
1196 | } | 1204 | } |
1197 | 1205 | ||
1198 | 1206 | ||
@@ -1208,10 +1216,10 @@ GNUNET_TESTBED_operation_queue_insert2_(struct OperationQueue *queue, | |||
1208 | * @param op operation to add to the queue | 1216 | * @param op operation to add to the queue |
1209 | */ | 1217 | */ |
1210 | void | 1218 | void |
1211 | GNUNET_TESTBED_operation_queue_insert_(struct OperationQueue *queue, | 1219 | GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue, |
1212 | struct GNUNET_TESTBED_Operation *op) | 1220 | struct GNUNET_TESTBED_Operation *op) |
1213 | { | 1221 | { |
1214 | return GNUNET_TESTBED_operation_queue_insert2_(queue, op, 1); | 1222 | return GNUNET_TESTBED_operation_queue_insert2_ (queue, op, 1); |
1215 | } | 1223 | } |
1216 | 1224 | ||
1217 | 1225 | ||
@@ -1225,11 +1233,11 @@ GNUNET_TESTBED_operation_queue_insert_(struct OperationQueue *queue, | |||
1225 | * @param op the operation to marks as waiting | 1233 | * @param op the operation to marks as waiting |
1226 | */ | 1234 | */ |
1227 | void | 1235 | void |
1228 | GNUNET_TESTBED_operation_begin_wait_(struct GNUNET_TESTBED_Operation *op) | 1236 | GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op) |
1229 | { | 1237 | { |
1230 | GNUNET_assert(NULL == op->rq_entry); | 1238 | GNUNET_assert (NULL == op->rq_entry); |
1231 | change_state(op, OP_STATE_WAITING); | 1239 | change_state (op, OP_STATE_WAITING); |
1232 | (void)check_readiness(op); | 1240 | (void) check_readiness (op); |
1233 | } | 1241 | } |
1234 | 1242 | ||
1235 | 1243 | ||
@@ -1243,24 +1251,24 @@ GNUNET_TESTBED_operation_begin_wait_(struct GNUNET_TESTBED_Operation *op) | |||
1243 | * it as inactive. | 1251 | * it as inactive. |
1244 | */ | 1252 | */ |
1245 | void | 1253 | void |
1246 | GNUNET_TESTBED_operation_inactivate_(struct GNUNET_TESTBED_Operation *op) | 1254 | GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op) |
1247 | { | 1255 | { |
1248 | struct OperationQueue **queues; | 1256 | struct OperationQueue **queues; |
1249 | size_t ms; | 1257 | size_t ms; |
1250 | unsigned int nqueues; | 1258 | unsigned int nqueues; |
1251 | unsigned int i; | 1259 | unsigned int i; |
1252 | 1260 | ||
1253 | GNUNET_assert(OP_STATE_ACTIVE == op->state); | 1261 | GNUNET_assert (OP_STATE_ACTIVE == op->state); |
1254 | change_state(op, OP_STATE_INACTIVE); | 1262 | change_state (op, OP_STATE_INACTIVE); |
1255 | nqueues = op->nqueues; | 1263 | nqueues = op->nqueues; |
1256 | ms = sizeof(struct OperationQueue *) * nqueues; | 1264 | ms = sizeof(struct OperationQueue *) * nqueues; |
1257 | queues = GNUNET_malloc(ms); | 1265 | queues = GNUNET_malloc (ms); |
1258 | /* Cloning is needed as the operation be released by waiting operations and | 1266 | /* Cloning is needed as the operation be released by waiting operations and |
1259 | hence its nqueues memory ptr will be freed */ | 1267 | hence its nqueues memory ptr will be freed */ |
1260 | GNUNET_memcpy(queues, op->queues, ms); | 1268 | GNUNET_memcpy (queues, op->queues, ms); |
1261 | for (i = 0; i < nqueues; i++) | 1269 | for (i = 0; i < nqueues; i++) |
1262 | recheck_waiting(queues[i]); | 1270 | recheck_waiting (queues[i]); |
1263 | GNUNET_free(queues); | 1271 | GNUNET_free (queues); |
1264 | } | 1272 | } |
1265 | 1273 | ||
1266 | 1274 | ||
@@ -1272,10 +1280,10 @@ GNUNET_TESTBED_operation_inactivate_(struct GNUNET_TESTBED_Operation *op) | |||
1272 | * @param op the operation to be marked as active | 1280 | * @param op the operation to be marked as active |
1273 | */ | 1281 | */ |
1274 | void | 1282 | void |
1275 | GNUNET_TESTBED_operation_activate_(struct GNUNET_TESTBED_Operation *op) | 1283 | GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op) |
1276 | { | 1284 | { |
1277 | GNUNET_assert(OP_STATE_INACTIVE == op->state); | 1285 | GNUNET_assert (OP_STATE_INACTIVE == op->state); |
1278 | change_state(op, OP_STATE_ACTIVE); | 1286 | change_state (op, OP_STATE_ACTIVE); |
1279 | } | 1287 | } |
1280 | 1288 | ||
1281 | 1289 | ||
@@ -1286,56 +1294,56 @@ GNUNET_TESTBED_operation_activate_(struct GNUNET_TESTBED_Operation *op) | |||
1286 | * @param op operation that finished | 1294 | * @param op operation that finished |
1287 | */ | 1295 | */ |
1288 | void | 1296 | void |
1289 | GNUNET_TESTBED_operation_release_(struct GNUNET_TESTBED_Operation *op) | 1297 | GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op) |
1290 | { | 1298 | { |
1291 | struct QueueEntry *entry; | 1299 | struct QueueEntry *entry; |
1292 | struct OperationQueue *opq; | 1300 | struct OperationQueue *opq; |
1293 | unsigned int i; | 1301 | unsigned int i; |
1294 | 1302 | ||
1295 | if (OP_STATE_INIT == op->state) | 1303 | if (OP_STATE_INIT == op->state) |
1296 | { | 1304 | { |
1297 | GNUNET_free(op); | 1305 | GNUNET_free (op); |
1298 | return; | 1306 | return; |
1299 | } | 1307 | } |
1300 | if (OP_STATE_READY == op->state) | 1308 | if (OP_STATE_READY == op->state) |
1301 | rq_remove(op); | 1309 | rq_remove (op); |
1302 | if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */ | 1310 | if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */ |
1303 | GNUNET_TESTBED_operation_activate_(op); | 1311 | GNUNET_TESTBED_operation_activate_ (op); |
1304 | if (OP_STATE_ACTIVE == op->state) | 1312 | if (OP_STATE_ACTIVE == op->state) |
1305 | update_tslots(op); | 1313 | update_tslots (op); |
1306 | GNUNET_assert(NULL != op->queues); | 1314 | GNUNET_assert (NULL != op->queues); |
1307 | GNUNET_assert(NULL != op->qentries); | 1315 | GNUNET_assert (NULL != op->qentries); |
1308 | for (i = 0; i < op->nqueues; i++) | 1316 | for (i = 0; i < op->nqueues; i++) |
1317 | { | ||
1318 | entry = op->qentries[i]; | ||
1319 | remove_queue_entry (op, i); | ||
1320 | opq = op->queues[i]; | ||
1321 | switch (op->state) | ||
1309 | { | 1322 | { |
1310 | entry = op->qentries[i]; | 1323 | case OP_STATE_INIT: |
1311 | remove_queue_entry(op, i); | 1324 | case OP_STATE_INACTIVE: |
1312 | opq = op->queues[i]; | 1325 | GNUNET_assert (0); |
1313 | switch (op->state) | 1326 | break; |
1314 | { | ||
1315 | case OP_STATE_INIT: | ||
1316 | case OP_STATE_INACTIVE: | ||
1317 | GNUNET_assert(0); | ||
1318 | break; | ||
1319 | 1327 | ||
1320 | case OP_STATE_WAITING: | 1328 | case OP_STATE_WAITING: |
1321 | break; | 1329 | break; |
1322 | 1330 | ||
1323 | case OP_STATE_ACTIVE: | 1331 | case OP_STATE_ACTIVE: |
1324 | case OP_STATE_READY: | 1332 | case OP_STATE_READY: |
1325 | GNUNET_assert(0 != opq->active); | 1333 | GNUNET_assert (0 != opq->active); |
1326 | GNUNET_assert(opq->active >= entry->nres); | 1334 | GNUNET_assert (opq->active >= entry->nres); |
1327 | opq->active -= entry->nres; | 1335 | opq->active -= entry->nres; |
1328 | recheck_waiting(opq); | 1336 | recheck_waiting (opq); |
1329 | break; | 1337 | break; |
1330 | } | ||
1331 | GNUNET_free(entry); | ||
1332 | } | 1338 | } |
1333 | GNUNET_free_non_null(op->qentries); | 1339 | GNUNET_free (entry); |
1334 | GNUNET_free(op->queues); | 1340 | } |
1335 | GNUNET_free(op->nres); | 1341 | GNUNET_free_non_null (op->qentries); |
1342 | GNUNET_free (op->queues); | ||
1343 | GNUNET_free (op->nres); | ||
1336 | if (NULL != op->release) | 1344 | if (NULL != op->release) |
1337 | op->release(op->cb_cls); | 1345 | op->release (op->cb_cls); |
1338 | GNUNET_free(op); | 1346 | GNUNET_free (op); |
1339 | } | 1347 | } |
1340 | 1348 | ||
1341 | 1349 | ||
@@ -1345,7 +1353,7 @@ GNUNET_TESTBED_operation_release_(struct GNUNET_TESTBED_Operation *op) | |||
1345 | * @param op the operation to be marked as failed | 1353 | * @param op the operation to be marked as failed |
1346 | */ | 1354 | */ |
1347 | void | 1355 | void |
1348 | GNUNET_TESTBED_operation_mark_failed(struct GNUNET_TESTBED_Operation *op) | 1356 | GNUNET_TESTBED_operation_mark_failed (struct GNUNET_TESTBED_Operation *op) |
1349 | { | 1357 | { |
1350 | op->failed = GNUNET_YES; | 1358 | op->failed = GNUNET_YES; |
1351 | } | 1359 | } |
@@ -1356,23 +1364,23 @@ GNUNET_TESTBED_operation_mark_failed(struct GNUNET_TESTBED_Operation *op) | |||
1356 | * operations which are not completed and warn about them. | 1364 | * operations which are not completed and warn about them. |
1357 | */ | 1365 | */ |
1358 | void __attribute__ ((destructor)) | 1366 | void __attribute__ ((destructor)) |
1359 | GNUNET_TESTBED_operations_fini() | 1367 | GNUNET_TESTBED_operations_fini () |
1360 | { | 1368 | { |
1361 | struct OperationQueue *queue; | 1369 | struct OperationQueue *queue; |
1362 | unsigned int i; | 1370 | unsigned int i; |
1363 | int warn = 0; | 1371 | int warn = 0; |
1364 | 1372 | ||
1365 | for (i = 0; i < n_expired_opqs; i++) | 1373 | for (i = 0; i < n_expired_opqs; i++) |
1366 | { | 1374 | { |
1367 | queue = expired_opqs[i]; | 1375 | queue = expired_opqs[i]; |
1368 | if (GNUNET_NO == is_queue_empty(queue)) | 1376 | if (GNUNET_NO == is_queue_empty (queue)) |
1369 | warn = 1; | 1377 | warn = 1; |
1370 | queue_destroy(queue); | 1378 | queue_destroy (queue); |
1371 | } | 1379 | } |
1372 | GNUNET_free_non_null(expired_opqs); | 1380 | GNUNET_free_non_null (expired_opqs); |
1373 | n_expired_opqs = 0; | 1381 | n_expired_opqs = 0; |
1374 | if (warn) | 1382 | if (warn) |
1375 | GNUNET_log(GNUNET_ERROR_TYPE_WARNING, | 1383 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
1376 | "Be disciplined. Some operations were not marked as done.\n"); | 1384 | "Be disciplined. Some operations were not marked as done.\n"); |
1377 | } | 1385 | } |
1378 | /* end of testbed_api_operations.c */ | 1386 | /* end of testbed_api_operations.c */ |