aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSree Harsha Totakura <totakura@in.tum.de>2013-08-19 14:13:19 +0000
committerSree Harsha Totakura <totakura@in.tum.de>2013-08-19 14:13:19 +0000
commit9dd0824b40e7afeaf948a564d544b384db12cadf (patch)
tree4d98c8d27669e8eca6259a9601e32fddfa67e39c
parentd4051630bbc5f521142f302baad60212b75f8b7f (diff)
downloadgnunet-9dd0824b40e7afeaf948a564d544b384db12cadf.tar.gz
gnunet-9dd0824b40e7afeaf948a564d544b384db12cadf.zip
fix 2893: Move adaptive parallelisation mechanism to operation queues
-rw-r--r--src/testbed/gnunet-service-testbed.c3
-rw-r--r--src/testbed/test_testbed_api_operations.c4
-rw-r--r--src/testbed/testbed_api.c36
-rw-r--r--src/testbed/testbed_api_hosts.c238
-rw-r--r--src/testbed/testbed_api_hosts.h55
-rw-r--r--src/testbed/testbed_api_operations.c358
-rw-r--r--src/testbed/testbed_api_operations.h36
-rw-r--r--src/testbed/testbed_api_peers.c9
-rw-r--r--src/testbed/testbed_api_peers.h15
-rw-r--r--src/testbed/testbed_api_statistics.c4
10 files changed, 418 insertions, 340 deletions
diff --git a/src/testbed/gnunet-service-testbed.c b/src/testbed/gnunet-service-testbed.c
index 3710b82e1..f49e0331f 100644
--- a/src/testbed/gnunet-service-testbed.c
+++ b/src/testbed/gnunet-service-testbed.c
@@ -918,7 +918,8 @@ testbed_run (void *cls, struct GNUNET_SERVER_Handle *server,
918 GNUNET_assert (GNUNET_OK == 918 GNUNET_assert (GNUNET_OK ==
919 GNUNET_CONFIGURATION_get_value_number (cfg, "TESTBED", 919 GNUNET_CONFIGURATION_get_value_number (cfg, "TESTBED",
920 "MAX_OPEN_FDS", &num)); 920 "MAX_OPEN_FDS", &num));
921 GST_opq_openfds = GNUNET_TESTBED_operation_queue_create_ ((unsigned int) num); 921 GST_opq_openfds = GNUNET_TESTBED_operation_queue_create_
922 (OPERATION_QUEUE_TYPE_FIXED, (unsigned int) num);
922 GNUNET_assert (GNUNET_OK == 923 GNUNET_assert (GNUNET_OK ==
923 GNUNET_CONFIGURATION_get_value_time (cfg, "TESTBED", 924 GNUNET_CONFIGURATION_get_value_time (cfg, "TESTBED",
924 "OPERATION_TIMEOUT", 925 "OPERATION_TIMEOUT",
diff --git a/src/testbed/test_testbed_api_operations.c b/src/testbed/test_testbed_api_operations.c
index 31b3add0a..feca147b7 100644
--- a/src/testbed/test_testbed_api_operations.c
+++ b/src/testbed/test_testbed_api_operations.c
@@ -490,9 +490,9 @@ static void
490run (void *cls, char *const *args, const char *cfgfile, 490run (void *cls, char *const *args, const char *cfgfile,
491 const struct GNUNET_CONFIGURATION_Handle *config) 491 const struct GNUNET_CONFIGURATION_Handle *config)
492{ 492{
493 q1 = GNUNET_TESTBED_operation_queue_create_ (1); 493 q1 = GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_FIXED, 1);
494 GNUNET_assert (NULL != q1); 494 GNUNET_assert (NULL != q1);
495 q2 = GNUNET_TESTBED_operation_queue_create_ (2); 495 q2 = GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_FIXED, 2);
496 GNUNET_assert (NULL != q2); 496 GNUNET_assert (NULL != q2);
497 op1 = GNUNET_TESTBED_operation_create_ (&op1, start_cb, release_cb); 497 op1 = GNUNET_TESTBED_operation_create_ (&op1, start_cb, release_cb);
498 GNUNET_assert (NULL != op1); 498 GNUNET_assert (NULL != op1);
diff --git a/src/testbed/testbed_api.c b/src/testbed/testbed_api.c
index 010d51776..48b7fe189 100644
--- a/src/testbed/testbed_api.c
+++ b/src/testbed/testbed_api.c
@@ -860,7 +860,7 @@ handle_op_fail_event (struct GNUNET_TESTBED_Controller *c,
860 struct OverlayConnectData *data; 860 struct OverlayConnectData *data;
861 861
862 data = opc->data; 862 data = opc->data;
863 data->failed = GNUNET_YES; 863 GNUNET_TESTBED_operation_mark_failed (opc->op);
864 if (NULL != data->cb) 864 if (NULL != data->cb)
865 data->cb (data->cb_cls, opc->op, emsg); 865 data->cb (data->cb_cls, opc->op, emsg);
866 } 866 }
@@ -1486,13 +1486,15 @@ GNUNET_TESTBED_controller_connect (struct GNUNET_TESTBED_Host *host,
1486 GNUNET_TESTBED_mark_host_registered_at_ (host, controller); 1486 GNUNET_TESTBED_mark_host_registered_at_ (host, controller);
1487 controller->host = host; 1487 controller->host = host;
1488 controller->opq_parallel_operations = 1488 controller->opq_parallel_operations =
1489 GNUNET_TESTBED_operation_queue_create_ ((unsigned int) 1489 GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_FIXED,
1490 max_parallel_operations); 1490 (unsigned int) max_parallel_operations);
1491 controller->opq_parallel_service_connections = 1491 controller->opq_parallel_service_connections =
1492 GNUNET_TESTBED_operation_queue_create_ ((unsigned int) 1492 GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_FIXED,
1493 (unsigned int)
1493 max_parallel_service_connections); 1494 max_parallel_service_connections);
1494 controller->opq_parallel_topology_config_operations = 1495 controller->opq_parallel_topology_config_operations =
1495 GNUNET_TESTBED_operation_queue_create_ ((unsigned int) 1496 GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_FIXED,
1497 (unsigned int)
1496 max_parallel_topology_config_operations); 1498 max_parallel_topology_config_operations);
1497 controller_hostname = GNUNET_TESTBED_host_get_hostname (host); 1499 controller_hostname = GNUNET_TESTBED_host_get_hostname (host);
1498 if (NULL == controller_hostname) 1500 if (NULL == controller_hostname)
@@ -1856,13 +1858,25 @@ GNUNET_TESTBED_create_helper_init_msg_ (const char *trusted_ip,
1856 1858
1857 1859
1858/** 1860/**
1859 * Signal that the information from an operation has been fully 1861 * This function is used to signal that the event information (struct
1860 * processed. This function MUST be called for each event 1862 * GNUNET_TESTBED_EventInformation) from an operation has been fully processed
1861 * of type 'operation_finished' to fully remove the operation 1863 * i.e. if the event callback is ever called for this operation. If the event
1862 * from the operation queue. After calling this function, the 1864 * callback for this operation has not yet been called, calling this function
1863 * 'op_result' becomes invalid (!). 1865 * cancels the operation, frees its resources and ensures the no event is
1866 * generated with respect to this operation. Note that however cancelling an
1867 * operation does NOT guarantee that the operation will be fully undone (or that
1868 * nothing ever happened).
1864 * 1869 *
1865 * @param operation operation to signal completion for 1870 * This function MUST be called for every operation to fully remove the
1871 * operation from the operation queue. After calling this function, if
1872 * operation is completed and its event information is of type
1873 * GNUNET_TESTBED_ET_OPERATION_FINISHED, the 'op_result' becomes invalid (!).
1874
1875 * If the operation is generated from GNUNET_TESTBED_service_connect() then
1876 * calling this function on such as operation calls the disconnect adapter if
1877 * the connect adapter was ever called.
1878 *
1879 * @param operation operation to signal completion or cancellation
1866 */ 1880 */
1867void 1881void
1868GNUNET_TESTBED_operation_done (struct GNUNET_TESTBED_Operation *operation) 1882GNUNET_TESTBED_operation_done (struct GNUNET_TESTBED_Operation *operation)
diff --git a/src/testbed/testbed_api_hosts.c b/src/testbed/testbed_api_hosts.c
index cd805dca1..b7c824bc2 100644
--- a/src/testbed/testbed_api_hosts.c
+++ b/src/testbed/testbed_api_hosts.c
@@ -35,7 +35,6 @@
35#include "testbed_api_hosts.h" 35#include "testbed_api_hosts.h"
36#include "testbed_helper.h" 36#include "testbed_helper.h"
37#include "testbed_api_operations.h" 37#include "testbed_api_operations.h"
38#include "testbed_api_sd.h"
39 38
40#include <zlib.h> 39#include <zlib.h>
41 40
@@ -97,28 +96,6 @@ struct RegisteredController
97 96
98 97
99/** 98/**
100 * A slot to record time taken by an overlay connect operation
101 */
102struct TimeSlot
103{
104 /**
105 * A key to identify this timeslot
106 */
107 void *key;
108
109 /**
110 * Time
111 */
112 struct GNUNET_TIME_Relative time;
113
114 /**
115 * Number of timing values accumulated
116 */
117 unsigned int nvals;
118};
119
120
121/**
122 * Opaque handle to a host running experiments managed by the testing framework. 99 * Opaque handle to a host running experiments managed by the testing framework.
123 * The master process must be able to SSH to this host without password (via 100 * The master process must be able to SSH to this host without password (via
124 * ssh-agent). 101 * ssh-agent).
@@ -161,28 +138,6 @@ struct GNUNET_TESTBED_Host
161 struct OperationQueue *opq_parallel_overlay_connect_operations; 138 struct OperationQueue *opq_parallel_overlay_connect_operations;
162 139
163 /** 140 /**
164 * An array of timing slots; size should be equal to the current number of parallel
165 * overlay connects
166 */
167 struct TimeSlot *tslots;
168
169 /**
170 * Handle for SD calculations amount parallel overlay connect operation finish
171 * times
172 */
173 struct SDHandle *poc_sd;
174
175 /**
176 * The number of parallel overlay connects we do currently
177 */
178 unsigned int num_parallel_connects;
179
180 /**
181 * Counter to indicate when all the available time slots are filled
182 */
183 unsigned int tslots_filled;
184
185 /**
186 * Is a controller started on this host? FIXME: Is this needed? 141 * Is a controller started on this host? FIXME: Is this needed?
187 */ 142 */
188 int controller_started; 143 int controller_started;
@@ -382,9 +337,8 @@ GNUNET_TESTBED_host_create_with_id (uint32_t id, const char *hostname,
382 host->port = (0 == port) ? 22 : port; 337 host->port = (0 == port) ? 22 : port;
383 host->cfg = GNUNET_CONFIGURATION_dup (cfg); 338 host->cfg = GNUNET_CONFIGURATION_dup (cfg);
384 host->opq_parallel_overlay_connect_operations = 339 host->opq_parallel_overlay_connect_operations =
385 GNUNET_TESTBED_operation_queue_create_ (0); 340 GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_ADAPTIVE,
386 GNUNET_TESTBED_set_num_parallel_overlay_connects_ (host, 1); 341 UINT_MAX);
387 host->poc_sd = GNUNET_TESTBED_SD_init_ (10);
388 new_size = host_list_size; 342 new_size = host_list_size;
389 while (id >= new_size) 343 while (id >= new_size)
390 new_size += HOST_LIST_GROW_STEP; 344 new_size += HOST_LIST_GROW_STEP;
@@ -740,8 +694,6 @@ GNUNET_TESTBED_host_destroy (struct GNUNET_TESTBED_Host *host)
740 GNUNET_free_non_null ((char *) host->hostname); 694 GNUNET_free_non_null ((char *) host->hostname);
741 GNUNET_TESTBED_operation_queue_destroy_ 695 GNUNET_TESTBED_operation_queue_destroy_
742 (host->opq_parallel_overlay_connect_operations); 696 (host->opq_parallel_overlay_connect_operations);
743 GNUNET_TESTBED_SD_destroy_ (host->poc_sd);
744 GNUNET_free_non_null (host->tslots);
745 GNUNET_CONFIGURATION_destroy (host->cfg); 697 GNUNET_CONFIGURATION_destroy (host->cfg);
746 GNUNET_free (host); 698 GNUNET_free (host);
747 while (host_list_size >= HOST_LIST_GROW_STEP) 699 while (host_list_size >= HOST_LIST_GROW_STEP)
@@ -1624,192 +1576,6 @@ GNUNET_TESTBED_cancel_registration (struct GNUNET_TESTBED_HostRegistrationHandle
1624 1576
1625 1577
1626/** 1578/**
1627 * Initializes the operation queue for parallel overlay connects
1628 *
1629 * @param h the host handle
1630 * @param npoc the number of parallel overlay connects - the queue size
1631 */
1632void
1633GNUNET_TESTBED_set_num_parallel_overlay_connects_ (struct
1634 GNUNET_TESTBED_Host *h,
1635 unsigned int npoc)
1636{
1637 //fprintf (stderr, "%d", npoc);
1638 GNUNET_free_non_null (h->tslots);
1639 h->tslots_filled = 0;
1640 h->num_parallel_connects = npoc;
1641 h->tslots = GNUNET_malloc (npoc * sizeof (struct TimeSlot));
1642 GNUNET_TESTBED_operation_queue_reset_max_active_
1643 (h->opq_parallel_overlay_connect_operations, npoc);
1644}
1645
1646
1647/**
1648 * Returns a timing slot which will be exclusively locked
1649 *
1650 * @param h the host handle
1651 * @param key a pointer which is associated to the returned slot; should not be
1652 * NULL. It serves as a key to determine the correct owner of the slot
1653 * @return the time slot index in the array of time slots in the controller
1654 * handle
1655 */
1656unsigned int
1657GNUNET_TESTBED_get_tslot_ (struct GNUNET_TESTBED_Host *h, void *key)
1658{
1659 unsigned int slot;
1660
1661 GNUNET_assert (NULL != h->tslots);
1662 GNUNET_assert (NULL != key);
1663 for (slot = 0; slot < h->num_parallel_connects; slot++)
1664 if (NULL == h->tslots[slot].key)
1665 {
1666 h->tslots[slot].key = key;
1667 return slot;
1668 }
1669 GNUNET_assert (0); /* We should always find a free tslot */
1670}
1671
1672
1673/**
1674 * Decides whether any change in the number of parallel overlay connects is
1675 * necessary to adapt to the load on the system
1676 *
1677 * @param h the host handle
1678 */
1679static void
1680decide_npoc (struct GNUNET_TESTBED_Host *h)
1681{
1682 struct GNUNET_TIME_Relative avg;
1683 int sd;
1684 unsigned int slot;
1685 unsigned int nvals;
1686
1687 if (h->tslots_filled != h->num_parallel_connects)
1688 return;
1689 avg = GNUNET_TIME_UNIT_ZERO;
1690 nvals = 0;
1691 for (slot = 0; slot < h->num_parallel_connects; slot++)
1692 {
1693 avg = GNUNET_TIME_relative_add (avg, h->tslots[slot].time);
1694 nvals += h->tslots[slot].nvals;
1695 }
1696 GNUNET_assert (nvals >= h->num_parallel_connects);
1697 avg = GNUNET_TIME_relative_divide (avg, nvals);
1698 GNUNET_assert (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us != avg.rel_value_us);
1699 sd = GNUNET_TESTBED_SD_deviation_factor_ (h->poc_sd, (unsigned int) avg.rel_value_us);
1700 if ( (sd <= 5) ||
1701 (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1702 h->num_parallel_connects)) )
1703 GNUNET_TESTBED_SD_add_data_ (h->poc_sd, (unsigned int) avg.rel_value_us);
1704 if (GNUNET_SYSERR == sd)
1705 {
1706 GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h,
1707 h->num_parallel_connects);
1708 return;
1709 }
1710 GNUNET_assert (0 <= sd);
1711 if (0 == sd)
1712 {
1713 GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h,
1714 h->num_parallel_connects
1715 * 2);
1716 return;
1717 }
1718 if (1 == sd)
1719 {
1720 GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h,
1721 h->num_parallel_connects
1722 + 1);
1723 return;
1724 }
1725 if (1 == h->num_parallel_connects)
1726 {
1727 GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h, 1);
1728 return;
1729 }
1730 if (2 == sd)
1731 {
1732 GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h,
1733 h->num_parallel_connects
1734 - 1);
1735 return;
1736 }
1737 GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h,
1738 h->num_parallel_connects /
1739 2);
1740}
1741
1742
1743/**
1744 * Releases a time slot thus making it available for be used again
1745 *
1746 * @param h the host handle
1747 * @param index the index of the the time slot
1748 * @param key the key to prove ownership of the timeslot
1749 * @return GNUNET_YES if the time slot is successfully removed; GNUNET_NO if the
1750 * time slot cannot be removed - this could be because of the index
1751 * greater than existing number of time slots or `key' being different
1752 */
1753int
1754GNUNET_TESTBED_release_time_slot_ (struct GNUNET_TESTBED_Host *h,
1755 unsigned int index, void *key)
1756{
1757 struct TimeSlot *slot;
1758
1759 GNUNET_assert (NULL != key);
1760 if (index >= h->num_parallel_connects)
1761 return GNUNET_NO;
1762 slot = &h->tslots[index];
1763 if (key != slot->key)
1764 return GNUNET_NO;
1765 slot->key = NULL;
1766 return GNUNET_YES;
1767}
1768
1769
1770/**
1771 * Function to update a time slot
1772 *
1773 * @param h the host handle
1774 * @param index the index of the time slot to update
1775 * @param key the key to identify ownership of the slot
1776 * @param time the new time
1777 * @param failed should this reading be treated as coming from a fail event
1778 */
1779void
1780GNUNET_TESTBED_update_time_slot_ (struct GNUNET_TESTBED_Host *h,
1781 unsigned int index, void *key,
1782 struct GNUNET_TIME_Relative time, int failed)
1783{
1784 struct TimeSlot *slot;
1785
1786 if (GNUNET_YES == failed)
1787 {
1788 if (1 == h->num_parallel_connects)
1789 {
1790 GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h, 1);
1791 return;
1792 }
1793 GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h,
1794 h->num_parallel_connects
1795 - 1);
1796 }
1797 if (GNUNET_NO == GNUNET_TESTBED_release_time_slot_ (h, index, key))
1798 return;
1799 slot = &h->tslots[index];
1800 slot->nvals++;
1801 if (GNUNET_TIME_UNIT_ZERO.rel_value_us == slot->time.rel_value_us)
1802 {
1803 slot->time = time;
1804 h->tslots_filled++;
1805 decide_npoc (h);
1806 return;
1807 }
1808 slot->time = GNUNET_TIME_relative_add (slot->time, time);
1809}
1810
1811
1812/**
1813 * Queues the given operation in the queue for parallel overlay connects of the 1579 * Queues the given operation in the queue for parallel overlay connects of the
1814 * given host 1580 * given host
1815 * 1581 *
diff --git a/src/testbed/testbed_api_hosts.h b/src/testbed/testbed_api_hosts.h
index 6c305c340..a1cbb5b88 100644
--- a/src/testbed/testbed_api_hosts.h
+++ b/src/testbed/testbed_api_hosts.h
@@ -151,61 +151,6 @@ GNUNET_TESTBED_is_host_registered_ (const struct GNUNET_TESTBED_Host *host,
151 151
152 152
153/** 153/**
154 * (re)sets the operation queue for parallel overlay connects
155 *
156 * @param h the host handle
157 * @param npoc the number of parallel overlay connects - the queue size
158 */
159void
160GNUNET_TESTBED_set_num_parallel_overlay_connects_ (struct
161 GNUNET_TESTBED_Host *h,
162 unsigned int npoc);
163
164
165/**
166 * Releases a time slot thus making it available for be used again
167 *
168 * @param h the host handle
169 * @param index the index of the the time slot
170 * @param key the key to prove ownership of the timeslot
171 * @return GNUNET_YES if the time slot is successfully removed; GNUNET_NO if the
172 * time slot cannot be removed - this could be because of the index
173 * greater than existing number of time slots or `key' being different
174 */
175int
176GNUNET_TESTBED_release_time_slot_ (struct GNUNET_TESTBED_Host *h,
177 unsigned int index, void *key);
178
179
180/**
181 * Function to update a time slot
182 *
183 * @param h the host handle
184 * @param index the index of the time slot to update
185 * @param key the key to identify ownership of the slot
186 * @param time the new time
187 * @param failed should this reading be treated as coming from a fail event
188 */
189void
190GNUNET_TESTBED_update_time_slot_ (struct GNUNET_TESTBED_Host *h,
191 unsigned int index, void *key,
192 struct GNUNET_TIME_Relative time, int failed);
193
194
195/**
196 * Returns a timing slot which will be exclusively locked
197 *
198 * @param h the host handle
199 * @param key a pointer which is associated to the returned slot; should not be
200 * NULL. It serves as a key to determine the correct owner of the slot
201 * @return the time slot index in the array of time slots in the controller
202 * handle
203 */
204unsigned int
205GNUNET_TESTBED_get_tslot_ (struct GNUNET_TESTBED_Host *h, void *key);
206
207
208/**
209 * Queues the given operation in the queue for parallel overlay connects of the 154 * Queues the given operation in the queue for parallel overlay connects of the
210 * given host 155 * given host
211 * 156 *
diff --git a/src/testbed/testbed_api_operations.c b/src/testbed/testbed_api_operations.c
index 521645b71..e1034a18c 100644
--- a/src/testbed/testbed_api_operations.c
+++ b/src/testbed/testbed_api_operations.c
@@ -27,6 +27,7 @@
27 27
28#include "platform.h" 28#include "platform.h"
29#include "testbed_api_operations.h" 29#include "testbed_api_operations.h"
30#include "testbed_api_sd.h"
30 31
31 32
32/** 33/**
@@ -60,6 +61,89 @@ struct QueueEntry
60 * Queue of operations where we can only support a certain 61 * Queue of operations where we can only support a certain
61 * number of concurrent operations of a particular type. 62 * number of concurrent operations of a particular type.
62 */ 63 */
64struct OperationQueue;
65
66
67/**
68 * A slot to record time taken by an operation
69 */
70struct TimeSlot
71{
72 /**
73 * DLL next pointer
74 */
75 struct TimeSlot *next;
76
77 /**
78 * DLL prev pointer
79 */
80 struct TimeSlot *prev;
81
82 /**
83 * This operation queue to which this time slot belongs to
84 */
85 struct OperationQueue *queue;
86
87 /**
88 * The operation to which this timeslot is currently allocated to
89 */
90 struct GNUNET_TESTBED_Operation *op;
91
92 /**
93 * Accumulated time
94 */
95 struct GNUNET_TIME_Relative tsum;
96
97 /**
98 * Number of timing values accumulated
99 */
100 unsigned int nvals;
101};
102
103
104/**
105 * Context for operation queues of type OPERATION_QUEUE_TYPE_ADAPTIVE
106 */
107struct FeedbackCtx
108{
109 /**
110 * Handle for calculating standard deviation
111 */
112 struct SDHandle *sd;
113
114 /**
115 * Head for DLL of time slots which are free to be allocated to operations
116 */
117 struct TimeSlot *alloc_head;
118
119 /**
120 * Tail for DLL of time slots which are free to be allocated to operations
121 */
122 struct TimeSlot *alloc_tail;
123
124 /**
125 * Pointer to the chunk of time slots. Free all time slots at a time using
126 * this pointer.
127 */
128 struct TimeSlot *tslots_freeptr;
129
130 /**
131 * Number of time slots filled so far
132 */
133 unsigned int tslots_filled;
134
135 /**
136 * Bound on the maximum number of operations which can be active
137 */
138 unsigned int max_active_bound;
139
140};
141
142
143/**
144 * Queue of operations where we can only support a certain
145 * number of concurrent operations of a particular type.
146 */
63struct OperationQueue 147struct OperationQueue
64{ 148{
65 /** 149 /**
@@ -108,12 +192,26 @@ struct OperationQueue
108 struct QueueEntry *nq_tail; 192 struct QueueEntry *nq_tail;
109 193
110 /** 194 /**
195 * Feedback context; only relevant for adaptive operation queues. NULL for
196 * fixed operation queues
197 */
198 struct FeedbackCtx *fctx;
199
200 /**
201 * The type of this opeartion queue
202 */
203 enum OperationQueueType type;
204
205 /**
111 * Number of operations that are currently active in this queue. 206 * Number of operations that are currently active in this queue.
112 */ 207 */
113 unsigned int active; 208 unsigned int active;
114 209
115 /** 210 /**
116 * Max number of operations which can be active at any time in this queue 211 * Max number of operations which can be active at any time in this queue.
212 * This value can be changed either by calling
213 * GNUNET_TESTBED_operation_queue_reset_max_active_() or by the adaptive
214 * algorithm if this operation queue is of type OPERATION_QUEUE_TYPE_ADAPTIVE
117 */ 215 */
118 unsigned int max_active; 216 unsigned int max_active;
119 217
@@ -222,6 +320,21 @@ struct GNUNET_TESTBED_Operation
222 struct ReadyQueueEntry *rq_entry; 320 struct ReadyQueueEntry *rq_entry;
223 321
224 /** 322 /**
323 * Head pointer for DLL of tslots allocated to this operation
324 */
325 struct TimeSlot *tslots_head;
326
327 /**
328 * Tail pointer for DLL of tslots allocated to this operation
329 */
330 struct TimeSlot *tslots_tail;
331
332 /**
333 * The time at which the operation is started
334 */
335 struct GNUNET_TIME_Absolute tstart;
336
337 /**
225 * Number of queues in the operation queues array 338 * Number of queues in the operation queues array
226 */ 339 */
227 unsigned int nqueues; 340 unsigned int nqueues;
@@ -231,6 +344,11 @@ struct GNUNET_TESTBED_Operation
231 */ 344 */
232 enum OperationState state; 345 enum OperationState state;
233 346
347 /**
348 * Is this a failed operation?
349 */
350 int failed;
351
234}; 352};
235 353
236/** 354/**
@@ -250,6 +368,29 @@ GNUNET_SCHEDULER_TaskIdentifier process_rq_task_id;
250 368
251 369
252/** 370/**
371 * Assigns the given operation a time slot from the given operation queue
372 *
373 * @param op the operation
374 * @param queue the operation queue
375 * @return the timeslot
376 */
377static void
378assign_timeslot (struct GNUNET_TESTBED_Operation *op,
379 struct OperationQueue *queue)
380{
381 struct FeedbackCtx *fctx = queue->fctx;
382 struct TimeSlot *tslot;
383
384 GNUNET_assert (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type);
385 tslot = fctx->alloc_head;
386 GNUNET_assert (NULL != tslot);
387 GNUNET_CONTAINER_DLL_remove (fctx->alloc_head, fctx->alloc_tail, tslot);
388 GNUNET_CONTAINER_DLL_insert_tail (op->tslots_head, op->tslots_tail, tslot);
389 tslot->op = op;
390}
391
392
393/**
253 * Removes a queue entry of an operation from one of the operation queues' lists 394 * Removes a queue entry of an operation from one of the operation queues' lists
254 * depending on the state of the operation 395 * depending on the state of the operation
255 * 396 *
@@ -378,6 +519,8 @@ static void
378process_rq_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 519process_rq_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
379{ 520{
380 struct GNUNET_TESTBED_Operation *op; 521 struct GNUNET_TESTBED_Operation *op;
522 struct OperationQueue *queue;
523 unsigned int cnt;
381 524
382 process_rq_task_id = GNUNET_SCHEDULER_NO_TASK; 525 process_rq_task_id = GNUNET_SCHEDULER_NO_TASK;
383 GNUNET_assert (NULL != rq_head); 526 GNUNET_assert (NULL != rq_head);
@@ -386,8 +529,15 @@ process_rq_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
386 if (NULL != rq_head) 529 if (NULL != rq_head)
387 process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL); 530 process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
388 change_state (op, OP_STATE_ACTIVE); 531 change_state (op, OP_STATE_ACTIVE);
532 for (cnt = 0; cnt < op->nqueues; cnt++)
533 {
534 queue = op->queues[cnt];
535 if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
536 assign_timeslot (op, queue);
537 }
538 op->tstart = GNUNET_TIME_absolute_get ();
389 if (NULL != op->start) 539 if (NULL != op->start)
390 op->start (op->cb_cls); 540 op->start (op->cb_cls);
391} 541}
392 542
393 543
@@ -582,7 +732,7 @@ check_readiness (struct GNUNET_TESTBED_Operation *op)
582 if (NULL != evict_ops) 732 if (NULL != evict_ops)
583 { 733 {
584 for (i = 0; i < n_evict_ops; i++) 734 for (i = 0; i < n_evict_ops; i++)
585 GNUNET_TESTBED_operation_release_ (evict_ops[i]); 735 GNUNET_TESTBED_operation_release_ (evict_ops[i]);
586 GNUNET_free (evict_ops); 736 GNUNET_free (evict_ops);
587 evict_ops = NULL; 737 evict_ops = NULL;
588 /* Evicting the operations should schedule this operation */ 738 /* Evicting the operations should schedule this operation */
@@ -619,6 +769,162 @@ defer (struct GNUNET_TESTBED_Operation *op)
619 769
620 770
621/** 771/**
772 * Cleanups the array of timeslots of an operation queue. For each time slot in
773 * the array, if it is allocated to an operation, it will be deallocated from
774 * the operation
775 *
776 * @param queue the operation queue
777 */
778static void
779cleanup_tslots (struct OperationQueue *queue)
780{
781 struct FeedbackCtx *fctx = queue->fctx;
782 struct TimeSlot *tslot;
783 struct GNUNET_TESTBED_Operation *op;
784 unsigned int cnt;
785
786 GNUNET_assert (NULL != fctx);
787 for (cnt = 0; cnt < queue->max_active; cnt++)
788 {
789 tslot = &fctx->tslots_freeptr[cnt];
790 op = tslot->op;
791 if (NULL == op)
792 continue;
793 GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
794 }
795 GNUNET_free_non_null (fctx->tslots_freeptr);
796 fctx->tslots_freeptr = NULL;
797 fctx->alloc_head = NULL;
798 fctx->alloc_tail = NULL;
799 fctx->tslots_filled = 0;
800}
801
802
803/**
804 * Initializes the operation queue for parallel overlay connects
805 *
806 * @param h the host handle
807 * @param npoc the number of parallel overlay connects - the queue size
808 */
809static void
810adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n)
811{
812 struct FeedbackCtx *fctx = queue->fctx;
813 struct TimeSlot *tslot;
814 unsigned int cnt;
815
816 cleanup_tslots (queue);
817 n = GNUNET_MIN (n ,fctx->max_active_bound);
818 fctx->tslots_freeptr = GNUNET_malloc (n * sizeof (struct TimeSlot));
819 for (cnt = 0; cnt < n; cnt++)
820 {
821 tslot = &fctx->tslots_freeptr[cnt];
822 tslot->queue = queue;
823 GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail, tslot);
824 }
825 GNUNET_TESTBED_operation_queue_reset_max_active_ (queue, n);
826}
827
828
829/**
830 * Adapts parallelism in an adaptive queue by using the statistical data from
831 * the feedback context.
832 *
833 * @param queue the queue
834 * @param fail GNUNET_YES if the last operation failed; GNUNET_NO if not;
835 */
836static void
837adapt_parallelism (struct OperationQueue *queue, int fail)
838{
839 struct GNUNET_TIME_Relative avg;
840 struct FeedbackCtx *fctx;
841 struct TimeSlot *tslot;
842 int sd;
843 unsigned int nvals;
844 unsigned int cnt;
845
846 avg = GNUNET_TIME_UNIT_ZERO;
847 nvals = 0;
848 fctx = queue->fctx;
849 for (cnt = 0; cnt < queue->max_active; cnt++)
850 {
851 tslot = &fctx->tslots_freeptr[cnt];
852 avg = GNUNET_TIME_relative_add (avg, tslot->tsum);
853 nvals += tslot->nvals;
854 }
855 GNUNET_assert (nvals >= queue->max_active);
856 avg = GNUNET_TIME_relative_divide (avg, nvals);
857 sd = GNUNET_TESTBED_SD_deviation_factor_ (fctx->sd, (unsigned int)
858 avg.rel_value_us);
859 if ( (sd <= 5) ||
860 (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
861 queue->max_active)) )
862 GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
863 if (GNUNET_SYSERR == sd)
864 {
865 adaptive_queue_set_max_active (queue, queue->max_active); /* no change */
866 return;
867 }
868 GNUNET_assert (0 <= sd);
869 if ((0 == sd) && (! fail))
870 {
871 adaptive_queue_set_max_active (queue, queue->max_active * 2);
872 return;
873 }
874 if ((1 == sd) && (! fail))
875 {
876 adaptive_queue_set_max_active (queue, queue->max_active + 1);
877 return;
878 }
879 if (1 == queue->max_active)
880 {
881 adaptive_queue_set_max_active (queue, 1);
882 return;
883 }
884 if (((sd < 2) && (fail)) || (2 == sd))
885 {
886 adaptive_queue_set_max_active (queue, queue->max_active - 1);
887 return;
888 }
889 adaptive_queue_set_max_active (queue, queue->max_active / 2);
890}
891
892
893/**
894 * update tslots with the operation's completion time. Additionally, if
895 * updating a timeslot makes all timeslots filled in an adaptive operation
896 * queue, call adapt_parallelism() for that queue.
897 *
898 * @param op the operation
899 */
900static void
901update_tslots (struct GNUNET_TESTBED_Operation *op)
902{
903 struct OperationQueue *queue;
904 struct GNUNET_TIME_Relative t;
905 struct TimeSlot *tslot;
906 struct FeedbackCtx *fctx;
907
908 t = GNUNET_TIME_absolute_get_duration (op->tstart);
909 while (NULL != (tslot = op->tslots_head)) /* update time slots */
910 {
911 queue = tslot->queue;
912 fctx = queue->fctx;
913 tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t);
914 GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
915 tslot->op = NULL;
916 GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
917 tslot);
918 if (0 != tslot->nvals++)
919 continue;
920 fctx->tslots_filled++;
921 if (queue->max_active == fctx->tslots_filled)
922 adapt_parallelism (queue, op->failed);
923 }
924}
925
926
927/**
622 * Create an 'operation' to be performed. 928 * Create an 'operation' to be performed.
623 * 929 *
624 * @param cls closure for the callbacks 930 * @param cls closure for the callbacks
@@ -644,17 +950,32 @@ GNUNET_TESTBED_operation_create_ (void *cls, OperationStart start,
644/** 950/**
645 * Create an operation queue. 951 * Create an operation queue.
646 * 952 *
953 * @param type the type of operation queue
647 * @param max_active maximum number of operations in this 954 * @param max_active maximum number of operations in this
648 * queue that can be active in parallel at the same time 955 * queue that can be active in parallel at the same time
649 * @return handle to the queue 956 * @return handle to the queue
650 */ 957 */
651struct OperationQueue * 958struct OperationQueue *
652GNUNET_TESTBED_operation_queue_create_ (unsigned int max_active) 959GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type,
960 unsigned int max_active)
653{ 961{
654 struct OperationQueue *queue; 962 struct OperationQueue *queue;
963 struct FeedbackCtx *fctx;
655 964
656 queue = GNUNET_malloc (sizeof (struct OperationQueue)); 965 queue = GNUNET_malloc (sizeof (struct OperationQueue));
657 queue->max_active = max_active; 966 queue->type = type;
967 if (OPERATION_QUEUE_TYPE_FIXED == type)
968 {
969 queue->max_active = max_active;
970 }
971 else
972 {
973 fctx = GNUNET_malloc (sizeof (struct FeedbackCtx));
974 fctx->max_active_bound = max_active;
975 fctx->sd = GNUNET_TESTBED_SD_init_ (10); /* FIXME: Why 10? */
976 queue->fctx = fctx;
977 adaptive_queue_set_max_active (queue, 1); /* start with 1 */
978 }
658 return queue; 979 return queue;
659} 980}
660 981
@@ -668,7 +989,16 @@ GNUNET_TESTBED_operation_queue_create_ (unsigned int max_active)
668void 989void
669GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue) 990GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
670{ 991{
992 struct FeedbackCtx *fctx;
993
671 GNUNET_break (GNUNET_YES == is_queue_empty (queue)); 994 GNUNET_break (GNUNET_YES == is_queue_empty (queue));
995 if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
996 {
997 cleanup_tslots (queue);
998 fctx = queue->fctx;
999 GNUNET_TESTBED_SD_destroy_ (fctx->sd);
1000 GNUNET_free (fctx);
1001 }
672 GNUNET_free (queue); 1002 GNUNET_free (queue);
673} 1003}
674 1004
@@ -867,8 +1197,10 @@ GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
867 rq_remove (op); 1197 rq_remove (op);
868 if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */ 1198 if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */
869 GNUNET_TESTBED_operation_activate_ (op); 1199 GNUNET_TESTBED_operation_activate_ (op);
1200 if (OP_STATE_ACTIVE == op->state)
1201 update_tslots (op);
870 GNUNET_assert (NULL != op->queues); 1202 GNUNET_assert (NULL != op->queues);
871 GNUNET_assert (NULL != op->qentries); 1203 GNUNET_assert (NULL != op->qentries);
872 for (i = 0; i < op->nqueues; i++) 1204 for (i = 0; i < op->nqueues; i++)
873 { 1205 {
874 entry = op->qentries[i]; 1206 entry = op->qentries[i];
@@ -882,8 +1214,8 @@ GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
882 break; 1214 break;
883 case OP_STATE_WAITING: 1215 case OP_STATE_WAITING:
884 break; 1216 break;
885 case OP_STATE_READY:
886 case OP_STATE_ACTIVE: 1217 case OP_STATE_ACTIVE:
1218 case OP_STATE_READY:
887 GNUNET_assert (0 != opq->active); 1219 GNUNET_assert (0 != opq->active);
888 GNUNET_assert (opq->active >= entry->nres); 1220 GNUNET_assert (opq->active >= entry->nres);
889 opq->active -= entry->nres; 1221 opq->active -= entry->nres;
@@ -901,4 +1233,16 @@ GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
901} 1233}
902 1234
903 1235
1236/**
1237 * Marks an operation as failed
1238 *
1239 * @param op the operation to be marked as failed
1240 */
1241void
1242GNUNET_TESTBED_operation_mark_failed (struct GNUNET_TESTBED_Operation *op)
1243{
1244 op->failed = GNUNET_YES;
1245}
1246
1247
904/* end of testbed_api_operations.c */ 1248/* end of testbed_api_operations.c */
diff --git a/src/testbed/testbed_api_operations.h b/src/testbed/testbed_api_operations.h
index b6885e69a..c44325307 100644
--- a/src/testbed/testbed_api_operations.h
+++ b/src/testbed/testbed_api_operations.h
@@ -38,14 +38,35 @@ struct OperationQueue;
38 38
39 39
40/** 40/**
41 * The type of operation queue
42 */
43enum OperationQueueType
44{
45 /**
46 * Operation queue which permits a fixed maximum number of operations to be
47 * active at any time
48 */
49 OPERATION_QUEUE_TYPE_FIXED,
50
51 /**
52 * Operation queue which adapts the number of operations to be active based on
53 * the operation completion times of previously executed operation in it
54 */
55 OPERATION_QUEUE_TYPE_ADAPTIVE
56};
57
58
59/**
41 * Create an operation queue. 60 * Create an operation queue.
42 * 61 *
43 * @param max_active maximum number of operations in this 62 * @param type the type of operation queue
44 * queue that can be active in parallel at the same time 63 * @param max_active maximum number of operations in this queue that can be
64 * active in parallel at the same time.
45 * @return handle to the queue 65 * @return handle to the queue
46 */ 66 */
47struct OperationQueue * 67struct OperationQueue *
48GNUNET_TESTBED_operation_queue_create_ (unsigned int max_active); 68GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type,
69 unsigned int max_active);
49 70
50 71
51/** 72/**
@@ -199,5 +220,14 @@ void
199GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op); 220GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op);
200 221
201 222
223/**
224 * Marks an operation as failed
225 *
226 * @param op the operation to be marked as failed
227 */
228void
229GNUNET_TESTBED_operation_mark_failed (struct GNUNET_TESTBED_Operation *op);
230
231
202#endif 232#endif
203/* end of testbed_api_operations.h */ 233/* end of testbed_api_operations.h */
diff --git a/src/testbed/testbed_api_peers.c b/src/testbed/testbed_api_peers.c
index 56e28a830..fd1da85ef 100644
--- a/src/testbed/testbed_api_peers.c
+++ b/src/testbed/testbed_api_peers.c
@@ -410,8 +410,6 @@ opstart_overlay_connect (void *cls)
410 opc->state = OPC_STATE_STARTED; 410 opc->state = OPC_STATE_STARTED;
411 data = opc->data; 411 data = opc->data;
412 GNUNET_assert (NULL != data); 412 GNUNET_assert (NULL != data);
413 data->tslot_index = GNUNET_TESTBED_get_tslot_ (data->p1->host, data);
414 data->tstart = GNUNET_TIME_absolute_get ();
415 msg = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_OverlayConnectMessage)); 413 msg = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_OverlayConnectMessage));
416 msg->header.size = 414 msg->header.size =
417 htons (sizeof (struct GNUNET_TESTBED_OverlayConnectMessage)); 415 htons (sizeof (struct GNUNET_TESTBED_OverlayConnectMessage));
@@ -434,7 +432,6 @@ static void
434oprelease_overlay_connect (void *cls) 432oprelease_overlay_connect (void *cls)
435{ 433{
436 struct OperationContext *opc = cls; 434 struct OperationContext *opc = cls;
437 struct GNUNET_TIME_Relative duration;
438 struct OverlayConnectData *data; 435 struct OverlayConnectData *data;
439 436
440 data = opc->data; 437 data = opc->data;
@@ -443,14 +440,10 @@ oprelease_overlay_connect (void *cls)
443 case OPC_STATE_INIT: 440 case OPC_STATE_INIT:
444 break; 441 break;
445 case OPC_STATE_STARTED: 442 case OPC_STATE_STARTED:
446 (void) GNUNET_TESTBED_release_time_slot_ (data->p1->host, data->tslot_index,
447 data);
448 GNUNET_TESTBED_remove_opc_ (opc->c, opc); 443 GNUNET_TESTBED_remove_opc_ (opc->c, opc);
449 break; 444 break;
450 case OPC_STATE_FINISHED: 445 case OPC_STATE_FINISHED:
451 duration = GNUNET_TIME_absolute_get_duration (data->tstart); 446 break;
452 GNUNET_TESTBED_update_time_slot_ (data->p1->host, data->tslot_index, data,
453 duration, data->failed);
454 } 447 }
455 GNUNET_free (data); 448 GNUNET_free (data);
456 GNUNET_free (opc); 449 GNUNET_free (opc);
diff --git a/src/testbed/testbed_api_peers.h b/src/testbed/testbed_api_peers.h
index 50fb29bf6..0a87bf6ee 100644
--- a/src/testbed/testbed_api_peers.h
+++ b/src/testbed/testbed_api_peers.h
@@ -250,21 +250,6 @@ struct OverlayConnectData
250 */ 250 */
251 struct OperationContext *sub_opc; 251 struct OperationContext *sub_opc;
252 252
253 /**
254 * The starting time of this operation
255 */
256 struct GNUNET_TIME_Absolute tstart;
257
258 /**
259 * Has this operation failed
260 */
261 int failed;
262
263 /**
264 * The timing slot index for this operation
265 */
266 unsigned int tslot_index;
267
268}; 253};
269 254
270 255
diff --git a/src/testbed/testbed_api_statistics.c b/src/testbed/testbed_api_statistics.c
index 1e12a7c64..99a79b866 100644
--- a/src/testbed/testbed_api_statistics.c
+++ b/src/testbed/testbed_api_statistics.c
@@ -415,8 +415,8 @@ GNUNET_TESTBED_get_statistics (unsigned int num_peers,
415 GNUNET_assert (NULL != proc); 415 GNUNET_assert (NULL != proc);
416 GNUNET_assert (NULL != cont); 416 GNUNET_assert (NULL != cont);
417 if (NULL == no_wait_queue) 417 if (NULL == no_wait_queue)
418 no_wait_queue = 418 no_wait_queue = GNUNET_TESTBED_operation_queue_create_
419 GNUNET_TESTBED_operation_queue_create_ (UINT_MAX); 419 (OPERATION_QUEUE_TYPE_FIXED, UINT_MAX);
420 sc = GNUNET_malloc (sizeof (struct GetStatsContext)); 420 sc = GNUNET_malloc (sizeof (struct GetStatsContext));
421 sc->peers = peers; 421 sc->peers = peers;
422 sc->subsystem = (NULL == subsystem) ? NULL : GNUNET_strdup (subsystem); 422 sc->subsystem = (NULL == subsystem) ? NULL : GNUNET_strdup (subsystem);