aboutsummaryrefslogtreecommitdiff
path: root/src/testbed/gnunet-service-testbed_barriers.c
diff options
context:
space:
mode:
authorSree Harsha Totakura <totakura@in.tum.de>2013-08-30 14:55:13 +0000
committerSree Harsha Totakura <totakura@in.tum.de>2013-08-30 14:55:13 +0000
commit5e1efe185cf484018f53dd33d64e546ac042fdee (patch)
treee1369e8e1a239426d4ba2d81bb31571bbd4f82eb /src/testbed/gnunet-service-testbed_barriers.c
parent7d8d536bbaa0f72587552af517f643e3c05e7a6f (diff)
downloadgnunet-5e1efe185cf484018f53dd33d64e546ac042fdee.tar.gz
gnunet-5e1efe185cf484018f53dd33d64e546ac042fdee.zip
- towards testbed barriers
Diffstat (limited to 'src/testbed/gnunet-service-testbed_barriers.c')
-rw-r--r--src/testbed/gnunet-service-testbed_barriers.c512
1 files changed, 512 insertions, 0 deletions
diff --git a/src/testbed/gnunet-service-testbed_barriers.c b/src/testbed/gnunet-service-testbed_barriers.c
new file mode 100644
index 000000000..079096d86
--- /dev/null
+++ b/src/testbed/gnunet-service-testbed_barriers.c
@@ -0,0 +1,512 @@
1/*
2 This file is part of GNUnet.
3 (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file testbed/gnunet-service-testbed_barriers.c
23 * @brief barrier handling at the testbed controller
24 * @author Sree Harsha Totakura <sreeharsha@totakura.in>
25 */
26
27#include "gnunet-service-testbed.h"
28
29/**
30 * timeout for outgoing message transmissions in seconds
31 */
32#define MESSAGE_SEND_TIMEOUT(s) \
33 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, s)
34
35
36/**
37 * Barrier
38 */
39struct Barrier;
40
41
42/**
43 * Message queue for transmitting messages
44 */
45struct MessageQueue
46{
47 /**
48 * next pointer for DLL
49 */
50 struct MessageQueue *next;
51
52 /**
53 * prev pointer for DLL
54 */
55 struct MessageQueue *prev;
56
57 /**
58 * The message to be sent
59 */
60 struct GNUNET_MessageHeader *msg;
61};
62
63/**
64 * Context to be associated with each client
65 */
66struct ClientCtx
67{
68 /**
69 * The barrier this client is waiting for
70 */
71 struct Barrier *barrier;
72
73 /**
74 * DLL next ptr
75 */
76 struct ClientCtx *next;
77
78 /**
79 * DLL prev ptr
80 */
81 struct ClientCtx *prev;
82
83 /**
84 * The client handle
85 */
86 struct GNUNET_SERVER_Client *client;
87
88 /**
89 * the transmission handle
90 */
91 struct GNUNET_SERVER_TransmitHandle *tx;
92
93 /**
94 * message queue head
95 */
96 struct MessageQueue *mq_head;
97
98 /**
99 * message queue tail
100 */
101 struct MessageQueue *mq_tail;
102};
103
104
105/**
106 * Barrier
107 */
108struct Barrier
109{
110 /**
111 * The hashcode of the barrier name
112 */
113 struct GNUNET_HashCode hash;
114
115 /**
116 * The name of the barrier
117 */
118 char *name;
119
120 /**
121 * DLL head for the list of clients waiting for this barrier
122 */
123 struct ClientCtx *head;
124
125 /**
126 * DLL tail for the list of clients waiting for this barrier
127 */
128 struct ClientCtx *tail;
129
130 /**
131 * Number of peers which have reached this barrier
132 */
133 unsigned int nreached;
134
135 /**
136 * Number of slaves we have initialised this barrier
137 */
138 unsigned int nslaves;
139
140 /**
141 * Quorum percentage to be reached
142 */
143 uint8_t quorum;
144
145 /**
146 * Was there a timeout while propagating initialisation
147 */
148 uint8_t timedout;
149};
150
151
152/**
153 * Hashtable handle for storing initialised barriers
154 */
155static struct GNUNET_CONTAINER_MultiHashMap *barrier_map;
156
157/**
158 * Service context
159 */
160static struct GNUNET_SERVICE_Context *ctx;
161
162
163/**
164 * Function called to notify a client about the connection
165 * begin ready to queue more data. "buf" will be
166 * NULL and "size" zero if the connection was closed for
167 * writing in the meantime.
168 *
169 * @param cls client context
170 * @param size number of bytes available in buf
171 * @param buf where the callee should write the message
172 * @return number of bytes written to buf
173 */
174static size_t
175transmit_ready_cb (void *cls, size_t size, void *buf)
176{
177 struct ClientCtx *ctx = cls;
178 struct GNUNET_SERVER_Client *client = ctx->client;
179 struct MessageQueue *mq;
180 struct GNUNET_MessageHeader *msg;
181 size_t wrote;
182
183 ctx->tx = NULL;
184 wrote = 0;
185 if ((0 == size) || (NULL == buf))
186 {
187 GNUNET_assert (NULL != ctx->client);
188 GNUNET_SERVER_client_drop (ctx->client);
189 ctx->client = NULL;
190 return 0;
191 }
192 mq = ctx->mq_head;
193 msg = mq->msg;
194 wrote = ntohs (msg->size);
195 GNUNET_assert (size >= wrote);
196 (void) memcpy (buf, msg, wrote);
197 GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
198 GNUNET_free (mq->msg);
199 GNUNET_free (mq);
200 if (NULL != (mq = ctx->mq_head))
201 ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
202 MESSAGE_SEND_TIMEOUT (30),
203 &transmit_ready_cb, ctx);
204 return wrote;
205}
206
207
208/**
209 * Queue a message into a clients message queue
210 *
211 * @param ctx the context associated with the client
212 * @param msg the message to queue. Will be consumed
213 * @param suspended is the client suspended at the time of calling queue_message
214 */
215static void
216queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg)
217{
218 struct MessageQueue *mq;
219 struct GNUNET_SERVER_Client *client = ctx->client;
220
221 mq = GNUNET_malloc (sizeof (struct MessageQueue));
222 mq->msg = msg;
223 GNUNET_CONTAINER_DLL_insert_tail (ctx->mq_head, ctx->mq_tail, mq);
224 if (NULL == ctx->tx)
225 ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
226 MESSAGE_SEND_TIMEOUT (30),
227 &transmit_ready_cb, ctx);
228}
229
230
231#if 0
232/**
233 * Function to remove a barrier from the barrier map and cleanup resources
234 * occupied by a barrier
235 *
236 * @param barrier the barrier handle
237 */
238static void
239remove_barrier (struct Barrier *barrier)
240{
241 GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (barrier_map,
242 &barrier->hash,
243 barrier));
244 GNUNET_free (barrier->name);
245 GNUNET_free (barrier);
246}
247
248
249/**
250 * Function called upon timeout while waiting for a response from the
251 * subcontrollers to barrier init message
252 *
253 * @param
254 * @return
255 */
256static void
257fwd_tout_barrier_init (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
258{
259 struct ForwardedOperationContext *foctx = cls;
260 struct Barrier *barrier = foctx->cls;
261
262 barrier->nslaves--;
263 barrier->timedout = GNUNET_YES;
264 if (0 == barrier->nslaves)
265 {
266 GST_send_operation_fail_msg (foctx->client, foctx->operation_id,
267 "Timeout while contacting a slave controller");
268 remove_barrier (barrier);
269 }
270}
271#endif
272
273/**
274 * Task for sending barrier crossed notifications to waiting client
275 *
276 * @param cls the barrier which is crossed
277 * @param tc scheduler task context
278 */
279static void
280notify_task_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
281{
282 struct Barrier *barrier = cls;
283 struct ClientCtx *client_ctx;
284 struct GNUNET_TESTBED_BarrierStatus *msg;
285 struct GNUNET_MessageHeader *dup_msg;
286 uint16_t name_len;
287 uint16_t msize;
288
289 name_len = strlen (barrier->name) + 1;
290 msize = sizeof (struct GNUNET_TESTBED_BarrierStatus) + name_len;
291 msg = GNUNET_malloc (msize);
292 msg->header.size = htons (msize);
293 msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
294 msg->status = 0;
295 msg->name_len = htons (name_len);
296 (void) memcpy (msg->data, barrier->name, name_len);
297 msg->data[name_len] = '\0';
298 while (NULL != (client_ctx = barrier->head))
299 {
300 dup_msg = GNUNET_copy_message (&msg->header);
301 queue_message (client_ctx, dup_msg);
302 GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
303 GNUNET_SERVER_client_set_user_context_ (client_ctx->client, NULL, 0);
304 GNUNET_free (client_ctx);
305 }
306}
307
308
309/**
310 * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages. This
311 * message should come from peers or a shared helper service using the
312 * testbed-barrier client API (@see gnunet_testbed_barrier_service.h)
313 *
314 * This handler is queued in the main service and will handle the messages sent
315 * either from the testbed driver or from a high level controller
316 *
317 * @param cls NULL
318 * @param client identification of the client
319 * @param message the actual message
320 */
321static void
322handle_barrier_wait (void *cls, struct GNUNET_SERVER_Client *client,
323 const struct GNUNET_MessageHeader *message)
324{
325 const struct GNUNET_TESTBED_BarrierWait *msg;
326 struct Barrier *barrier;
327 char *name;
328 struct ClientCtx *client_ctx;
329 struct GNUNET_HashCode key;
330 size_t name_len;
331 uint16_t msize;
332
333 msize = ntohs (message->size);
334 if (msize <= sizeof (struct GNUNET_TESTBED_BarrierWait))
335 {
336 GNUNET_break_op (0);
337 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
338 return;
339 }
340 if (NULL == barrier_map)
341 {
342 GNUNET_break (0);
343 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
344 return;
345 }
346 msg = (const struct GNUNET_TESTBED_BarrierWait *) message;
347 name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierWait);
348 name = GNUNET_malloc (name_len + 1);
349 name[name_len] = '\0';
350 (void) memcpy (name, msg->name, name_len);
351 GNUNET_CRYPTO_hash (name, name_len - 1, &key);
352 if (NULL == (barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key)))
353 {
354 GNUNET_break (0);
355 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
356 GNUNET_free (name);
357 return;
358 }
359 client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
360 if (NULL == client_ctx)
361 {
362 client_ctx = GNUNET_malloc (sizeof (struct ClientCtx));
363 client_ctx->client = client;
364 GNUNET_SERVER_client_keep (client);
365 client_ctx->barrier = barrier;
366 GNUNET_CONTAINER_DLL_insert_tail (barrier->head, barrier->tail, client_ctx);
367 barrier->nreached++;
368 if ((barrier->quorum * GST_num_local_peers) <= (barrier->nreached * 100))
369 notify_task_cb (barrier, NULL);
370 }
371 GNUNET_SERVER_receive_done (client, GNUNET_OK);
372}
373
374
375/**
376 * Functions with this signature are called whenever a client
377 * is disconnected on the network level.
378 *
379 * @param cls closure
380 * @param client identification of the client; NULL
381 * for the last call when the server is destroyed
382 */
383static void
384disconnect_cb (void *cls, struct GNUNET_SERVER_Client *client)
385{
386 struct ClientCtx *client_ctx;
387 struct Barrier *barrier;
388
389 client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
390 if (NULL == client_ctx)
391 return;
392 barrier = client_ctx->barrier;
393 GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
394 if (NULL != client_ctx->tx)
395 GNUNET_SERVER_notify_transmit_ready_cancel (client_ctx->tx);
396
397}
398
399
400/**
401 * Function to initialise barrriers component
402 */
403void
404GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg)
405{
406 static const struct GNUNET_SERVER_MessageHandler message_handlers[] = {
407 {&handle_barrier_wait, NULL, GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT, 0},
408 {NULL, NULL, 0, 0}
409 };
410 struct GNUNET_SERVER_Handle *srv;
411
412 barrier_map = GNUNET_CONTAINER_multihashmap_create (3, GNUNET_YES);
413 ctx = GNUNET_SERVICE_start ("testbed-barrier", cfg,
414 GNUNET_SERVICE_OPTION_MANUAL_SHUTDOWN);
415 srv = GNUNET_SERVICE_get_server (ctx);
416 GNUNET_SERVER_add_handlers (srv, message_handlers);
417 GNUNET_SERVER_disconnect_notify (srv, &disconnect_cb, NULL);
418}
419
420
421/**
422 * Function to stop the barrier service
423 */
424void
425GST_barriers_stop ()
426{
427 GNUNET_assert (NULL != ctx);
428 GNUNET_SERVICE_stop (ctx);
429}
430
431
432/**
433 * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages. This
434 * message should always come from a parent controller or the testbed API if we
435 * are the root controller.
436 *
437 * This handler is queued in the main service and will handle the messages sent
438 * either from the testbed driver or from a high level controller
439 *
440 * @param cls NULL
441 * @param client identification of the client
442 * @param message the actual message
443 */
444void
445GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client,
446 const struct GNUNET_MessageHeader *message)
447{
448 const struct GNUNET_TESTBED_BarrierInit *msg;
449 const char *name;
450 struct Barrier *barrier;
451 struct Slave *slave;
452 struct GNUNET_HashCode hash;
453 size_t name_len;
454 uint64_t op_id;
455 unsigned int cnt;
456 uint16_t msize;
457
458 if (NULL == GST_context)
459 {
460 GNUNET_break_op (0);
461 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
462 return;
463 }
464 if (client != GST_context->client)
465 {
466 GNUNET_break_op (0);
467 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
468 return;
469 }
470 msize = ntohs (message->size);
471 if (msize <= sizeof (struct GNUNET_TESTBED_BarrierInit))
472 {
473 GNUNET_break_op (0);
474 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
475 return;
476 }
477 msg = (const struct GNUNET_TESTBED_BarrierInit *) message;
478 op_id = GNUNET_ntohll (msg->op_id);
479 name = msg->name;
480 name_len = (size_t) msize - sizeof (struct GNUNET_TESTBED_BarrierInit);
481 GNUNET_CRYPTO_hash (name, name_len, &hash);
482 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
483 {
484 GST_send_operation_fail_msg (client, op_id, "Barrier already initialised");
485 GNUNET_SERVER_receive_done (client, GNUNET_OK);
486 return;
487 }
488 barrier = GNUNET_malloc (sizeof (struct Barrier));
489 (void) memcpy (&barrier->hash, &hash, sizeof (struct GNUNET_HashCode));
490 barrier->quorum = msg->quorum;
491 barrier->name = GNUNET_malloc (name_len + 1);
492 barrier->name[name_len] = '\0';
493 (void) memcpy (barrier->name, name, name_len);
494 GNUNET_assert (GNUNET_OK ==
495 GNUNET_CONTAINER_multihashmap_put (barrier_map,
496 &barrier->hash,
497 barrier,
498 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
499 GNUNET_SERVER_receive_done (client, GNUNET_OK);
500 /* Propagate barrier init to subcontrollers */
501 for (cnt = 0; cnt < GST_slave_list_size; cnt++)
502 {
503 if (NULL == (slave = GST_slave_list[cnt]))
504 continue;
505 if (NULL == slave->controller)
506 {
507 GNUNET_break (0);/* May happen when we are connecting to the controller */
508 continue;
509 }
510 GNUNET_break (0); /* FIXME */
511 }
512}