diff options
Diffstat (limited to 'src/testbed/testbed_api_operations.c')
-rw-r--r-- | src/testbed/testbed_api_operations.c | 1387 |
1 files changed, 0 insertions, 1387 deletions
diff --git a/src/testbed/testbed_api_operations.c b/src/testbed/testbed_api_operations.c deleted file mode 100644 index a23f8c666..000000000 --- a/src/testbed/testbed_api_operations.c +++ /dev/null | |||
@@ -1,1387 +0,0 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet | ||
3 | Copyright (C) 2008--2013 GNUnet e.V. | ||
4 | |||
5 | GNUnet is free software: you can redistribute it and/or modify it | ||
6 | under the terms of the GNU Affero General Public License as published | ||
7 | by the Free Software Foundation, either version 3 of the License, | ||
8 | or (at your option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | Affero General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU Affero General Public License | ||
16 | along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
17 | |||
18 | SPDX-License-Identifier: AGPL3.0-or-later | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file testbed/testbed_api_operations.c | ||
23 | * @brief functions to manage operation queues | ||
24 | * @author Christian Grothoff | ||
25 | * @author Sree Harsha Totakura | ||
26 | */ | ||
27 | |||
28 | #include "platform.h" | ||
29 | #include "testbed_api_operations.h" | ||
30 | #include "testbed_api_sd.h" | ||
31 | |||
32 | /** | ||
33 | * The number of readings containing past operation's timing information that we | ||
34 | * keep track of for adaptive queues | ||
35 | */ | ||
36 | #define ADAPTIVE_QUEUE_DEFAULT_HISTORY 40 | ||
37 | |||
38 | /** | ||
39 | * The number of parallel operations we start with by default for adaptive | ||
40 | * queues | ||
41 | */ | ||
42 | #define ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE 4 | ||
43 | |||
44 | /** | ||
45 | * An entry in the operation queue | ||
46 | */ | ||
47 | struct QueueEntry | ||
48 | { | ||
49 | /** | ||
50 | * The next DLL pointer | ||
51 | */ | ||
52 | struct QueueEntry *next; | ||
53 | |||
54 | /** | ||
55 | * The prev DLL pointer | ||
56 | */ | ||
57 | struct QueueEntry *prev; | ||
58 | |||
59 | /** | ||
60 | * The operation this entry holds | ||
61 | */ | ||
62 | struct GNUNET_TESTBED_Operation *op; | ||
63 | |||
64 | /** | ||
65 | * How many units of resources does the operation need | ||
66 | */ | ||
67 | unsigned int nres; | ||
68 | }; | ||
69 | |||
70 | |||
71 | /** | ||
72 | * Queue of operations where we can only support a certain | ||
73 | * number of concurrent operations of a particular type. | ||
74 | */ | ||
75 | struct OperationQueue; | ||
76 | |||
77 | |||
78 | /** | ||
79 | * A slot to record time taken by an operation | ||
80 | */ | ||
81 | struct TimeSlot | ||
82 | { | ||
83 | /** | ||
84 | * DLL next pointer | ||
85 | */ | ||
86 | struct TimeSlot *next; | ||
87 | |||
88 | /** | ||
89 | * DLL prev pointer | ||
90 | */ | ||
91 | struct TimeSlot *prev; | ||
92 | |||
93 | /** | ||
94 | * This operation queue to which this time slot belongs to | ||
95 | */ | ||
96 | struct OperationQueue *queue; | ||
97 | |||
98 | /** | ||
99 | * The operation to which this timeslot is currently allocated to | ||
100 | */ | ||
101 | struct GNUNET_TESTBED_Operation *op; | ||
102 | |||
103 | /** | ||
104 | * Accumulated time | ||
105 | */ | ||
106 | struct GNUNET_TIME_Relative tsum; | ||
107 | |||
108 | /** | ||
109 | * Number of timing values accumulated | ||
110 | */ | ||
111 | unsigned int nvals; | ||
112 | }; | ||
113 | |||
114 | |||
115 | /** | ||
116 | * Context for operation queues of type OPERATION_QUEUE_TYPE_ADAPTIVE | ||
117 | */ | ||
118 | struct FeedbackCtx | ||
119 | { | ||
120 | /** | ||
121 | * Handle for calculating standard deviation | ||
122 | */ | ||
123 | struct SDHandle *sd; | ||
124 | |||
125 | /** | ||
126 | * Head for DLL of time slots which are free to be allocated to operations | ||
127 | */ | ||
128 | struct TimeSlot *alloc_head; | ||
129 | |||
130 | /** | ||
131 | * Tail for DLL of time slots which are free to be allocated to operations | ||
132 | */ | ||
133 | struct TimeSlot *alloc_tail; | ||
134 | |||
135 | /** | ||
136 | * Pointer to the chunk of time slots. Free all time slots at a time using | ||
137 | * this pointer. | ||
138 | */ | ||
139 | struct TimeSlot *tslots_freeptr; | ||
140 | |||
141 | /** | ||
142 | * Number of time slots filled so far | ||
143 | */ | ||
144 | unsigned int tslots_filled; | ||
145 | |||
146 | /** | ||
147 | * Bound on the maximum number of operations which can be active | ||
148 | */ | ||
149 | unsigned int max_active_bound; | ||
150 | |||
151 | /** | ||
152 | * Number of operations that have failed | ||
153 | */ | ||
154 | unsigned int nfailed; | ||
155 | }; | ||
156 | |||
157 | |||
158 | /** | ||
159 | * Queue of operations where we can only support a certain | ||
160 | * number of concurrent operations of a particular type. | ||
161 | */ | ||
162 | struct OperationQueue | ||
163 | { | ||
164 | /** | ||
165 | * DLL head for the wait queue. Operations which are waiting for this | ||
166 | * operation queue are put here | ||
167 | */ | ||
168 | struct QueueEntry *wq_head; | ||
169 | |||
170 | /** | ||
171 | * DLL tail for the wait queue. | ||
172 | */ | ||
173 | struct QueueEntry *wq_tail; | ||
174 | |||
175 | /** | ||
176 | * DLL head for the ready queue. Operations which are in this operation queue | ||
177 | * and are in ready state are put here | ||
178 | */ | ||
179 | struct QueueEntry *rq_head; | ||
180 | |||
181 | /** | ||
182 | * DLL tail for the ready queue | ||
183 | */ | ||
184 | struct QueueEntry *rq_tail; | ||
185 | |||
186 | /** | ||
187 | * DLL head for the active queue. Operations which are in this operation | ||
188 | * queue and are currently active are put here | ||
189 | */ | ||
190 | struct QueueEntry *aq_head; | ||
191 | |||
192 | /** | ||
193 | * DLL tail for the active queue. | ||
194 | */ | ||
195 | struct QueueEntry *aq_tail; | ||
196 | |||
197 | /** | ||
198 | * DLL head for the inactive queue. Operations which are inactive and can be | ||
199 | * evicted if the queues it holds are maxed out and another operation begins | ||
200 | * to wait on them. | ||
201 | */ | ||
202 | struct QueueEntry *nq_head; | ||
203 | |||
204 | /** | ||
205 | * DLL tail for the inactive queue. | ||
206 | */ | ||
207 | struct QueueEntry *nq_tail; | ||
208 | |||
209 | /** | ||
210 | * Feedback context; only relevant for adaptive operation queues. NULL for | ||
211 | * fixed operation queues | ||
212 | */ | ||
213 | struct FeedbackCtx *fctx; | ||
214 | |||
215 | /** | ||
216 | * The type of this operation queue | ||
217 | */ | ||
218 | enum OperationQueueType type; | ||
219 | |||
220 | /** | ||
221 | * Number of operations that are currently active in this queue. | ||
222 | */ | ||
223 | unsigned int active; | ||
224 | |||
225 | /** | ||
226 | * Max number of operations which can be active at any time in this queue. | ||
227 | * This value can be changed either by calling | ||
228 | * GNUNET_TESTBED_operation_queue_reset_max_active_() or by the adaptive | ||
229 | * algorithm if this operation queue is of type #OPERATION_QUEUE_TYPE_ADAPTIVE | ||
230 | */ | ||
231 | unsigned int max_active; | ||
232 | |||
233 | /** | ||
234 | * The number of resources occupied by failed operations in the current shot. | ||
235 | * This is only relevant if the operation queue is of type | ||
236 | * #OPERATION_QUEUE_TYPE_ADAPTIVE | ||
237 | */ | ||
238 | unsigned int overload; | ||
239 | |||
240 | /** | ||
241 | * Is this queue marked for expiry? | ||
242 | */ | ||
243 | unsigned int expired; | ||
244 | }; | ||
245 | |||
246 | |||
247 | /** | ||
248 | * Operation state | ||
249 | */ | ||
250 | enum OperationState | ||
251 | { | ||
252 | /** | ||
253 | * The operation is just created and is in initial state | ||
254 | */ | ||
255 | OP_STATE_INIT, | ||
256 | |||
257 | /** | ||
258 | * The operation is currently waiting for resources | ||
259 | */ | ||
260 | OP_STATE_WAITING, | ||
261 | |||
262 | /** | ||
263 | * The operation is ready to be started | ||
264 | */ | ||
265 | OP_STATE_READY, | ||
266 | |||
267 | /** | ||
268 | * The operation has started and is active | ||
269 | */ | ||
270 | OP_STATE_ACTIVE, | ||
271 | |||
272 | /** | ||
273 | * The operation is inactive. It still holds resources on the operation | ||
274 | * queues. However, this operation will be evicted when another operation | ||
275 | * requires resources from the maxed out queues this operation is holding | ||
276 | * resources from. | ||
277 | */ | ||
278 | OP_STATE_INACTIVE | ||
279 | }; | ||
280 | |||
281 | |||
282 | /** | ||
283 | * An entry in the ready queue (implemented as DLL) | ||
284 | */ | ||
285 | struct ReadyQueueEntry | ||
286 | { | ||
287 | /** | ||
288 | * next ptr for DLL | ||
289 | */ | ||
290 | struct ReadyQueueEntry *next; | ||
291 | |||
292 | /** | ||
293 | * prev ptr for DLL | ||
294 | */ | ||
295 | struct ReadyQueueEntry *prev; | ||
296 | |||
297 | /** | ||
298 | * The operation associated with this entry | ||
299 | */ | ||
300 | struct GNUNET_TESTBED_Operation *op; | ||
301 | }; | ||
302 | |||
303 | |||
304 | /** | ||
305 | * Opaque handle to an abstract operation to be executed by the testing framework. | ||
306 | */ | ||
307 | struct GNUNET_TESTBED_Operation | ||
308 | { | ||
309 | /** | ||
310 | * Function to call when we have the resources to begin the operation. | ||
311 | */ | ||
312 | OperationStart start; | ||
313 | |||
314 | /** | ||
315 | * Function to call to clean up after the operation (which may or may | ||
316 | * not have been started yet). | ||
317 | */ | ||
318 | OperationRelease release; | ||
319 | |||
320 | /** | ||
321 | * Closure for callbacks. | ||
322 | */ | ||
323 | void *cb_cls; | ||
324 | |||
325 | /** | ||
326 | * Array of operation queues this Operation belongs to. | ||
327 | */ | ||
328 | struct OperationQueue **queues; | ||
329 | |||
330 | /** | ||
331 | * Array of operation queue entries corresponding to this operation in | ||
332 | * operation queues for this operation | ||
333 | */ | ||
334 | struct QueueEntry **qentries; | ||
335 | |||
336 | /** | ||
337 | * Array of number of resources an operation need from each queue. The numbers | ||
338 | * in this array should correspond to the queues array | ||
339 | */ | ||
340 | unsigned int *nres; | ||
341 | |||
342 | /** | ||
343 | * Entry corresponding to this operation in ready queue. Will be NULL if the | ||
344 | * operation is not marked as READY | ||
345 | */ | ||
346 | struct ReadyQueueEntry *rq_entry; | ||
347 | |||
348 | /** | ||
349 | * Head pointer for DLL of tslots allocated to this operation | ||
350 | */ | ||
351 | struct TimeSlot *tslots_head; | ||
352 | |||
353 | /** | ||
354 | * Tail pointer for DLL of tslots allocated to this operation | ||
355 | */ | ||
356 | struct TimeSlot *tslots_tail; | ||
357 | |||
358 | /** | ||
359 | * The time at which the operation is started | ||
360 | */ | ||
361 | struct GNUNET_TIME_Absolute tstart; | ||
362 | |||
363 | /** | ||
364 | * Number of queues in the operation queues array | ||
365 | */ | ||
366 | unsigned int nqueues; | ||
367 | |||
368 | /** | ||
369 | * The state of the operation | ||
370 | */ | ||
371 | enum OperationState state; | ||
372 | |||
373 | /** | ||
374 | * Is this a failed operation? | ||
375 | */ | ||
376 | int failed; | ||
377 | }; | ||
378 | |||
379 | /** | ||
380 | * DLL head for the ready queue | ||
381 | */ | ||
382 | static struct ReadyQueueEntry *rq_head; | ||
383 | |||
384 | /** | ||
385 | * DLL tail for the ready queue | ||
386 | */ | ||
387 | static struct ReadyQueueEntry *rq_tail; | ||
388 | |||
389 | /** | ||
390 | * Array of operation queues which are to be destroyed | ||
391 | */ | ||
392 | static struct OperationQueue **expired_opqs; | ||
393 | |||
394 | /** | ||
395 | * Number of expired operation queues in the above array | ||
396 | */ | ||
397 | static unsigned int n_expired_opqs; | ||
398 | |||
399 | /** | ||
400 | * The id of the task to process the ready queue | ||
401 | */ | ||
402 | struct GNUNET_SCHEDULER_Task *process_rq_task_id; | ||
403 | |||
404 | |||
405 | /** | ||
406 | * Assigns the given operation a time slot from the given operation queue | ||
407 | * | ||
408 | * @param op the operation | ||
409 | * @param queue the operation queue | ||
410 | * @return the timeslot | ||
411 | */ | ||
412 | static void | ||
413 | assign_timeslot (struct GNUNET_TESTBED_Operation *op, | ||
414 | struct OperationQueue *queue) | ||
415 | { | ||
416 | struct FeedbackCtx *fctx = queue->fctx; | ||
417 | struct TimeSlot *tslot; | ||
418 | |||
419 | GNUNET_assert (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type); | ||
420 | tslot = fctx->alloc_head; | ||
421 | GNUNET_assert (NULL != tslot); | ||
422 | GNUNET_CONTAINER_DLL_remove (fctx->alloc_head, fctx->alloc_tail, tslot); | ||
423 | GNUNET_CONTAINER_DLL_insert_tail (op->tslots_head, op->tslots_tail, tslot); | ||
424 | tslot->op = op; | ||
425 | } | ||
426 | |||
427 | |||
428 | /** | ||
429 | * Removes a queue entry of an operation from one of the operation queues' lists | ||
430 | * depending on the state of the operation | ||
431 | * | ||
432 | * @param op the operation whose entry has to be removed | ||
433 | * @param index the index of the entry in the operation's array of queue entries | ||
434 | */ | ||
435 | static void | ||
436 | remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index) | ||
437 | { | ||
438 | struct OperationQueue *opq; | ||
439 | struct QueueEntry *entry; | ||
440 | |||
441 | opq = op->queues[index]; | ||
442 | entry = op->qentries[index]; | ||
443 | switch (op->state) | ||
444 | { | ||
445 | case OP_STATE_INIT: | ||
446 | GNUNET_assert (0); | ||
447 | break; | ||
448 | |||
449 | case OP_STATE_WAITING: | ||
450 | GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry); | ||
451 | break; | ||
452 | |||
453 | case OP_STATE_READY: | ||
454 | GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry); | ||
455 | break; | ||
456 | |||
457 | case OP_STATE_ACTIVE: | ||
458 | GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry); | ||
459 | break; | ||
460 | |||
461 | case OP_STATE_INACTIVE: | ||
462 | GNUNET_CONTAINER_DLL_remove (opq->nq_head, opq->nq_tail, entry); | ||
463 | break; | ||
464 | } | ||
465 | } | ||
466 | |||
467 | |||
468 | /** | ||
469 | * Changes the state of the operation while moving its associated queue entries | ||
470 | * in the operation's operation queues | ||
471 | * | ||
472 | * @param op the operation whose state has to be changed | ||
473 | * @param state the state the operation should have. It cannot be OP_STATE_INIT | ||
474 | */ | ||
475 | static void | ||
476 | change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state) | ||
477 | { | ||
478 | struct QueueEntry *entry; | ||
479 | struct OperationQueue *opq; | ||
480 | unsigned int cnt; | ||
481 | unsigned int s; | ||
482 | |||
483 | GNUNET_assert (OP_STATE_INIT != state); | ||
484 | GNUNET_assert (NULL != op->queues); | ||
485 | GNUNET_assert (NULL != op->nres); | ||
486 | GNUNET_assert ((OP_STATE_INIT == op->state) || (NULL != op->qentries)); | ||
487 | GNUNET_assert (op->state != state); | ||
488 | for (cnt = 0; cnt < op->nqueues; cnt++) | ||
489 | { | ||
490 | if (OP_STATE_INIT == op->state) | ||
491 | { | ||
492 | entry = GNUNET_new (struct QueueEntry); | ||
493 | entry->op = op; | ||
494 | entry->nres = op->nres[cnt]; | ||
495 | s = cnt; | ||
496 | GNUNET_array_append (op->qentries, s, entry); | ||
497 | } | ||
498 | else | ||
499 | { | ||
500 | entry = op->qentries[cnt]; | ||
501 | remove_queue_entry (op, cnt); | ||
502 | } | ||
503 | opq = op->queues[cnt]; | ||
504 | switch (state) | ||
505 | { | ||
506 | case OP_STATE_INIT: | ||
507 | GNUNET_assert (0); | ||
508 | break; | ||
509 | |||
510 | case OP_STATE_WAITING: | ||
511 | GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry); | ||
512 | break; | ||
513 | |||
514 | case OP_STATE_READY: | ||
515 | GNUNET_CONTAINER_DLL_insert_tail (opq->rq_head, opq->rq_tail, entry); | ||
516 | break; | ||
517 | |||
518 | case OP_STATE_ACTIVE: | ||
519 | GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry); | ||
520 | break; | ||
521 | |||
522 | case OP_STATE_INACTIVE: | ||
523 | GNUNET_CONTAINER_DLL_insert_tail (opq->nq_head, opq->nq_tail, entry); | ||
524 | break; | ||
525 | } | ||
526 | } | ||
527 | op->state = state; | ||
528 | } | ||
529 | |||
530 | |||
531 | /** | ||
532 | * Removes an operation from the ready queue. Also stops the 'process_rq_task' | ||
533 | * if the given operation is the last one in the queue. | ||
534 | * | ||
535 | * @param op the operation to be removed | ||
536 | */ | ||
537 | static void | ||
538 | rq_remove (struct GNUNET_TESTBED_Operation *op) | ||
539 | { | ||
540 | GNUNET_assert (NULL != op->rq_entry); | ||
541 | GNUNET_CONTAINER_DLL_remove (rq_head, rq_tail, op->rq_entry); | ||
542 | GNUNET_free (op->rq_entry); | ||
543 | op->rq_entry = NULL; | ||
544 | if ((NULL == rq_head) && (NULL != process_rq_task_id)) | ||
545 | { | ||
546 | GNUNET_SCHEDULER_cancel (process_rq_task_id); | ||
547 | process_rq_task_id = NULL; | ||
548 | } | ||
549 | } | ||
550 | |||
551 | |||
552 | /** | ||
553 | * Processes the ready queue by calling the operation start callback of the | ||
554 | * operation at the head. The operation is then removed from the queue. The | ||
555 | * task is scheduled to run again immediately until no more operations are in | ||
556 | * the ready queue. | ||
557 | * | ||
558 | * @param cls NULL | ||
559 | */ | ||
560 | static void | ||
561 | process_rq_task (void *cls) | ||
562 | { | ||
563 | struct GNUNET_TESTBED_Operation *op; | ||
564 | struct OperationQueue *queue; | ||
565 | unsigned int cnt; | ||
566 | |||
567 | process_rq_task_id = NULL; | ||
568 | GNUNET_assert (NULL != rq_head); | ||
569 | GNUNET_assert (NULL != (op = rq_head->op)); | ||
570 | rq_remove (op); | ||
571 | if (NULL != rq_head) | ||
572 | process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL); | ||
573 | change_state (op, OP_STATE_ACTIVE); | ||
574 | for (cnt = 0; cnt < op->nqueues; cnt++) | ||
575 | { | ||
576 | queue = op->queues[cnt]; | ||
577 | if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type) | ||
578 | assign_timeslot (op, queue); | ||
579 | } | ||
580 | op->tstart = GNUNET_TIME_absolute_get (); | ||
581 | if (NULL != op->start) | ||
582 | op->start (op->cb_cls); | ||
583 | } | ||
584 | |||
585 | |||
586 | /** | ||
587 | * Adds the operation to the ready queue and starts the 'process_rq_task' | ||
588 | * | ||
589 | * @param op the operation to be queued | ||
590 | */ | ||
591 | static void | ||
592 | rq_add (struct GNUNET_TESTBED_Operation *op) | ||
593 | { | ||
594 | struct ReadyQueueEntry *rq_entry; | ||
595 | |||
596 | GNUNET_assert (NULL == op->rq_entry); | ||
597 | rq_entry = GNUNET_new (struct ReadyQueueEntry); | ||
598 | rq_entry->op = op; | ||
599 | GNUNET_CONTAINER_DLL_insert_tail (rq_head, rq_tail, rq_entry); | ||
600 | op->rq_entry = rq_entry; | ||
601 | if (NULL == process_rq_task_id) | ||
602 | process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL); | ||
603 | } | ||
604 | |||
605 | |||
606 | /** | ||
607 | * Checks if the given operation queue is empty or not | ||
608 | * | ||
609 | * @param opq the operation queue | ||
610 | * @return GNUNET_YES if the given operation queue has no operations; GNUNET_NO | ||
611 | * otherwise | ||
612 | */ | ||
613 | static int | ||
614 | is_queue_empty (struct OperationQueue *opq) | ||
615 | { | ||
616 | if ((NULL != opq->wq_head) | ||
617 | || (NULL != opq->rq_head) | ||
618 | || (NULL != opq->aq_head) | ||
619 | || (NULL != opq->nq_head)) | ||
620 | return GNUNET_NO; | ||
621 | return GNUNET_YES; | ||
622 | } | ||
623 | |||
624 | |||
625 | /** | ||
626 | * Checks if the given operation queue has enough resources to provide for the | ||
627 | * operation of the given queue entry. It also checks if any inactive | ||
628 | * operations are to be released in order to accommodate the needed resources | ||
629 | * and returns them as an array. | ||
630 | * | ||
631 | * @param opq the operation queue to check for resource accommodation | ||
632 | * @param entry the operation queue entry whose operation's resources are to be | ||
633 | * accommodated | ||
634 | * @param ops_ pointer to return the array of operations which are to be released | ||
635 | * in order to accommodate the new operation. Can be NULL | ||
636 | * @param n_ops_ the number of operations in ops_ | ||
637 | * @return GNUNET_YES if the given entry's operation can be accommodated in this | ||
638 | * queue. GNUNET_NO if it cannot be accommodated; ops_ and n_ops_ will | ||
639 | * be set to NULL and 0 respectively. | ||
640 | */ | ||
641 | static int | ||
642 | decide_capacity (struct OperationQueue *opq, | ||
643 | struct QueueEntry *entry, | ||
644 | struct GNUNET_TESTBED_Operation ***ops_, | ||
645 | unsigned int *n_ops_) | ||
646 | { | ||
647 | struct QueueEntry **evict_entries; | ||
648 | struct GNUNET_TESTBED_Operation **ops; | ||
649 | struct GNUNET_TESTBED_Operation *op; | ||
650 | unsigned int n_ops; | ||
651 | unsigned int n_evict_entries; | ||
652 | unsigned int need; | ||
653 | unsigned int max; | ||
654 | int deficit; | ||
655 | int rval; | ||
656 | |||
657 | GNUNET_assert (NULL != (op = entry->op)); | ||
658 | GNUNET_assert (0 < (need = entry->nres)); | ||
659 | ops = NULL; | ||
660 | n_ops = 0; | ||
661 | evict_entries = NULL; | ||
662 | n_evict_entries = 0; | ||
663 | rval = GNUNET_YES; | ||
664 | if (OPERATION_QUEUE_TYPE_ADAPTIVE == opq->type) | ||
665 | { | ||
666 | GNUNET_assert (NULL != opq->fctx); | ||
667 | GNUNET_assert (opq->max_active >= opq->overload); | ||
668 | max = opq->max_active - opq->overload; | ||
669 | } | ||
670 | else | ||
671 | max = opq->max_active; | ||
672 | if (opq->active > max) | ||
673 | { | ||
674 | rval = GNUNET_NO; | ||
675 | goto ret; | ||
676 | } | ||
677 | if ((opq->active + need) <= max) | ||
678 | goto ret; | ||
679 | deficit = need - (max - opq->active); | ||
680 | for (entry = opq->nq_head; | ||
681 | (0 < deficit) && (NULL != entry); | ||
682 | entry = entry->next) | ||
683 | { | ||
684 | GNUNET_array_append (evict_entries, n_evict_entries, entry); | ||
685 | deficit -= entry->nres; | ||
686 | } | ||
687 | if (0 < deficit) | ||
688 | { | ||
689 | rval = GNUNET_NO; | ||
690 | goto ret; | ||
691 | } | ||
692 | for (n_ops = 0; n_ops < n_evict_entries;) | ||
693 | { | ||
694 | op = evict_entries[n_ops]->op; | ||
695 | GNUNET_array_append (ops, n_ops, op); /* increments n-ops */ | ||
696 | } | ||
697 | |||
698 | ret: | ||
699 | GNUNET_free (evict_entries); | ||
700 | if (NULL != ops_) | ||
701 | *ops_ = ops; | ||
702 | else | ||
703 | GNUNET_free (ops); | ||
704 | if (NULL != n_ops_) | ||
705 | *n_ops_ = n_ops; | ||
706 | return rval; | ||
707 | } | ||
708 | |||
709 | |||
710 | /** | ||
711 | * Merges an array of operations into another, eliminating duplicates. No | ||
712 | * ordering is guaranteed. | ||
713 | * | ||
714 | * @param old the array into which the merging is done. | ||
715 | * @param n_old the number of operations in old array | ||
716 | * @param new the array from which operations are to be merged | ||
717 | * @param n_new the number of operations in new array | ||
718 | */ | ||
719 | static void | ||
720 | merge_ops (struct GNUNET_TESTBED_Operation ***old, | ||
721 | unsigned int *n_old, | ||
722 | struct GNUNET_TESTBED_Operation **new, | ||
723 | unsigned int n_new) | ||
724 | { | ||
725 | struct GNUNET_TESTBED_Operation **cur; | ||
726 | unsigned int i; | ||
727 | unsigned int j; | ||
728 | unsigned int n_cur; | ||
729 | |||
730 | GNUNET_assert (NULL != old); | ||
731 | n_cur = *n_old; | ||
732 | cur = *old; | ||
733 | for (i = 0; i < n_new; i++) | ||
734 | { | ||
735 | for (j = 0; j < *n_old; j++) | ||
736 | { | ||
737 | if (new[i] == cur[j]) | ||
738 | break; | ||
739 | } | ||
740 | if (j < *n_old) | ||
741 | continue; | ||
742 | GNUNET_array_append (cur, n_cur, new[j]); | ||
743 | } | ||
744 | *old = cur; | ||
745 | *n_old = n_cur; | ||
746 | } | ||
747 | |||
748 | |||
749 | /** | ||
750 | * Checks for the readiness of an operation and schedules a operation start task | ||
751 | * | ||
752 | * @param op the operation | ||
753 | */ | ||
754 | static int | ||
755 | check_readiness (struct GNUNET_TESTBED_Operation *op) | ||
756 | { | ||
757 | struct GNUNET_TESTBED_Operation **evict_ops; | ||
758 | struct GNUNET_TESTBED_Operation **ops; | ||
759 | unsigned int n_ops; | ||
760 | unsigned int n_evict_ops; | ||
761 | unsigned int i; | ||
762 | |||
763 | GNUNET_assert (NULL == op->rq_entry); | ||
764 | GNUNET_assert (OP_STATE_WAITING == op->state); | ||
765 | evict_ops = NULL; | ||
766 | n_evict_ops = 0; | ||
767 | for (i = 0; i < op->nqueues; i++) | ||
768 | { | ||
769 | ops = NULL; | ||
770 | n_ops = 0; | ||
771 | if (GNUNET_NO == decide_capacity (op->queues[i], op->qentries[i], | ||
772 | &ops, &n_ops)) | ||
773 | { | ||
774 | GNUNET_free (evict_ops); | ||
775 | return GNUNET_NO; | ||
776 | } | ||
777 | if (NULL == ops) | ||
778 | continue; | ||
779 | merge_ops (&evict_ops, &n_evict_ops, ops, n_ops); | ||
780 | GNUNET_free (ops); | ||
781 | } | ||
782 | if (NULL != evict_ops) | ||
783 | { | ||
784 | for (i = 0; i < n_evict_ops; i++) | ||
785 | GNUNET_TESTBED_operation_release_ (evict_ops[i]); | ||
786 | GNUNET_free (evict_ops); | ||
787 | evict_ops = NULL; | ||
788 | /* Evicting the operations should schedule this operation */ | ||
789 | GNUNET_assert (OP_STATE_READY == op->state); | ||
790 | return GNUNET_YES; | ||
791 | } | ||
792 | for (i = 0; i < op->nqueues; i++) | ||
793 | op->queues[i]->active += op->nres[i]; | ||
794 | change_state (op, OP_STATE_READY); | ||
795 | rq_add (op); | ||
796 | return GNUNET_YES; | ||
797 | } | ||
798 | |||
799 | |||
800 | /** | ||
801 | * Defers a ready to be executed operation back to waiting | ||
802 | * | ||
803 | * @param op the operation to defer | ||
804 | */ | ||
805 | static void | ||
806 | defer (struct GNUNET_TESTBED_Operation *op) | ||
807 | { | ||
808 | unsigned int i; | ||
809 | |||
810 | GNUNET_assert (OP_STATE_READY == op->state); | ||
811 | rq_remove (op); | ||
812 | for (i = 0; i < op->nqueues; i++) | ||
813 | { | ||
814 | GNUNET_assert (op->queues[i]->active >= op->nres[i]); | ||
815 | op->queues[i]->active -= op->nres[i]; | ||
816 | } | ||
817 | change_state (op, OP_STATE_WAITING); | ||
818 | } | ||
819 | |||
820 | |||
821 | /** | ||
822 | * Cleanups the array of timeslots of an operation queue. For each time slot in | ||
823 | * the array, if it is allocated to an operation, it will be deallocated from | ||
824 | * the operation | ||
825 | * | ||
826 | * @param queue the operation queue | ||
827 | */ | ||
828 | static void | ||
829 | cleanup_tslots (struct OperationQueue *queue) | ||
830 | { | ||
831 | struct FeedbackCtx *fctx = queue->fctx; | ||
832 | struct TimeSlot *tslot; | ||
833 | struct GNUNET_TESTBED_Operation *op; | ||
834 | unsigned int cnt; | ||
835 | |||
836 | GNUNET_assert (NULL != fctx); | ||
837 | for (cnt = 0; cnt < queue->max_active; cnt++) | ||
838 | { | ||
839 | tslot = &fctx->tslots_freeptr[cnt]; | ||
840 | op = tslot->op; | ||
841 | if (NULL == op) | ||
842 | continue; | ||
843 | GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot); | ||
844 | } | ||
845 | GNUNET_free (fctx->tslots_freeptr); | ||
846 | fctx->tslots_freeptr = NULL; | ||
847 | fctx->alloc_head = NULL; | ||
848 | fctx->alloc_tail = NULL; | ||
849 | fctx->tslots_filled = 0; | ||
850 | } | ||
851 | |||
852 | |||
853 | /** | ||
854 | * Cleansup the existing timing slots and sets new timing slots in the given | ||
855 | * queue to accommodate given number of max active operations. | ||
856 | * | ||
857 | * @param queue the queue | ||
858 | * @param n the number of maximum active operations. If n is greater than the | ||
859 | * maximum limit set while creating the queue, then the minimum of these two | ||
860 | * will be selected as n | ||
861 | */ | ||
862 | static void | ||
863 | adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n) | ||
864 | { | ||
865 | struct FeedbackCtx *fctx = queue->fctx; | ||
866 | struct TimeSlot *tslot; | ||
867 | unsigned int cnt; | ||
868 | |||
869 | cleanup_tslots (queue); | ||
870 | n = GNUNET_MIN (n, fctx->max_active_bound); | ||
871 | fctx->tslots_freeptr = GNUNET_malloc (n * sizeof(struct TimeSlot)); | ||
872 | fctx->nfailed = 0; | ||
873 | for (cnt = 0; cnt < n; cnt++) | ||
874 | { | ||
875 | tslot = &fctx->tslots_freeptr[cnt]; | ||
876 | tslot->queue = queue; | ||
877 | GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail, | ||
878 | tslot); | ||
879 | } | ||
880 | GNUNET_TESTBED_operation_queue_reset_max_active_ (queue, n); | ||
881 | } | ||
882 | |||
883 | |||
884 | /** | ||
885 | * Adapts parallelism in an adaptive queue by using the statistical data from | ||
886 | * the feedback context. | ||
887 | * | ||
888 | * @param queue the queue | ||
889 | */ | ||
890 | static void | ||
891 | adapt_parallelism (struct OperationQueue *queue) | ||
892 | { | ||
893 | struct GNUNET_TIME_Relative avg; | ||
894 | struct FeedbackCtx *fctx; | ||
895 | struct TimeSlot *tslot; | ||
896 | int sd; | ||
897 | unsigned int nvals; | ||
898 | unsigned int cnt; | ||
899 | unsigned int parallelism; | ||
900 | |||
901 | avg = GNUNET_TIME_UNIT_ZERO; | ||
902 | nvals = 0; | ||
903 | fctx = queue->fctx; | ||
904 | for (cnt = 0; cnt < queue->max_active; cnt++) | ||
905 | { | ||
906 | tslot = &fctx->tslots_freeptr[cnt]; | ||
907 | avg = GNUNET_TIME_relative_add (avg, tslot->tsum); | ||
908 | nvals += tslot->nvals; | ||
909 | } | ||
910 | GNUNET_assert (nvals >= queue->max_active); | ||
911 | GNUNET_assert (fctx->nfailed <= nvals); | ||
912 | nvals -= fctx->nfailed; | ||
913 | if (0 == nvals) | ||
914 | { | ||
915 | if (1 == queue->max_active) | ||
916 | adaptive_queue_set_max_active (queue, 1); | ||
917 | else | ||
918 | adaptive_queue_set_max_active (queue, queue->max_active / 2); | ||
919 | return; | ||
920 | } | ||
921 | avg = GNUNET_TIME_relative_divide (avg, nvals); | ||
922 | GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us); | ||
923 | if (GNUNET_SYSERR == | ||
924 | GNUNET_TESTBED_SD_deviation_factor_ (fctx->sd, | ||
925 | (unsigned int) avg.rel_value_us, | ||
926 | &sd)) | ||
927 | { | ||
928 | adaptive_queue_set_max_active (queue, queue->max_active); /* no change */ | ||
929 | return; | ||
930 | } | ||
931 | |||
932 | parallelism = 0; | ||
933 | if (-1 == sd) | ||
934 | parallelism = queue->max_active + 1; | ||
935 | if (sd <= -2) | ||
936 | parallelism = queue->max_active * 2; | ||
937 | if (1 == sd) | ||
938 | parallelism = queue->max_active - 1; | ||
939 | if (2 <= sd) | ||
940 | parallelism = queue->max_active / 2; | ||
941 | parallelism = GNUNET_MAX (parallelism, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE); | ||
942 | adaptive_queue_set_max_active (queue, parallelism); | ||
943 | |||
944 | #if 0 | ||
945 | /* old algorithm */ | ||
946 | if (sd < 0) | ||
947 | sd = 0; | ||
948 | GNUNET_assert (0 <= sd); | ||
949 | // GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us); | ||
950 | if (0 == sd) | ||
951 | { | ||
952 | adaptive_queue_set_max_active (queue, queue->max_active * 2); | ||
953 | return; | ||
954 | } | ||
955 | if (1 == sd) | ||
956 | { | ||
957 | adaptive_queue_set_max_active (queue, queue->max_active + 1); | ||
958 | return; | ||
959 | } | ||
960 | if (1 == queue->max_active) | ||
961 | { | ||
962 | adaptive_queue_set_max_active (queue, 1); | ||
963 | return; | ||
964 | } | ||
965 | if (2 == sd) | ||
966 | { | ||
967 | adaptive_queue_set_max_active (queue, queue->max_active - 1); | ||
968 | return; | ||
969 | } | ||
970 | adaptive_queue_set_max_active (queue, queue->max_active / 2); | ||
971 | #endif | ||
972 | } | ||
973 | |||
974 | |||
975 | /** | ||
976 | * update tslots with the operation's completion time. Additionally, if | ||
977 | * updating a timeslot makes all timeslots filled in an adaptive operation | ||
978 | * queue, call adapt_parallelism() for that queue. | ||
979 | * | ||
980 | * @param op the operation | ||
981 | */ | ||
982 | static void | ||
983 | update_tslots (struct GNUNET_TESTBED_Operation *op) | ||
984 | { | ||
985 | struct OperationQueue *queue; | ||
986 | struct GNUNET_TIME_Relative t; | ||
987 | struct TimeSlot *tslot; | ||
988 | struct FeedbackCtx *fctx; | ||
989 | unsigned int i; | ||
990 | |||
991 | t = GNUNET_TIME_absolute_get_duration (op->tstart); | ||
992 | while (NULL != (tslot = op->tslots_head)) /* update time slots */ | ||
993 | { | ||
994 | queue = tslot->queue; | ||
995 | fctx = queue->fctx; | ||
996 | GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot); | ||
997 | tslot->op = NULL; | ||
998 | GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail, | ||
999 | tslot); | ||
1000 | if (op->failed) | ||
1001 | { | ||
1002 | fctx->nfailed++; | ||
1003 | for (i = 0; i < op->nqueues; i++) | ||
1004 | if (queue == op->queues[i]) | ||
1005 | break; | ||
1006 | GNUNET_assert (i != op->nqueues); | ||
1007 | op->queues[i]->overload += op->nres[i]; | ||
1008 | } | ||
1009 | tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t); | ||
1010 | if (0 != tslot->nvals++) | ||
1011 | continue; | ||
1012 | fctx->tslots_filled++; | ||
1013 | if (queue->max_active == fctx->tslots_filled) | ||
1014 | adapt_parallelism (queue); | ||
1015 | } | ||
1016 | } | ||
1017 | |||
1018 | |||
1019 | /** | ||
1020 | * Create an 'operation' to be performed. | ||
1021 | * | ||
1022 | * @param cls closure for the callbacks | ||
1023 | * @param start function to call to start the operation | ||
1024 | * @param release function to call to close down the operation | ||
1025 | * @return handle to the operation | ||
1026 | */ | ||
1027 | struct GNUNET_TESTBED_Operation * | ||
1028 | GNUNET_TESTBED_operation_create_ (void *cls, OperationStart start, | ||
1029 | OperationRelease release) | ||
1030 | { | ||
1031 | struct GNUNET_TESTBED_Operation *op; | ||
1032 | |||
1033 | op = GNUNET_new (struct GNUNET_TESTBED_Operation); | ||
1034 | op->start = start; | ||
1035 | op->state = OP_STATE_INIT; | ||
1036 | op->release = release; | ||
1037 | op->cb_cls = cls; | ||
1038 | return op; | ||
1039 | } | ||
1040 | |||
1041 | |||
1042 | /** | ||
1043 | * Create an operation queue. | ||
1044 | * | ||
1045 | * @param type the type of operation queue | ||
1046 | * @param max_active maximum number of operations in this | ||
1047 | * queue that can be active in parallel at the same time | ||
1048 | * @return handle to the queue | ||
1049 | */ | ||
1050 | struct OperationQueue * | ||
1051 | GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type, | ||
1052 | unsigned int max_active) | ||
1053 | { | ||
1054 | struct OperationQueue *queue; | ||
1055 | struct FeedbackCtx *fctx; | ||
1056 | |||
1057 | queue = GNUNET_new (struct OperationQueue); | ||
1058 | queue->type = type; | ||
1059 | if (OPERATION_QUEUE_TYPE_FIXED == type) | ||
1060 | { | ||
1061 | queue->max_active = max_active; | ||
1062 | } | ||
1063 | else | ||
1064 | { | ||
1065 | fctx = GNUNET_new (struct FeedbackCtx); | ||
1066 | fctx->max_active_bound = max_active; | ||
1067 | fctx->sd = GNUNET_TESTBED_SD_init_ (ADAPTIVE_QUEUE_DEFAULT_HISTORY); | ||
1068 | queue->fctx = fctx; | ||
1069 | adaptive_queue_set_max_active (queue, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE); | ||
1070 | } | ||
1071 | return queue; | ||
1072 | } | ||
1073 | |||
1074 | |||
1075 | /** | ||
1076 | * Cleanup the given operation queue. | ||
1077 | * | ||
1078 | * @param queue the operation queue to destroy | ||
1079 | */ | ||
1080 | static void | ||
1081 | queue_destroy (struct OperationQueue *queue) | ||
1082 | { | ||
1083 | struct FeedbackCtx *fctx; | ||
1084 | |||
1085 | if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type) | ||
1086 | { | ||
1087 | cleanup_tslots (queue); | ||
1088 | fctx = queue->fctx; | ||
1089 | GNUNET_TESTBED_SD_destroy_ (fctx->sd); | ||
1090 | GNUNET_free (fctx); | ||
1091 | } | ||
1092 | GNUNET_free (queue); | ||
1093 | } | ||
1094 | |||
1095 | |||
1096 | /** | ||
1097 | * Destroys an operation queue. If the queue is still in use by operations it | ||
1098 | * is marked as expired and its resources are released in the destructor | ||
1099 | * GNUNET_TESTBED_operations_fini(). | ||
1100 | * | ||
1101 | * @param queue queue to destroy | ||
1102 | */ | ||
1103 | void | ||
1104 | GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue) | ||
1105 | { | ||
1106 | if (GNUNET_YES != is_queue_empty (queue)) | ||
1107 | { | ||
1108 | GNUNET_assert (0 == queue->expired); /* Are you calling twice on same queue? */ | ||
1109 | queue->expired = 1; | ||
1110 | GNUNET_array_append (expired_opqs, n_expired_opqs, queue); | ||
1111 | return; | ||
1112 | } | ||
1113 | queue_destroy (queue); | ||
1114 | } | ||
1115 | |||
1116 | |||
1117 | /** | ||
1118 | * Destroys the operation queue if it is empty. If not empty return GNUNET_NO. | ||
1119 | * | ||
1120 | * @param queue the queue to destroy if empty | ||
1121 | * @return GNUNET_YES if the queue is destroyed. GNUNET_NO if not (because it | ||
1122 | * is not empty) | ||
1123 | */ | ||
1124 | int | ||
1125 | GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue) | ||
1126 | { | ||
1127 | if (GNUNET_NO == is_queue_empty (queue)) | ||
1128 | return GNUNET_NO; | ||
1129 | GNUNET_TESTBED_operation_queue_destroy_ (queue); | ||
1130 | return GNUNET_YES; | ||
1131 | } | ||
1132 | |||
1133 | |||
1134 | /** | ||
1135 | * Rechecks if any of the operations in the given operation queue's waiting list | ||
1136 | * can be made active | ||
1137 | * | ||
1138 | * @param opq the operation queue | ||
1139 | */ | ||
1140 | static void | ||
1141 | recheck_waiting (struct OperationQueue *opq) | ||
1142 | { | ||
1143 | struct QueueEntry *entry; | ||
1144 | struct QueueEntry *entry2; | ||
1145 | |||
1146 | entry = opq->wq_head; | ||
1147 | while (NULL != entry) | ||
1148 | { | ||
1149 | entry2 = entry->next; | ||
1150 | if (GNUNET_NO == check_readiness (entry->op)) | ||
1151 | break; | ||
1152 | entry = entry2; | ||
1153 | } | ||
1154 | } | ||
1155 | |||
1156 | |||
1157 | /** | ||
1158 | * Function to reset the maximum number of operations in the given queue. If | ||
1159 | * max_active is lesser than the number of currently active operations, the | ||
1160 | * active operations are not stopped immediately. | ||
1161 | * | ||
1162 | * @param queue the operation queue which has to be modified | ||
1163 | * @param max_active the new maximum number of active operations | ||
1164 | */ | ||
1165 | void | ||
1166 | GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue, | ||
1167 | unsigned int max_active) | ||
1168 | { | ||
1169 | struct QueueEntry *entry; | ||
1170 | |||
1171 | queue->max_active = max_active; | ||
1172 | queue->overload = 0; | ||
1173 | while ((queue->active > queue->max_active) | ||
1174 | && (NULL != (entry = queue->rq_head))) | ||
1175 | defer (entry->op); | ||
1176 | recheck_waiting (queue); | ||
1177 | } | ||
1178 | |||
1179 | |||
1180 | /** | ||
1181 | * Add an operation to a queue. An operation can be in multiple queues at | ||
1182 | * once. Once the operation is inserted into all the queues | ||
1183 | * GNUNET_TESTBED_operation_begin_wait_() has to be called to actually start | ||
1184 | * waiting for the operation to become active. | ||
1185 | * | ||
1186 | * @param queue queue to add the operation to | ||
1187 | * @param op operation to add to the queue | ||
1188 | * @param nres the number of units of the resources of queue needed by the | ||
1189 | * operation. Should be greater than 0. | ||
1190 | */ | ||
1191 | void | ||
1192 | GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue, | ||
1193 | struct GNUNET_TESTBED_Operation *op, | ||
1194 | unsigned int nres) | ||
1195 | { | ||
1196 | unsigned int qsize; | ||
1197 | |||
1198 | GNUNET_assert (0 < nres); | ||
1199 | qsize = op->nqueues; | ||
1200 | GNUNET_array_append (op->queues, op->nqueues, queue); | ||
1201 | GNUNET_array_append (op->nres, qsize, nres); | ||
1202 | GNUNET_assert (qsize == op->nqueues); | ||
1203 | } | ||
1204 | |||
1205 | |||
1206 | /** | ||
1207 | * Add an operation to a queue. An operation can be in multiple queues at | ||
1208 | * once. Once the operation is inserted into all the queues | ||
1209 | * GNUNET_TESTBED_operation_begin_wait_() has to be called to actually start | ||
1210 | * waiting for the operation to become active. The operation is assumed to take | ||
1211 | * 1 queue resource. Use GNUNET_TESTBED_operation_queue_insert2_() if it | ||
1212 | * requires more than 1 | ||
1213 | * | ||
1214 | * @param queue queue to add the operation to | ||
1215 | * @param op operation to add to the queue | ||
1216 | */ | ||
1217 | void | ||
1218 | GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue, | ||
1219 | struct GNUNET_TESTBED_Operation *op) | ||
1220 | { | ||
1221 | return GNUNET_TESTBED_operation_queue_insert2_ (queue, op, 1); | ||
1222 | } | ||
1223 | |||
1224 | |||
1225 | /** | ||
1226 | * Marks the given operation as waiting on the queues. Once all queues permit | ||
1227 | * the operation to become active, the operation will be activated. The actual | ||
1228 | * activation will occur in a separate task (thus allowing multiple queue | ||
1229 | * insertions to be made without having the first one instantly trigger the | ||
1230 | * operation if the first queue has sufficient resources). | ||
1231 | * | ||
1232 | * @param op the operation to marks as waiting | ||
1233 | */ | ||
1234 | void | ||
1235 | GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op) | ||
1236 | { | ||
1237 | GNUNET_assert (NULL == op->rq_entry); | ||
1238 | change_state (op, OP_STATE_WAITING); | ||
1239 | (void) check_readiness (op); | ||
1240 | } | ||
1241 | |||
1242 | |||
1243 | /** | ||
1244 | * Marks an active operation as inactive - the operation will be kept in a | ||
1245 | * ready-to-be-released state and continues to hold resources until another | ||
1246 | * operation contents for them. | ||
1247 | * | ||
1248 | * @param op the operation to be marked as inactive. The operation start | ||
1249 | * callback should have been called before for this operation to mark | ||
1250 | * it as inactive. | ||
1251 | */ | ||
1252 | void | ||
1253 | GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op) | ||
1254 | { | ||
1255 | struct OperationQueue **queues; | ||
1256 | size_t ms; | ||
1257 | unsigned int nqueues; | ||
1258 | unsigned int i; | ||
1259 | |||
1260 | GNUNET_assert (OP_STATE_ACTIVE == op->state); | ||
1261 | change_state (op, OP_STATE_INACTIVE); | ||
1262 | nqueues = op->nqueues; | ||
1263 | ms = sizeof(struct OperationQueue *) * nqueues; | ||
1264 | queues = GNUNET_malloc (ms); | ||
1265 | /* Cloning is needed as the operation be released by waiting operations and | ||
1266 | hence its nqueues memory ptr will be freed */ | ||
1267 | GNUNET_memcpy (queues, op->queues, ms); | ||
1268 | for (i = 0; i < nqueues; i++) | ||
1269 | recheck_waiting (queues[i]); | ||
1270 | GNUNET_free (queues); | ||
1271 | } | ||
1272 | |||
1273 | |||
1274 | /** | ||
1275 | * Marks and inactive operation as active. This function should be called to | ||
1276 | * ensure that the oprelease callback will not be called until it is either | ||
1277 | * marked as inactive or released. | ||
1278 | * | ||
1279 | * @param op the operation to be marked as active | ||
1280 | */ | ||
1281 | void | ||
1282 | GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op) | ||
1283 | { | ||
1284 | GNUNET_assert (OP_STATE_INACTIVE == op->state); | ||
1285 | change_state (op, OP_STATE_ACTIVE); | ||
1286 | } | ||
1287 | |||
1288 | |||
1289 | /** | ||
1290 | * An operation is 'done' (was cancelled or finished); remove | ||
1291 | * it from the queues and release associated resources. | ||
1292 | * | ||
1293 | * @param op operation that finished | ||
1294 | */ | ||
1295 | void | ||
1296 | GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op) | ||
1297 | { | ||
1298 | struct QueueEntry *entry; | ||
1299 | struct OperationQueue *opq; | ||
1300 | unsigned int i; | ||
1301 | |||
1302 | if (OP_STATE_INIT == op->state) | ||
1303 | { | ||
1304 | GNUNET_free (op); | ||
1305 | return; | ||
1306 | } | ||
1307 | if (OP_STATE_READY == op->state) | ||
1308 | rq_remove (op); | ||
1309 | if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */ | ||
1310 | GNUNET_TESTBED_operation_activate_ (op); | ||
1311 | if (OP_STATE_ACTIVE == op->state) | ||
1312 | update_tslots (op); | ||
1313 | GNUNET_assert (NULL != op->queues); | ||
1314 | GNUNET_assert (NULL != op->qentries); | ||
1315 | for (i = 0; i < op->nqueues; i++) | ||
1316 | { | ||
1317 | entry = op->qentries[i]; | ||
1318 | remove_queue_entry (op, i); | ||
1319 | opq = op->queues[i]; | ||
1320 | switch (op->state) | ||
1321 | { | ||
1322 | case OP_STATE_INIT: | ||
1323 | case OP_STATE_INACTIVE: | ||
1324 | GNUNET_assert (0); | ||
1325 | break; | ||
1326 | |||
1327 | case OP_STATE_WAITING: | ||
1328 | break; | ||
1329 | |||
1330 | case OP_STATE_ACTIVE: | ||
1331 | case OP_STATE_READY: | ||
1332 | GNUNET_assert (0 != opq->active); | ||
1333 | GNUNET_assert (opq->active >= entry->nres); | ||
1334 | opq->active -= entry->nres; | ||
1335 | recheck_waiting (opq); | ||
1336 | break; | ||
1337 | } | ||
1338 | GNUNET_free (entry); | ||
1339 | } | ||
1340 | GNUNET_free (op->qentries); | ||
1341 | GNUNET_free (op->queues); | ||
1342 | GNUNET_free (op->nres); | ||
1343 | if (NULL != op->release) | ||
1344 | op->release (op->cb_cls); | ||
1345 | GNUNET_free (op); | ||
1346 | } | ||
1347 | |||
1348 | |||
1349 | /** | ||
1350 | * Marks an operation as failed | ||
1351 | * | ||
1352 | * @param op the operation to be marked as failed | ||
1353 | */ | ||
1354 | void | ||
1355 | GNUNET_TESTBED_operation_mark_failed (struct GNUNET_TESTBED_Operation *op) | ||
1356 | { | ||
1357 | op->failed = GNUNET_YES; | ||
1358 | } | ||
1359 | |||
1360 | |||
1361 | /** | ||
1362 | * Cleanup expired operation queues. While doing so, also check for any | ||
1363 | * operations which are not completed and warn about them. | ||
1364 | */ | ||
1365 | void __attribute__ ((destructor)) | ||
1366 | GNUNET_TESTBED_operations_fini () | ||
1367 | { | ||
1368 | struct OperationQueue *queue; | ||
1369 | unsigned int i; | ||
1370 | int warn = 0; | ||
1371 | |||
1372 | for (i = 0; i < n_expired_opqs; i++) | ||
1373 | { | ||
1374 | queue = expired_opqs[i]; | ||
1375 | if (GNUNET_NO == is_queue_empty (queue)) | ||
1376 | warn = 1; | ||
1377 | queue_destroy (queue); | ||
1378 | } | ||
1379 | GNUNET_free (expired_opqs); | ||
1380 | n_expired_opqs = 0; | ||
1381 | if (warn) | ||
1382 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
1383 | "Be disciplined. Some operations were not marked as done.\n"); | ||
1384 | } | ||
1385 | |||
1386 | |||
1387 | /* end of testbed_api_operations.c */ | ||