diff options
author | Sree Harsha Totakura <totakura@in.tum.de> | 2014-02-18 15:43:38 +0000 |
---|---|---|
committer | Sree Harsha Totakura <totakura@in.tum.de> | 2014-02-18 15:43:38 +0000 |
commit | 6b75b4dc34ff61e06844277c7cc9ff38d49fffeb (patch) | |
tree | 4b27400b361686d853b12c758f8450e74e4f5887 /src/testbed | |
parent | 31c3c7c755dcf707f73b00959970bfd27da4b7ca (diff) | |
download | gnunet-6b75b4dc34ff61e06844277c7cc9ff38d49fffeb.tar.gz gnunet-6b75b4dc34ff61e06844277c7cc9ff38d49fffeb.zip |
Consider the resources from failed operations as overloaded and not use them
until the parallelism is refreshed.
This commit also fixes a bug where the parallelism is set to 0 and hence no
progress can be made.
Diffstat (limited to 'src/testbed')
-rw-r--r-- | src/testbed/testbed_api_operations.c | 62 |
1 files changed, 51 insertions, 11 deletions
diff --git a/src/testbed/testbed_api_operations.c b/src/testbed/testbed_api_operations.c index 2a559a90e..07fb4940d 100644 --- a/src/testbed/testbed_api_operations.c +++ b/src/testbed/testbed_api_operations.c | |||
@@ -29,6 +29,17 @@ | |||
29 | #include "testbed_api_operations.h" | 29 | #include "testbed_api_operations.h" |
30 | #include "testbed_api_sd.h" | 30 | #include "testbed_api_sd.h" |
31 | 31 | ||
32 | /** | ||
33 | * The number of readings containing past operation's timing information that we | ||
34 | * keep track of for adaptive queues | ||
35 | */ | ||
36 | #define ADAPTIVE_QUEUE_DEFAULT_HISTORY 10 | ||
37 | |||
38 | /** | ||
39 | * The number of parallel opeartions we start with by default for adaptive | ||
40 | * queues | ||
41 | */ | ||
42 | #define ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE 4 | ||
32 | 43 | ||
33 | /** | 44 | /** |
34 | * An entry in the operation queue | 45 | * An entry in the operation queue |
@@ -141,7 +152,6 @@ struct FeedbackCtx | |||
141 | * Number of operations that have failed | 152 | * Number of operations that have failed |
142 | */ | 153 | */ |
143 | unsigned int nfailed; | 154 | unsigned int nfailed; |
144 | |||
145 | }; | 155 | }; |
146 | 156 | ||
147 | 157 | ||
@@ -216,10 +226,16 @@ struct OperationQueue | |||
216 | * Max number of operations which can be active at any time in this queue. | 226 | * Max number of operations which can be active at any time in this queue. |
217 | * This value can be changed either by calling | 227 | * This value can be changed either by calling |
218 | * GNUNET_TESTBED_operation_queue_reset_max_active_() or by the adaptive | 228 | * GNUNET_TESTBED_operation_queue_reset_max_active_() or by the adaptive |
219 | * algorithm if this operation queue is of type OPERATION_QUEUE_TYPE_ADAPTIVE | 229 | * algorithm if this operation queue is of type #OPERATION_QUEUE_TYPE_ADAPTIVE |
220 | */ | 230 | */ |
221 | unsigned int max_active; | 231 | unsigned int max_active; |
222 | 232 | ||
233 | /** | ||
234 | * The number of resources occupied by failed operations in the current shot. | ||
235 | * This is only relavant if the operation queue is of type | ||
236 | * #OPERATION_QUEUE_TYPE_ADAPTIVE | ||
237 | */ | ||
238 | unsigned int overload; | ||
223 | }; | 239 | }; |
224 | 240 | ||
225 | 241 | ||
@@ -613,6 +629,7 @@ decide_capacity (struct OperationQueue *opq, | |||
613 | unsigned int n_ops; | 629 | unsigned int n_ops; |
614 | unsigned int n_evict_entries; | 630 | unsigned int n_evict_entries; |
615 | unsigned int need; | 631 | unsigned int need; |
632 | unsigned int max; | ||
616 | int deficit; | 633 | int deficit; |
617 | int rval; | 634 | int rval; |
618 | 635 | ||
@@ -623,14 +640,22 @@ decide_capacity (struct OperationQueue *opq, | |||
623 | evict_entries = NULL; | 640 | evict_entries = NULL; |
624 | n_evict_entries = 0; | 641 | n_evict_entries = 0; |
625 | rval = GNUNET_YES; | 642 | rval = GNUNET_YES; |
626 | if (opq->active > opq->max_active) | 643 | if (OPERATION_QUEUE_TYPE_ADAPTIVE == opq->type) |
644 | { | ||
645 | GNUNET_assert (NULL != opq->fctx); | ||
646 | GNUNET_assert (opq->max_active >= opq->overload); | ||
647 | max = opq->max_active - opq->overload; | ||
648 | } | ||
649 | else | ||
650 | max = opq->max_active; | ||
651 | if (opq->active > max) | ||
627 | { | 652 | { |
628 | rval = GNUNET_NO; | 653 | rval = GNUNET_NO; |
629 | goto ret; | 654 | goto ret; |
630 | } | 655 | } |
631 | if ((opq->active + need) <= opq->max_active) | 656 | if ((opq->active + need) <= max) |
632 | goto ret; | 657 | goto ret; |
633 | deficit = need - (opq->max_active - opq->active); | 658 | deficit = need - (max - opq->active); |
634 | for (entry = opq->nq_head; | 659 | for (entry = opq->nq_head; |
635 | (0 < deficit) && (NULL != entry); | 660 | (0 < deficit) && (NULL != entry); |
636 | entry = entry->next) | 661 | entry = entry->next) |
@@ -825,6 +850,7 @@ adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n) | |||
825 | n = GNUNET_MIN (n ,fctx->max_active_bound); | 850 | n = GNUNET_MIN (n ,fctx->max_active_bound); |
826 | fctx->tslots_freeptr = GNUNET_malloc (n * sizeof (struct TimeSlot)); | 851 | fctx->tslots_freeptr = GNUNET_malloc (n * sizeof (struct TimeSlot)); |
827 | fctx->nfailed = 0; | 852 | fctx->nfailed = 0; |
853 | FPRINTF (stderr, "Parallelism: %u\n", n); | ||
828 | for (cnt = 0; cnt < n; cnt++) | 854 | for (cnt = 0; cnt < n; cnt++) |
829 | { | 855 | { |
830 | tslot = &fctx->tslots_freeptr[cnt]; | 856 | tslot = &fctx->tslots_freeptr[cnt]; |
@@ -881,14 +907,19 @@ adapt_parallelism (struct OperationQueue *queue) | |||
881 | adaptive_queue_set_max_active (queue, queue->max_active); /* no change */ | 907 | adaptive_queue_set_max_active (queue, queue->max_active); /* no change */ |
882 | return; | 908 | return; |
883 | } | 909 | } |
884 | if (1 == sd) | ||
885 | adaptive_queue_set_max_active (queue, queue->max_active - 1); | ||
886 | if (2 <= sd) | ||
887 | adaptive_queue_set_max_active (queue, queue->max_active / 2); | ||
888 | if (-1 == sd) | 910 | if (-1 == sd) |
889 | adaptive_queue_set_max_active (queue, queue->max_active + 1); | 911 | adaptive_queue_set_max_active (queue, queue->max_active + 1); |
890 | if (sd <= -2) | 912 | if (sd <= -2) |
891 | adaptive_queue_set_max_active (queue, queue->max_active * 2); | 913 | adaptive_queue_set_max_active (queue, queue->max_active * 2); |
914 | if (1 == queue->max_active) | ||
915 | { | ||
916 | adaptive_queue_set_max_active (queue, 1); | ||
917 | return; | ||
918 | } | ||
919 | if (1 == sd) | ||
920 | adaptive_queue_set_max_active (queue, queue->max_active - 1); | ||
921 | if (2 <= sd) | ||
922 | adaptive_queue_set_max_active (queue, queue->max_active / 2); | ||
892 | 923 | ||
893 | #if 0 /* old algorithm */ | 924 | #if 0 /* old algorithm */ |
894 | if (sd < 0) | 925 | if (sd < 0) |
@@ -934,6 +965,7 @@ update_tslots (struct GNUNET_TESTBED_Operation *op) | |||
934 | struct GNUNET_TIME_Relative t; | 965 | struct GNUNET_TIME_Relative t; |
935 | struct TimeSlot *tslot; | 966 | struct TimeSlot *tslot; |
936 | struct FeedbackCtx *fctx; | 967 | struct FeedbackCtx *fctx; |
968 | unsigned int i; | ||
937 | 969 | ||
938 | t = GNUNET_TIME_absolute_get_duration (op->tstart); | 970 | t = GNUNET_TIME_absolute_get_duration (op->tstart); |
939 | while (NULL != (tslot = op->tslots_head)) /* update time slots */ | 971 | while (NULL != (tslot = op->tslots_head)) /* update time slots */ |
@@ -945,7 +977,14 @@ update_tslots (struct GNUNET_TESTBED_Operation *op) | |||
945 | GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail, | 977 | GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail, |
946 | tslot); | 978 | tslot); |
947 | if (op->failed) | 979 | if (op->failed) |
980 | { | ||
948 | fctx->nfailed++; | 981 | fctx->nfailed++; |
982 | for (i = 0; i < op->nqueues; i++) | ||
983 | if (queue == op->queues[i]) | ||
984 | break; | ||
985 | GNUNET_assert (i != op->nqueues); | ||
986 | op->queues[i]->overload += op->nres[i]; | ||
987 | } | ||
949 | tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t); | 988 | tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t); |
950 | if (0 != tslot->nvals++) | 989 | if (0 != tslot->nvals++) |
951 | continue; | 990 | continue; |
@@ -1004,9 +1043,9 @@ GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type, | |||
1004 | { | 1043 | { |
1005 | fctx = GNUNET_new (struct FeedbackCtx); | 1044 | fctx = GNUNET_new (struct FeedbackCtx); |
1006 | fctx->max_active_bound = max_active; | 1045 | fctx->max_active_bound = max_active; |
1007 | fctx->sd = GNUNET_TESTBED_SD_init_ (10); /* FIXME: Why 10? */ | 1046 | fctx->sd = GNUNET_TESTBED_SD_init_ (ADAPTIVE_QUEUE_DEFAULT_HISTORY); |
1008 | queue->fctx = fctx; | 1047 | queue->fctx = fctx; |
1009 | adaptive_queue_set_max_active (queue, 4); /* start with 4 */ | 1048 | adaptive_queue_set_max_active (queue, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE); |
1010 | } | 1049 | } |
1011 | return queue; | 1050 | return queue; |
1012 | } | 1051 | } |
@@ -1090,6 +1129,7 @@ GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue, | |||
1090 | struct QueueEntry *entry; | 1129 | struct QueueEntry *entry; |
1091 | 1130 | ||
1092 | queue->max_active = max_active; | 1131 | queue->max_active = max_active; |
1132 | queue->overload = 0; | ||
1093 | while ( (queue->active > queue->max_active) | 1133 | while ( (queue->active > queue->max_active) |
1094 | && (NULL != (entry = queue->rq_head)) ) | 1134 | && (NULL != (entry = queue->rq_head)) ) |
1095 | defer (entry->op); | 1135 | defer (entry->op); |