aboutsummaryrefslogtreecommitdiff
path: root/src/testbed
diff options
context:
space:
mode:
authorSree Harsha Totakura <totakura@in.tum.de>2014-02-18 15:43:38 +0000
committerSree Harsha Totakura <totakura@in.tum.de>2014-02-18 15:43:38 +0000
commit6b75b4dc34ff61e06844277c7cc9ff38d49fffeb (patch)
tree4b27400b361686d853b12c758f8450e74e4f5887 /src/testbed
parent31c3c7c755dcf707f73b00959970bfd27da4b7ca (diff)
downloadgnunet-6b75b4dc34ff61e06844277c7cc9ff38d49fffeb.tar.gz
gnunet-6b75b4dc34ff61e06844277c7cc9ff38d49fffeb.zip
Consider the resources from failed operations as overloaded and not use them
until the parallelism is refreshed. This commit also fixes a bug where the parallelism is set to 0 and hence no progress can be made.
Diffstat (limited to 'src/testbed')
-rw-r--r--src/testbed/testbed_api_operations.c62
1 files changed, 51 insertions, 11 deletions
diff --git a/src/testbed/testbed_api_operations.c b/src/testbed/testbed_api_operations.c
index 2a559a90e..07fb4940d 100644
--- a/src/testbed/testbed_api_operations.c
+++ b/src/testbed/testbed_api_operations.c
@@ -29,6 +29,17 @@
29#include "testbed_api_operations.h" 29#include "testbed_api_operations.h"
30#include "testbed_api_sd.h" 30#include "testbed_api_sd.h"
31 31
32/**
33 * The number of readings containing past operation's timing information that we
34 * keep track of for adaptive queues
35 */
36#define ADAPTIVE_QUEUE_DEFAULT_HISTORY 10
37
38/**
39 * The number of parallel opeartions we start with by default for adaptive
40 * queues
41 */
42#define ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE 4
32 43
33/** 44/**
34 * An entry in the operation queue 45 * An entry in the operation queue
@@ -141,7 +152,6 @@ struct FeedbackCtx
141 * Number of operations that have failed 152 * Number of operations that have failed
142 */ 153 */
143 unsigned int nfailed; 154 unsigned int nfailed;
144
145}; 155};
146 156
147 157
@@ -216,10 +226,16 @@ struct OperationQueue
216 * Max number of operations which can be active at any time in this queue. 226 * Max number of operations which can be active at any time in this queue.
217 * This value can be changed either by calling 227 * This value can be changed either by calling
218 * GNUNET_TESTBED_operation_queue_reset_max_active_() or by the adaptive 228 * GNUNET_TESTBED_operation_queue_reset_max_active_() or by the adaptive
219 * algorithm if this operation queue is of type OPERATION_QUEUE_TYPE_ADAPTIVE 229 * algorithm if this operation queue is of type #OPERATION_QUEUE_TYPE_ADAPTIVE
220 */ 230 */
221 unsigned int max_active; 231 unsigned int max_active;
222 232
233 /**
234 * The number of resources occupied by failed operations in the current shot.
235 * This is only relavant if the operation queue is of type
236 * #OPERATION_QUEUE_TYPE_ADAPTIVE
237 */
238 unsigned int overload;
223}; 239};
224 240
225 241
@@ -613,6 +629,7 @@ decide_capacity (struct OperationQueue *opq,
613 unsigned int n_ops; 629 unsigned int n_ops;
614 unsigned int n_evict_entries; 630 unsigned int n_evict_entries;
615 unsigned int need; 631 unsigned int need;
632 unsigned int max;
616 int deficit; 633 int deficit;
617 int rval; 634 int rval;
618 635
@@ -623,14 +640,22 @@ decide_capacity (struct OperationQueue *opq,
623 evict_entries = NULL; 640 evict_entries = NULL;
624 n_evict_entries = 0; 641 n_evict_entries = 0;
625 rval = GNUNET_YES; 642 rval = GNUNET_YES;
626 if (opq->active > opq->max_active) 643 if (OPERATION_QUEUE_TYPE_ADAPTIVE == opq->type)
644 {
645 GNUNET_assert (NULL != opq->fctx);
646 GNUNET_assert (opq->max_active >= opq->overload);
647 max = opq->max_active - opq->overload;
648 }
649 else
650 max = opq->max_active;
651 if (opq->active > max)
627 { 652 {
628 rval = GNUNET_NO; 653 rval = GNUNET_NO;
629 goto ret; 654 goto ret;
630 } 655 }
631 if ((opq->active + need) <= opq->max_active) 656 if ((opq->active + need) <= max)
632 goto ret; 657 goto ret;
633 deficit = need - (opq->max_active - opq->active); 658 deficit = need - (max - opq->active);
634 for (entry = opq->nq_head; 659 for (entry = opq->nq_head;
635 (0 < deficit) && (NULL != entry); 660 (0 < deficit) && (NULL != entry);
636 entry = entry->next) 661 entry = entry->next)
@@ -825,6 +850,7 @@ adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n)
825 n = GNUNET_MIN (n ,fctx->max_active_bound); 850 n = GNUNET_MIN (n ,fctx->max_active_bound);
826 fctx->tslots_freeptr = GNUNET_malloc (n * sizeof (struct TimeSlot)); 851 fctx->tslots_freeptr = GNUNET_malloc (n * sizeof (struct TimeSlot));
827 fctx->nfailed = 0; 852 fctx->nfailed = 0;
853 FPRINTF (stderr, "Parallelism: %u\n", n);
828 for (cnt = 0; cnt < n; cnt++) 854 for (cnt = 0; cnt < n; cnt++)
829 { 855 {
830 tslot = &fctx->tslots_freeptr[cnt]; 856 tslot = &fctx->tslots_freeptr[cnt];
@@ -881,14 +907,19 @@ adapt_parallelism (struct OperationQueue *queue)
881 adaptive_queue_set_max_active (queue, queue->max_active); /* no change */ 907 adaptive_queue_set_max_active (queue, queue->max_active); /* no change */
882 return; 908 return;
883 } 909 }
884 if (1 == sd)
885 adaptive_queue_set_max_active (queue, queue->max_active - 1);
886 if (2 <= sd)
887 adaptive_queue_set_max_active (queue, queue->max_active / 2);
888 if (-1 == sd) 910 if (-1 == sd)
889 adaptive_queue_set_max_active (queue, queue->max_active + 1); 911 adaptive_queue_set_max_active (queue, queue->max_active + 1);
890 if (sd <= -2) 912 if (sd <= -2)
891 adaptive_queue_set_max_active (queue, queue->max_active * 2); 913 adaptive_queue_set_max_active (queue, queue->max_active * 2);
914 if (1 == queue->max_active)
915 {
916 adaptive_queue_set_max_active (queue, 1);
917 return;
918 }
919 if (1 == sd)
920 adaptive_queue_set_max_active (queue, queue->max_active - 1);
921 if (2 <= sd)
922 adaptive_queue_set_max_active (queue, queue->max_active / 2);
892 923
893#if 0 /* old algorithm */ 924#if 0 /* old algorithm */
894 if (sd < 0) 925 if (sd < 0)
@@ -934,6 +965,7 @@ update_tslots (struct GNUNET_TESTBED_Operation *op)
934 struct GNUNET_TIME_Relative t; 965 struct GNUNET_TIME_Relative t;
935 struct TimeSlot *tslot; 966 struct TimeSlot *tslot;
936 struct FeedbackCtx *fctx; 967 struct FeedbackCtx *fctx;
968 unsigned int i;
937 969
938 t = GNUNET_TIME_absolute_get_duration (op->tstart); 970 t = GNUNET_TIME_absolute_get_duration (op->tstart);
939 while (NULL != (tslot = op->tslots_head)) /* update time slots */ 971 while (NULL != (tslot = op->tslots_head)) /* update time slots */
@@ -945,7 +977,14 @@ update_tslots (struct GNUNET_TESTBED_Operation *op)
945 GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail, 977 GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
946 tslot); 978 tslot);
947 if (op->failed) 979 if (op->failed)
980 {
948 fctx->nfailed++; 981 fctx->nfailed++;
982 for (i = 0; i < op->nqueues; i++)
983 if (queue == op->queues[i])
984 break;
985 GNUNET_assert (i != op->nqueues);
986 op->queues[i]->overload += op->nres[i];
987 }
949 tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t); 988 tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t);
950 if (0 != tslot->nvals++) 989 if (0 != tslot->nvals++)
951 continue; 990 continue;
@@ -1004,9 +1043,9 @@ GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type,
1004 { 1043 {
1005 fctx = GNUNET_new (struct FeedbackCtx); 1044 fctx = GNUNET_new (struct FeedbackCtx);
1006 fctx->max_active_bound = max_active; 1045 fctx->max_active_bound = max_active;
1007 fctx->sd = GNUNET_TESTBED_SD_init_ (10); /* FIXME: Why 10? */ 1046 fctx->sd = GNUNET_TESTBED_SD_init_ (ADAPTIVE_QUEUE_DEFAULT_HISTORY);
1008 queue->fctx = fctx; 1047 queue->fctx = fctx;
1009 adaptive_queue_set_max_active (queue, 4); /* start with 4 */ 1048 adaptive_queue_set_max_active (queue, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE);
1010 } 1049 }
1011 return queue; 1050 return queue;
1012} 1051}
@@ -1090,6 +1129,7 @@ GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue,
1090 struct QueueEntry *entry; 1129 struct QueueEntry *entry;
1091 1130
1092 queue->max_active = max_active; 1131 queue->max_active = max_active;
1132 queue->overload = 0;
1093 while ( (queue->active > queue->max_active) 1133 while ( (queue->active > queue->max_active)
1094 && (NULL != (entry = queue->rq_head)) ) 1134 && (NULL != (entry = queue->rq_head)) )
1095 defer (entry->op); 1135 defer (entry->op);