about | summary | refs | log | tree | commit | diff
path: root/src/testbed/testbed_api_operations.c
diff options
context:
space:
mode:
author: Sree Harsha Totakura <totakura@in.tum.de> 2013-08-19 14:13:19 +0000
committer: Sree Harsha Totakura <totakura@in.tum.de> 2013-08-19 14:13:19 +0000
commit: 9dd0824b40e7afeaf948a564d544b384db12cadf (patch)
tree: 4d98c8d27669e8eca6259a9601e32fddfa67e39c /src/testbed/testbed_api_operations.c
parent: d4051630bbc5f521142f302baad60212b75f8b7f (diff)
download: gnunet-9dd0824b40e7afeaf948a564d544b384db12cadf.tar.gz
download: gnunet-9dd0824b40e7afeaf948a564d544b384db12cadf.zip
fix 2893: Move adaptive parallelisation mechanism to operation queues
Diffstat (limited to 'src/testbed/testbed_api_operations.c')
-rw-r--r-- src/testbed/testbed_api_operations.c 358
1 files changed, 351 insertions, 7 deletions
diff --git a/src/testbed/testbed_api_operations.c b/src/testbed/testbed_api_operations.c
index 521645b71..e1034a18c 100644
--- a/src/testbed/testbed_api_operations.c
+++ b/src/testbed/testbed_api_operations.c
@@ -27,6 +27,7 @@
27 27
28#include "platform.h" 28#include "platform.h"
29#include "testbed_api_operations.h" 29#include "testbed_api_operations.h"
30#include "testbed_api_sd.h"
30 31
31 32
32/** 33/**
@@ -60,6 +61,89 @@ struct QueueEntry
60 * Queue of operations where we can only support a certain 61 * Queue of operations where we can only support a certain
61 * number of concurrent operations of a particular type. 62 * number of concurrent operations of a particular type.
62 */ 63 */
64struct OperationQueue;
65
66
67/**
68 * A slot to record time taken by an operation
69 */
70struct TimeSlot
71{
72 /**
73 * DLL next pointer
74 */
75 struct TimeSlot *next;
76
77 /**
78 * DLL prev pointer
79 */
80 struct TimeSlot *prev;
81
82 /**
83 * The operation queue to which this time slot belongs
84 */
85 struct OperationQueue *queue;
86
87 /**
88 * The operation to which this time slot is currently allocated
89 */
90 struct GNUNET_TESTBED_Operation *op;
91
92 /**
93 * Accumulated time
94 */
95 struct GNUNET_TIME_Relative tsum;
96
97 /**
98 * Number of timing values accumulated
99 */
100 unsigned int nvals;
101};
102
103
104/**
105 * Context for operation queues of type OPERATION_QUEUE_TYPE_ADAPTIVE
106 */
107struct FeedbackCtx
108{
109 /**
110 * Handle for calculating standard deviation
111 */
112 struct SDHandle *sd;
113
114 /**
115 * Head for DLL of time slots which are free to be allocated to operations
116 */
117 struct TimeSlot *alloc_head;
118
119 /**
120 * Tail for DLL of time slots which are free to be allocated to operations
121 */
122 struct TimeSlot *alloc_tail;
123
124 /**
125 * Pointer to the chunk of time slots. Free all time slots at a time using
126 * this pointer.
127 */
128 struct TimeSlot *tslots_freeptr;
129
130 /**
131 * Number of time slots filled so far
132 */
133 unsigned int tslots_filled;
134
135 /**
136 * Bound on the maximum number of operations which can be active
137 */
138 unsigned int max_active_bound;
139
140};
141
142
143/**
144 * Queue of operations where we can only support a certain
145 * number of concurrent operations of a particular type.
146 */
63struct OperationQueue 147struct OperationQueue
64{ 148{
65 /** 149 /**
@@ -108,12 +192,26 @@ struct OperationQueue
108 struct QueueEntry *nq_tail; 192 struct QueueEntry *nq_tail;
109 193
110 /** 194 /**
195 * Feedback context; only relevant for adaptive operation queues. NULL for
196 * fixed operation queues
197 */
198 struct FeedbackCtx *fctx;
199
200 /**
201 * The type of this operation queue
202 */
203 enum OperationQueueType type;
204
205 /**
111 * Number of operations that are currently active in this queue. 206 * Number of operations that are currently active in this queue.
112 */ 207 */
113 unsigned int active; 208 unsigned int active;
114 209
115 /** 210 /**
116 * Max number of operations which can be active at any time in this queue 211 * Max number of operations which can be active at any time in this queue.
212 * This value can be changed either by calling
213 * GNUNET_TESTBED_operation_queue_reset_max_active_() or by the adaptive
214 * algorithm if this operation queue is of type OPERATION_QUEUE_TYPE_ADAPTIVE
117 */ 215 */
118 unsigned int max_active; 216 unsigned int max_active;
119 217
@@ -222,6 +320,21 @@ struct GNUNET_TESTBED_Operation
222 struct ReadyQueueEntry *rq_entry; 320 struct ReadyQueueEntry *rq_entry;
223 321
224 /** 322 /**
323 * Head pointer for DLL of tslots allocated to this operation
324 */
325 struct TimeSlot *tslots_head;
326
327 /**
328 * Tail pointer for DLL of tslots allocated to this operation
329 */
330 struct TimeSlot *tslots_tail;
331
332 /**
333 * The time at which the operation is started
334 */
335 struct GNUNET_TIME_Absolute tstart;
336
337 /**
225 * Number of queues in the operation queues array 338 * Number of queues in the operation queues array
226 */ 339 */
227 unsigned int nqueues; 340 unsigned int nqueues;
@@ -231,6 +344,11 @@ struct GNUNET_TESTBED_Operation
231 */ 344 */
232 enum OperationState state; 345 enum OperationState state;
233 346
347 /**
348 * Is this a failed operation?
349 */
350 int failed;
351
234}; 352};
235 353
236/** 354/**
@@ -250,6 +368,29 @@ GNUNET_SCHEDULER_TaskIdentifier process_rq_task_id;
250 368
251 369
252/** 370/**
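371 * Assigns the given operation a free time slot from the given operation queue.
372 * The slot is moved from the queue's free list to the operation's slot list.
373 *
374 * @param op the operation
375 * @param queue the operation queue; must be of type OPERATION_QUEUE_TYPE_ADAPTIVE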
376 */
377static void
378assign_timeslot (struct GNUNET_TESTBED_Operation *op,
379 struct OperationQueue *queue)
380{
381 struct FeedbackCtx *fctx = queue->fctx;
382 struct TimeSlot *tslot;
383
384 GNUNET_assert (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type);
385 tslot = fctx->alloc_head;
386 GNUNET_assert (NULL != tslot);
387 GNUNET_CONTAINER_DLL_remove (fctx->alloc_head, fctx->alloc_tail, tslot);
388 GNUNET_CONTAINER_DLL_insert_tail (op->tslots_head, op->tslots_tail, tslot);
389 tslot->op = op;
390}
391
392
393/**
253 * Removes a queue entry of an operation from one of the operation queues' lists 394 * Removes a queue entry of an operation from one of the operation queues' lists
254 * depending on the state of the operation 395 * depending on the state of the operation
255 * 396 *
@@ -378,6 +519,8 @@ static void
378process_rq_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 519process_rq_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
379{ 520{
380 struct GNUNET_TESTBED_Operation *op; 521 struct GNUNET_TESTBED_Operation *op;
522 struct OperationQueue *queue;
523 unsigned int cnt;
381 524
382 process_rq_task_id = GNUNET_SCHEDULER_NO_TASK; 525 process_rq_task_id = GNUNET_SCHEDULER_NO_TASK;
383 GNUNET_assert (NULL != rq_head); 526 GNUNET_assert (NULL != rq_head);
@@ -386,8 +529,15 @@ process_rq_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
386 if (NULL != rq_head) 529 if (NULL != rq_head)
387 process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL); 530 process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
388 change_state (op, OP_STATE_ACTIVE); 531 change_state (op, OP_STATE_ACTIVE);
532 for (cnt = 0; cnt < op->nqueues; cnt++)
533 {
534 queue = op->queues[cnt];
535 if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
536 assign_timeslot (op, queue);
537 }
538 op->tstart = GNUNET_TIME_absolute_get ();
389 if (NULL != op->start) 539 if (NULL != op->start)
390 op->start (op->cb_cls); 540 op->start (op->cb_cls);
391} 541}
392 542
393 543
@@ -582,7 +732,7 @@ check_readiness (struct GNUNET_TESTBED_Operation *op)
582 if (NULL != evict_ops) 732 if (NULL != evict_ops)
583 { 733 {
584 for (i = 0; i < n_evict_ops; i++) 734 for (i = 0; i < n_evict_ops; i++)
585 GNUNET_TESTBED_operation_release_ (evict_ops[i]); 735 GNUNET_TESTBED_operation_release_ (evict_ops[i]);
586 GNUNET_free (evict_ops); 736 GNUNET_free (evict_ops);
587 evict_ops = NULL; 737 evict_ops = NULL;
588 /* Evicting the operations should schedule this operation */ 738 /* Evicting the operations should schedule this operation */
@@ -619,6 +769,162 @@ defer (struct GNUNET_TESTBED_Operation *op)
619 769
620 770
621/** 771/**
772 * Cleans up the array of time slots of an operation queue. Each time slot in
773 * the array that is allocated to an operation is detached from that
774 * operation; the array is then freed and the free-slot list is reset.
775 *
776 * @param queue the operation queue
777 */
778static void
779cleanup_tslots (struct OperationQueue *queue)
780{
781 struct FeedbackCtx *fctx = queue->fctx;
782 struct TimeSlot *tslot;
783 struct GNUNET_TESTBED_Operation *op;
784 unsigned int cnt;
785
786 GNUNET_assert (NULL != fctx);
787 for (cnt = 0; cnt < queue->max_active; cnt++)
788 {
789 tslot = &fctx->tslots_freeptr[cnt];
790 op = tslot->op;
791 if (NULL == op)
792 continue;
793 GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
794 }
795 GNUNET_free_non_null (fctx->tslots_freeptr);
796 fctx->tslots_freeptr = NULL;
797 fctx->alloc_head = NULL;
798 fctx->alloc_tail = NULL;
799 fctx->tslots_filled = 0;
800}
801
802
803/**
804 * Sets the maximum number of active operations of an adaptive operation queue, discarding its current time slots and allocating new ones
805 *
806 * @param queue the adaptive operation queue
807 * @param n the requested maximum; capped at the queue's max_active_bound
808 */
809static void
810adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n)
811{
812 struct FeedbackCtx *fctx = queue->fctx;
813 struct TimeSlot *tslot;
814 unsigned int cnt;
815
816 cleanup_tslots (queue);
817 n = GNUNET_MIN (n ,fctx->max_active_bound);
818 fctx->tslots_freeptr = GNUNET_malloc (n * sizeof (struct TimeSlot));
819 for (cnt = 0; cnt < n; cnt++)
820 {
821 tslot = &fctx->tslots_freeptr[cnt];
822 tslot->queue = queue;
823 GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail, tslot);
824 }
825 GNUNET_TESTBED_operation_queue_reset_max_active_ (queue, n);
826}
827
828
829/**
830 * Adapts parallelism in an adaptive queue by using the statistical data from
831 * the feedback context.
832 *
833 * @param queue the queue
834 * @param fail GNUNET_YES if the last operation failed; GNUNET_NO if not
835 */
836static void
837adapt_parallelism (struct OperationQueue *queue, int fail)
838{
839 struct GNUNET_TIME_Relative avg;
840 struct FeedbackCtx *fctx;
841 struct TimeSlot *tslot;
842 int sd;
843 unsigned int nvals;
844 unsigned int cnt;
845
846 avg = GNUNET_TIME_UNIT_ZERO;
847 nvals = 0;
848 fctx = queue->fctx;
849 for (cnt = 0; cnt < queue->max_active; cnt++)
850 {
851 tslot = &fctx->tslots_freeptr[cnt];
852 avg = GNUNET_TIME_relative_add (avg, tslot->tsum);
853 nvals += tslot->nvals;
854 }
855 GNUNET_assert (nvals >= queue->max_active);
856 avg = GNUNET_TIME_relative_divide (avg, nvals);
857 sd = GNUNET_TESTBED_SD_deviation_factor_ (fctx->sd, (unsigned int)
858 avg.rel_value_us);
859 if ( (sd <= 5) ||
860 (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
861 queue->max_active)) )
862 GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
863 if (GNUNET_SYSERR == sd)
864 {
865 adaptive_queue_set_max_active (queue, queue->max_active); /* no change */
866 return;
867 }
868 GNUNET_assert (0 <= sd);
869 if ((0 == sd) && (! fail))
870 {
871 adaptive_queue_set_max_active (queue, queue->max_active * 2);
872 return;
873 }
874 if ((1 == sd) && (! fail))
875 {
876 adaptive_queue_set_max_active (queue, queue->max_active + 1);
877 return;
878 }
879 if (1 == queue->max_active)
880 {
881 adaptive_queue_set_max_active (queue, 1);
882 return;
883 }
884 if (((sd < 2) && (fail)) || (2 == sd))
885 {
886 adaptive_queue_set_max_active (queue, queue->max_active - 1);
887 return;
888 }
889 adaptive_queue_set_max_active (queue, queue->max_active / 2);
890}
891
892
893/**
894 * Updates the operation's time slots with its completion time. Additionally, if
895 * updating a time slot makes all time slots of an adaptive operation queue
896 * filled, adapt_parallelism() is called for that queue.
897 *
898 * @param op the operation
899 */
900static void
901update_tslots (struct GNUNET_TESTBED_Operation *op)
902{
903 struct OperationQueue *queue;
904 struct GNUNET_TIME_Relative t;
905 struct TimeSlot *tslot;
906 struct FeedbackCtx *fctx;
907
908 t = GNUNET_TIME_absolute_get_duration (op->tstart);
909 while (NULL != (tslot = op->tslots_head)) /* update time slots */
910 {
911 queue = tslot->queue;
912 fctx = queue->fctx;
913 tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t);
914 GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
915 tslot->op = NULL;
916 GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
917 tslot);
918 if (0 != tslot->nvals++)
919 continue;
920 fctx->tslots_filled++;
921 if (queue->max_active == fctx->tslots_filled)
922 adapt_parallelism (queue, op->failed);
923 }
924}
925
926
927/**
622 * Create an 'operation' to be performed. 928 * Create an 'operation' to be performed.
623 * 929 *
624 * @param cls closure for the callbacks 930 * @param cls closure for the callbacks
@@ -644,17 +950,32 @@ GNUNET_TESTBED_operation_create_ (void *cls, OperationStart start,
644/** 950/**
645 * Create an operation queue. 951 * Create an operation queue.
646 * 952 *
953 * @param type the type of operation queue
647 * @param max_active maximum number of operations in this 954 * @param max_active maximum number of operations in this
648 * queue that can be active in parallel at the same time 955 * queue that can be active in parallel at the same time
649 * @return handle to the queue 956 * @return handle to the queue
650 */ 957 */
651struct OperationQueue * 958struct OperationQueue *
652GNUNET_TESTBED_operation_queue_create_ (unsigned int max_active) 959GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type,
960 unsigned int max_active)
653{ 961{
654 struct OperationQueue *queue; 962 struct OperationQueue *queue;
963 struct FeedbackCtx *fctx;
655 964
656 queue = GNUNET_malloc (sizeof (struct OperationQueue)); 965 queue = GNUNET_malloc (sizeof (struct OperationQueue));
657 queue->max_active = max_active; 966 queue->type = type;
967 if (OPERATION_QUEUE_TYPE_FIXED == type)
968 {
969 queue->max_active = max_active;
970 }
971 else
972 {
973 fctx = GNUNET_malloc (sizeof (struct FeedbackCtx));
974 fctx->max_active_bound = max_active;
975 fctx->sd = GNUNET_TESTBED_SD_init_ (10); /* FIXME: Why 10? */
976 queue->fctx = fctx;
977 adaptive_queue_set_max_active (queue, 1); /* start with 1 */
978 }
658 return queue; 979 return queue;
659} 980}
660 981
@@ -668,7 +989,16 @@ GNUNET_TESTBED_operation_queue_create_ (unsigned int max_active)
668void 989void
669GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue) 990GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
670{ 991{
992 struct FeedbackCtx *fctx;
993
671 GNUNET_break (GNUNET_YES == is_queue_empty (queue)); 994 GNUNET_break (GNUNET_YES == is_queue_empty (queue));
995 if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
996 {
997 cleanup_tslots (queue);
998 fctx = queue->fctx;
999 GNUNET_TESTBED_SD_destroy_ (fctx->sd);
1000 GNUNET_free (fctx);
1001 }
672 GNUNET_free (queue); 1002 GNUNET_free (queue);
673} 1003}
674 1004
@@ -867,8 +1197,10 @@ GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
867 rq_remove (op); 1197 rq_remove (op);
868 if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */ 1198 if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */
869 GNUNET_TESTBED_operation_activate_ (op); 1199 GNUNET_TESTBED_operation_activate_ (op);
1200 if (OP_STATE_ACTIVE == op->state)
1201 update_tslots (op);
870 GNUNET_assert (NULL != op->queues); 1202 GNUNET_assert (NULL != op->queues);
871 GNUNET_assert (NULL != op->qentries); 1203 GNUNET_assert (NULL != op->qentries);
872 for (i = 0; i < op->nqueues; i++) 1204 for (i = 0; i < op->nqueues; i++)
873 { 1205 {
874 entry = op->qentries[i]; 1206 entry = op->qentries[i];
@@ -882,8 +1214,8 @@ GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
882 break; 1214 break;
883 case OP_STATE_WAITING: 1215 case OP_STATE_WAITING:
884 break; 1216 break;
885 case OP_STATE_READY:
886 case OP_STATE_ACTIVE: 1217 case OP_STATE_ACTIVE:
1218 case OP_STATE_READY:
887 GNUNET_assert (0 != opq->active); 1219 GNUNET_assert (0 != opq->active);
888 GNUNET_assert (opq->active >= entry->nres); 1220 GNUNET_assert (opq->active >= entry->nres);
889 opq->active -= entry->nres; 1221 opq->active -= entry->nres;
@@ -901,4 +1233,16 @@ GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
901} 1233}
902 1234
903 1235
1236/**
1237 * Marks an operation as failed
1238 *
1239 * @param op the operation to be marked as failed
1240 */
1241void
1242GNUNET_TESTBED_operation_mark_failed (struct GNUNET_TESTBED_Operation *op)
1243{
1244 op->failed = GNUNET_YES;
1245}
1246
1247
904/* end of testbed_api_operations.c */ 1248/* end of testbed_api_operations.c */