aboutsummaryrefslogtreecommitdiff
path: root/src/experimentation
diff options
context:
space:
mode:
authorMatthias Wachs <wachs@net.in.tum.de>2013-08-01 12:25:15 +0000
committerMatthias Wachs <wachs@net.in.tum.de>2013-08-01 12:25:15 +0000
commit4424bf81b5a7b4b6500a132e174794d1e5a3f6dd (patch)
treec7bd4ccd3f0f5c27475363c82700a0c8999417d8 /src/experimentation
parent9d095c5d0f48bd691b2f9086956e454427802464 (diff)
downloadgnunet-4424bf81b5a7b4b6500a132e174794d1e5a3f6dd.tar.gz
gnunet-4424bf81b5a7b4b6500a132e174794d1e5a3f6dd.zip
changes to scheduler
Diffstat (limited to 'src/experimentation')
-rw-r--r--src/experimentation/gnunet-daemon-experimentation.h8
-rw-r--r--src/experimentation/gnunet-daemon-experimentation_nodes.c5
-rw-r--r--src/experimentation/gnunet-daemon-experimentation_scheduler.c215
3 files changed, 186 insertions, 42 deletions
diff --git a/src/experimentation/gnunet-daemon-experimentation.h b/src/experimentation/gnunet-daemon-experimentation.h
index c6fc670f4..61dfd1b26 100644
--- a/src/experimentation/gnunet-daemon-experimentation.h
+++ b/src/experimentation/gnunet-daemon-experimentation.h
@@ -399,10 +399,14 @@ GED_scheduler_handle_stop (struct Node *n, struct Experiment *e);
399 399
400 400
401/** 401/**
402 * Start the scheduler component 402 * Add a new experiment for a node
403 *
404 * @param n the node
405 * @param e the experiment
406 * @param outbound are we initiator (GNUNET_YES) or client (GNUNET_NO)?
403 */ 407 */
404void 408void
405GED_scheduler_add (struct Node *n, struct Experiment *e); 409GED_scheduler_add (struct Node *n, struct Experiment *e, int outbound);
406 410
407/** 411/**
408 * Start the scheduler component 412 * Start the scheduler component
diff --git a/src/experimentation/gnunet-daemon-experimentation_nodes.c b/src/experimentation/gnunet-daemon-experimentation_nodes.c
index 58cc2e84f..fca7e001c 100644
--- a/src/experimentation/gnunet-daemon-experimentation_nodes.c
+++ b/src/experimentation/gnunet-daemon-experimentation_nodes.c
@@ -317,7 +317,7 @@ get_experiments_cb (struct Node *n, struct Experiment *e)
317 GNUNET_i2s (&n->id)); 317 GNUNET_i2s (&n->id));
318 318
319 /* Tell the scheduler to add a node with an experiment */ 319 /* Tell the scheduler to add a node with an experiment */
320 GED_scheduler_add (n, e); 320 GED_scheduler_add (n, e, GNUNET_YES);
321 counter ++; 321 counter ++;
322} 322}
323 323
@@ -786,9 +786,6 @@ static void handle_stop (const struct GNUNET_PeerIdentity *peer,
786 GNUNET_break (0); 786 GNUNET_break (0);
787 return; 787 return;
788 } 788 }
789
790 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for experiment `%s'\n"),
791 "STOP", GNUNET_i2s (peer), name);
792 GED_scheduler_handle_stop (n, e); 789 GED_scheduler_handle_stop (n, e);
793} 790}
794 791
diff --git a/src/experimentation/gnunet-daemon-experimentation_scheduler.c b/src/experimentation/gnunet-daemon-experimentation_scheduler.c
index aaf47174e..de5c80614 100644
--- a/src/experimentation/gnunet-daemon-experimentation_scheduler.c
+++ b/src/experimentation/gnunet-daemon-experimentation_scheduler.c
@@ -64,19 +64,41 @@ struct ScheduledExperiment {
64 struct Experiment *e; 64 struct Experiment *e;
65 struct Node *n; 65 struct Node *n;
66 int state; 66 int state;
67 int outbound;
67 GNUNET_SCHEDULER_TaskIdentifier task; 68 GNUNET_SCHEDULER_TaskIdentifier task;
68}; 69};
69 70
70struct ScheduledExperiment *waiting_head; 71struct ScheduledExperiment *waiting_in_head;
71struct ScheduledExperiment *waiting_tail; 72struct ScheduledExperiment *waiting_in_tail;
73
74struct ScheduledExperiment *running_in_head;
75struct ScheduledExperiment *running_in_tail;
76
77struct ScheduledExperiment *waiting_out_head;
78struct ScheduledExperiment *waiting_out_tail;
79
80struct ScheduledExperiment *running_out_head;
81struct ScheduledExperiment *running_out_tail;
72 82
73struct ScheduledExperiment *running_head;
74struct ScheduledExperiment *running_tail;
75 83
76static unsigned int experiments_scheduled; 84static unsigned int experiments_scheduled;
77static unsigned int experiments_running; 85static unsigned int experiments_running;
78static unsigned int experiments_requested; 86static unsigned int experiments_requested;
79 87
88
89static struct ScheduledExperiment *
90find_experiment (struct ScheduledExperiment *head, struct ScheduledExperiment *tail,
91 struct Node *n, struct Experiment *e, int outbound)
92{
93 struct ScheduledExperiment *cur;
94 for (cur = head; NULL != cur; cur = cur->next)
95 {
96 if ((cur->n == n) && (cur->e == e) && (cur->outbound == outbound)) /* Node and experiment are equal */
97 break;
98 }
99 return cur;
100}
101
80static void 102static void
81request_timeout (void *cls,const struct GNUNET_SCHEDULER_TaskContext* tc) 103request_timeout (void *cls,const struct GNUNET_SCHEDULER_TaskContext* tc)
82{ 104{
@@ -86,19 +108,19 @@ request_timeout (void *cls,const struct GNUNET_SCHEDULER_TaskContext* tc)
86 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Peer `%s' did not respond to request for experiment `%s'\n", 108 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Peer `%s' did not respond to request for experiment `%s'\n",
87 GNUNET_i2s (&se->n->id), se->e->name); 109 GNUNET_i2s (&se->n->id), se->e->name);
88 110
89 GNUNET_CONTAINER_DLL_remove (waiting_head, waiting_tail, se); 111 GNUNET_CONTAINER_DLL_remove (waiting_out_head, waiting_out_tail, se);
90 GNUNET_free (se); 112 GNUNET_free (se);
91 113
92 /* Remove experiment */ 114 /* Remove experiment */
93
94 GNUNET_assert (experiments_requested > 0); 115 GNUNET_assert (experiments_requested > 0);
95 experiments_requested --; 116 experiments_requested --;
96 GNUNET_STATISTICS_set (GED_stats, "# experiments requested", experiments_requested, GNUNET_NO); 117 GNUNET_STATISTICS_set (GED_stats, "# experiments requested", experiments_requested, GNUNET_NO);
97} 118}
98 119
99static void start_experiment (void *cls,const struct GNUNET_SCHEDULER_TaskContext* tc) 120static void run_experiment_inbound (void *cls,const struct GNUNET_SCHEDULER_TaskContext* tc)
100{ 121{
101 struct ScheduledExperiment *se = cls; 122 struct ScheduledExperiment *se = cls;
123 struct GNUNET_TIME_Relative start;
102 struct GNUNET_TIME_Relative end; 124 struct GNUNET_TIME_Relative end;
103 struct GNUNET_TIME_Relative backoff; 125 struct GNUNET_TIME_Relative backoff;
104 126
@@ -111,15 +133,78 @@ static void start_experiment (void *cls,const struct GNUNET_SCHEDULER_TaskContex
111 backoff.rel_value += GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000); 133 backoff.rel_value += GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000);
112 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying start request to peer `%s' for `%s' for %llu ms\n", 134 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying start request to peer `%s' for `%s' for %llu ms\n",
113 GNUNET_i2s (&se->n->id), se->e->name, (unsigned long long) backoff.rel_value); 135 GNUNET_i2s (&se->n->id), se->e->name, (unsigned long long) backoff.rel_value);
114 se->task = GNUNET_SCHEDULER_add_delayed (backoff, &start_experiment, se); 136 se->task = GNUNET_SCHEDULER_add_delayed (backoff, &run_experiment_inbound, se);
115 return; 137 return;
116 } 138 }
117 else if (BUSY == se->state) 139 else if (BUSY == se->state)
118 se->state = NOT_RUNNING; 140 se->state = NOT_RUNNING;
119 141
120 if (NOT_RUNNING == se->state) 142 switch (se->state) {
143 case NOT_RUNNING:
144 /* Send START_ACK message */
145 //GED_nodes_request_start (se->n, se->e);
146 se->state = REQUESTED;
147 /* Schedule to run */
148 start = GNUNET_TIME_absolute_get_remaining(se->e->start);
149 if (0 == start.rel_value)
150 se->task = GNUNET_SCHEDULER_add_now (&run_experiment_inbound, se);
151 else
152 se->task = GNUNET_SCHEDULER_add_delayed (start, &run_experiment_inbound, se);
153 break;
154 case REQUESTED:
155 /* Already requested */
156 se->state = STARTED;
157 case STARTED:
158 /* Experiment is running */
159 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running experiment `%s' peer for `%s'\n",
160 GNUNET_i2s (&se->n->id), se->e->name);
161
162 /* do work here */
163
164 /* Reschedule */
165 end = GNUNET_TIME_absolute_get_remaining(GNUNET_TIME_absolute_add (se->e->stop, se->e->frequency));
166 if (0 == end.rel_value)
167 {
168 se->state = STOPPED;
169 return; /* End of experiment is reached */
170 }
171 /* Reschedule */
172 se->task = GNUNET_SCHEDULER_add_delayed (se->e->frequency, &run_experiment_inbound, se);
173 break;
174 case STOPPED:
175 /* Experiment expired */
176 break;
177 default:
178 break;
179 }
180
181}
182
183static void run_experiment_outbound (void *cls,const struct GNUNET_SCHEDULER_TaskContext* tc)
184{
185 struct ScheduledExperiment *se = cls;
186 struct GNUNET_TIME_Relative end;
187 struct GNUNET_TIME_Relative backoff;
188
189 se->task = GNUNET_SCHEDULER_NO_TASK;
190
191 if (GNUNET_NO == GED_nodes_rts (se->n))
121 { 192 {
122 /* Send start message */ 193 /* Cannot send to peer, core is busy */
194 se->state = BUSY;
195 backoff = GNUNET_TIME_UNIT_SECONDS;
196 backoff.rel_value += GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000);
197 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying start request to peer `%s' for `%s' for %llu ms\n",
198 GNUNET_i2s (&se->n->id), se->e->name, (unsigned long long) backoff.rel_value);
199 se->task = GNUNET_SCHEDULER_add_delayed (backoff, &run_experiment_outbound, se);
200 return;
201 }
202 else if (BUSY == se->state)
203 se->state = NOT_RUNNING; /* Not busy anymore, can send */
204
205 switch (se->state) {
206 case NOT_RUNNING:
207 /* Send START message */
123 GED_nodes_request_start (se->n, se->e); 208 GED_nodes_request_start (se->n, se->e);
124 se->state = REQUESTED; 209 se->state = REQUESTED;
125 se->task = GNUNET_SCHEDULER_add_delayed (EXP_RESPONSE_TIMEOUT, &request_timeout, se); 210 se->task = GNUNET_SCHEDULER_add_delayed (EXP_RESPONSE_TIMEOUT, &request_timeout, se);
@@ -128,15 +213,12 @@ static void start_experiment (void *cls,const struct GNUNET_SCHEDULER_TaskContex
128 GNUNET_i2s (&se->n->id), se->e->name); 213 GNUNET_i2s (&se->n->id), se->e->name);
129 experiments_requested ++; 214 experiments_requested ++;
130 GNUNET_STATISTICS_set (GED_stats, "# experiments requested", experiments_requested, GNUNET_NO); 215 GNUNET_STATISTICS_set (GED_stats, "# experiments requested", experiments_requested, GNUNET_NO);
131 return; 216 break;
132 } 217 case REQUESTED:
133 else if (REQUESTED == se->state) 218 /* Expecting START_ACK */
134 { 219 GNUNET_break (0);
135 /* Already requested */ 220 break;
136 return; 221 case STARTED:
137 }
138 else if (STARTED == se->state)
139 {
140 /* Experiment is running */ 222 /* Experiment is running */
141 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running experiment `%s' peer for `%s'\n", 223 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running experiment `%s' peer for `%s'\n",
142 GNUNET_i2s (&se->n->id), se->e->name); 224 GNUNET_i2s (&se->n->id), se->e->name);
@@ -151,15 +233,17 @@ static void start_experiment (void *cls,const struct GNUNET_SCHEDULER_TaskContex
151 return; /* End of experiment is reached */ 233 return; /* End of experiment is reached */
152 } 234 }
153 /* Reschedule */ 235 /* Reschedule */
154 se->task = GNUNET_SCHEDULER_add_delayed (se->e->frequency, &start_experiment, se); 236 se->task = GNUNET_SCHEDULER_add_delayed (se->e->frequency, &run_experiment_outbound, se);
155 } 237 break;
156 238 case STOPPED:
157 else if (STOPPED == se->state)
158 {
159 /* Experiment expired */ 239 /* Experiment expired */
240 break;
241 default:
242 break;
160 } 243 }
161} 244}
162 245
246
163/** 247/**
164 * Handle a START message from a remote node 248 * Handle a START message from a remote node
165 * 249 *
@@ -169,7 +253,21 @@ static void start_experiment (void *cls,const struct GNUNET_SCHEDULER_TaskContex
169void 253void
170GED_scheduler_handle_start (struct Node *n, struct Experiment *e) 254GED_scheduler_handle_start (struct Node *n, struct Experiment *e)
171{ 255{
256 struct ScheduledExperiment *se;
172 257
258 if ((NULL != (se = find_experiment (waiting_in_head, waiting_in_tail, n, e, GNUNET_NO))) ||
259 (NULL != (se = find_experiment (running_in_head, running_in_tail, n, e, GNUNET_NO))))
260 {
261 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received duplicate %s message from peer %s for experiment `%s'\n"),
262 "START", GNUNET_i2s (&n->id), e->name);
263 GNUNET_break_op (0);
264 return;
265 }
266
267 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for experiment `%s'\n"),
268 "START", GNUNET_i2s (&n->id), e->name);
269
270 GED_scheduler_add (n, e, GNUNET_NO);
173} 271}
174 272
175/** 273/**
@@ -181,7 +279,20 @@ GED_scheduler_handle_start (struct Node *n, struct Experiment *e)
181void 279void
182GED_scheduler_handle_start_ack (struct Node *n, struct Experiment *e) 280GED_scheduler_handle_start_ack (struct Node *n, struct Experiment *e)
183{ 281{
282 struct ScheduledExperiment *se;
184 283
284 if (NULL == (se = find_experiment (waiting_in_head, waiting_in_tail, n, e, GNUNET_NO)))
285 {
286 GNUNET_break (0);
287 return;
288 }
289
290 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for requested experiment `%s'\n"),
291 "START_ACK", GNUNET_i2s (&n->id), e->name);
292
293 if (GNUNET_SCHEDULER_NO_TASK != se->task)
294 GNUNET_SCHEDULER_cancel (se->task);
295 se->task = GNUNET_SCHEDULER_add_now (&run_experiment_outbound, se);
185} 296}
186 297
187 298
@@ -194,6 +305,22 @@ GED_scheduler_handle_start_ack (struct Node *n, struct Experiment *e)
194void 305void
195GED_scheduler_handle_stop (struct Node *n, struct Experiment *e) 306GED_scheduler_handle_stop (struct Node *n, struct Experiment *e)
196{ 307{
308 struct ScheduledExperiment *se;
309
310 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for experiment `%s'\n"),
311 "STOP", GNUNET_i2s (&n->id), e->name);
312
313 if (NULL != (se = find_experiment (waiting_in_head, waiting_in_tail, n, e, GNUNET_NO)))
314 {
315 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for waiting experiment `%s'\n"),
316 "STOP", GNUNET_i2s (&n->id), e->name);
317 }
318
319 if (NULL != (se = find_experiment (running_in_head, running_in_tail, n, e, GNUNET_NO)))
320 {
321 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for running experiment `%s'\n"),
322 "STOP", GNUNET_i2s (&n->id), e->name);
323 }
197 324
198} 325}
199 326
@@ -202,35 +329,51 @@ GED_scheduler_handle_stop (struct Node *n, struct Experiment *e)
202 * 329 *
203 * @param n the node 330 * @param n the node
204 * @param e the experiment 331 * @param e the experiment
332 * @param outbound are we initiator (GNUNET_YES) or client (GNUNET_NO)?
205 */ 333 */
206void 334void
207GED_scheduler_add (struct Node *n, struct Experiment *e) 335GED_scheduler_add (struct Node *n, struct Experiment *e, int outbound)
208{ 336{
209 struct ScheduledExperiment *se; 337 struct ScheduledExperiment *se;
210 struct GNUNET_TIME_Relative start; 338 struct GNUNET_TIME_Relative start;
211 struct GNUNET_TIME_Relative end; 339 struct GNUNET_TIME_Relative end;
212 340
341 GNUNET_assert ((GNUNET_YES == outbound) || (GNUNET_NO == outbound));
342
213 start = GNUNET_TIME_absolute_get_remaining(e->start); 343 start = GNUNET_TIME_absolute_get_remaining(e->start);
214 end = GNUNET_TIME_absolute_get_remaining(e->stop); 344 end = GNUNET_TIME_absolute_get_remaining(e->stop);
215 if (0 == end.rel_value) 345 if (0 == end.rel_value)
216 return; /* End of experiment is reached */ 346 return; /* End of experiment is reached */
217 347
218 /* Add additional checks here if required */ 348 /* Add additional checks here if required */
219
220 se = GNUNET_malloc (sizeof (struct ScheduledExperiment)); 349 se = GNUNET_malloc (sizeof (struct ScheduledExperiment));
221 se->state = NOT_RUNNING; 350 se->state = NOT_RUNNING;
351 se->outbound = outbound;
222 se->e = e; 352 se->e = e;
223 se->n = n; 353 se->n = n;
224 if (0 == start.rel_value) 354
225 se->task = GNUNET_SCHEDULER_add_now (&start_experiment, se); 355 if (GNUNET_YES == outbound)
356 {
357 if (0 == start.rel_value)
358 se->task = GNUNET_SCHEDULER_add_now (&run_experiment_outbound, se);
359 else
360 se->task = GNUNET_SCHEDULER_add_delayed (start, &run_experiment_outbound, se);
361 GNUNET_CONTAINER_DLL_insert (waiting_out_head, waiting_out_tail, se);
362 }
226 else 363 else
227 se->task = GNUNET_SCHEDULER_add_delayed (start, &start_experiment, se); 364 {
365 if (0 == start.rel_value)
366 se->task = GNUNET_SCHEDULER_add_now (&run_experiment_inbound, se);
367 else
368 se->task = GNUNET_SCHEDULER_add_delayed (start, &run_experiment_inbound, se);
369 GNUNET_CONTAINER_DLL_insert (waiting_in_head, waiting_in_tail, se);
370 }
228 371
229 GNUNET_CONTAINER_DLL_insert (waiting_head, waiting_tail, se); 372 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Added %s experiment `%s' for node to be scheduled\n",
230 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Added experiment `%s' for node to be scheduled\n", 373 (GNUNET_YES == outbound) ? "outbound" : "inbound", e->name, GNUNET_i2s(&se->n->id));
231 e->name, GNUNET_i2s(&se->n->id));
232 experiments_scheduled ++; 374 experiments_scheduled ++;
233 GNUNET_STATISTICS_set (GED_stats, "# experiments scheduled", experiments_scheduled, GNUNET_NO); 375 GNUNET_STATISTICS_set (GED_stats, "# experiments scheduled", experiments_scheduled, GNUNET_NO);
376
234} 377}
235 378
236/** 379/**
@@ -253,11 +396,11 @@ GED_scheduler_stop ()
253 struct ScheduledExperiment *cur; 396 struct ScheduledExperiment *cur;
254 struct ScheduledExperiment *next; 397 struct ScheduledExperiment *next;
255 398
256 next = waiting_head; 399 next = waiting_in_head;
257 while (NULL != (cur = next)) 400 while (NULL != (cur = next))
258 { 401 {
259 next = cur->next; 402 next = cur->next;
260 GNUNET_CONTAINER_DLL_remove (waiting_head, waiting_tail, cur); 403 GNUNET_CONTAINER_DLL_remove (waiting_in_head, waiting_in_tail, cur);
261 if (GNUNET_SCHEDULER_NO_TASK != cur->task) 404 if (GNUNET_SCHEDULER_NO_TASK != cur->task)
262 { 405 {
263 GNUNET_SCHEDULER_cancel (cur->task); 406 GNUNET_SCHEDULER_cancel (cur->task);
@@ -269,11 +412,11 @@ GED_scheduler_stop ()
269 GNUNET_STATISTICS_set (GED_stats, "# experiments scheduled", experiments_scheduled, GNUNET_NO); 412 GNUNET_STATISTICS_set (GED_stats, "# experiments scheduled", experiments_scheduled, GNUNET_NO);
270 } 413 }
271 414
272 next = running_head; 415 next = running_in_head;
273 while (NULL != (cur = next)) 416 while (NULL != (cur = next))
274 { 417 {
275 next = cur->next; 418 next = cur->next;
276 GNUNET_CONTAINER_DLL_remove (running_head, running_tail, cur); 419 GNUNET_CONTAINER_DLL_remove (running_in_head, running_in_tail, cur);
277 if (GNUNET_SCHEDULER_NO_TASK != cur->task) 420 if (GNUNET_SCHEDULER_NO_TASK != cur->task)
278 { 421 {
279 GNUNET_SCHEDULER_cancel (cur->task); 422 GNUNET_SCHEDULER_cancel (cur->task);