aboutsummaryrefslogtreecommitdiff
path: root/src/testbed/gnunet-service-testbed_barriers.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-10-10 15:47:00 +0000
committerChristian Grothoff <christian@grothoff.org>2016-10-10 15:47:00 +0000
commit93085e8a2991fde229400b588a5930e9fcca0d75 (patch)
tree0384246adbd96fa0138a46ad5fecb777aa40e789 /src/testbed/gnunet-service-testbed_barriers.c
parent2bf962c76bb82c1f38acea42c7bdfb241e2582e7 (diff)
downloadgnunet-93085e8a2991fde229400b588a5930e9fcca0d75.tar.gz
gnunet-93085e8a2991fde229400b588a5930e9fcca0d75.zip
migrating testbed to new service API
Diffstat (limited to 'src/testbed/gnunet-service-testbed_barriers.c')
-rw-r--r--src/testbed/gnunet-service-testbed_barriers.c603
1 files changed, 290 insertions, 313 deletions
diff --git a/src/testbed/gnunet-service-testbed_barriers.c b/src/testbed/gnunet-service-testbed_barriers.c
index c3ae82ed8..831bc3c6d 100644
--- a/src/testbed/gnunet-service-testbed_barriers.c
+++ b/src/testbed/gnunet-service-testbed_barriers.c
@@ -61,28 +61,6 @@ struct Barrier;
61 61
62 62
63/** 63/**
64 * Message queue for transmitting messages
65 */
66struct MessageQueue
67{
68 /**
69 * next pointer for DLL
70 */
71 struct MessageQueue *next;
72
73 /**
74 * prev pointer for DLL
75 */
76 struct MessageQueue *prev;
77
78 /**
79 * The message to be sent
80 */
81 struct GNUNET_MessageHeader *msg;
82};
83
84
85/**
86 * Context to be associated with each client 64 * Context to be associated with each client
87 */ 65 */
88struct ClientCtx 66struct ClientCtx
@@ -105,22 +83,8 @@ struct ClientCtx
105 /** 83 /**
106 * The client handle 84 * The client handle
107 */ 85 */
108 struct GNUNET_SERVER_Client *client; 86 struct GNUNET_SERVICE_Client *client;
109
110 /**
111 * the transmission handle
112 */
113 struct GNUNET_SERVER_TransmitHandle *tx;
114 87
115 /**
116 * message queue head
117 */
118 struct MessageQueue *mq_head;
119
120 /**
121 * message queue tail
122 */
123 struct MessageQueue *mq_tail;
124}; 88};
125 89
126 90
@@ -169,7 +133,7 @@ struct Barrier
169 /** 133 /**
170 * The client handle to the master controller 134 * The client handle to the master controller
171 */ 135 */
172 struct GNUNET_SERVER_Client *mc; 136 struct GNUNET_SERVICE_Client *mc;
173 137
174 /** 138 /**
175 * The name of the barrier 139 * The name of the barrier
@@ -199,7 +163,7 @@ struct Barrier
199 /** 163 /**
200 * Identifier for the timeout task 164 * Identifier for the timeout task
201 */ 165 */
202 struct GNUNET_SCHEDULER_Task * tout_task; 166 struct GNUNET_SCHEDULER_Task *tout_task;
203 167
204 /** 168 /**
205 * The status of this barrier 169 * The status of this barrier
@@ -247,102 +211,7 @@ static struct GNUNET_CONTAINER_MultiHashMap *barrier_map;
247/** 211/**
248 * Service context 212 * Service context
249 */ 213 */
250static struct GNUNET_SERVICE_Context *ctx; 214static struct GNUNET_SERVICE_Handle *ctx;
251
252
253/**
254 * Function called to notify a client about the connection
255 * begin ready to queue more data. "buf" will be
256 * NULL and "size" zero if the connection was closed for
257 * writing in the meantime.
258 *
259 * @param cls client context
260 * @param size number of bytes available in buf
261 * @param buf where the callee should write the message
262 * @return number of bytes written to buf
263 */
264static size_t
265transmit_ready_cb (void *cls, size_t size, void *buf)
266{
267 struct ClientCtx *ctx = cls;
268 struct GNUNET_SERVER_Client *client = ctx->client;
269 struct MessageQueue *mq;
270 struct GNUNET_MessageHeader *msg;
271 size_t wrote;
272
273 ctx->tx = NULL;
274 if ((0 == size) || (NULL == buf))
275 {
276 GNUNET_assert (NULL != ctx->client);
277 GNUNET_SERVER_client_drop (ctx->client);
278 ctx->client = NULL;
279 return 0;
280 }
281 mq = ctx->mq_head;
282 msg = mq->msg;
283 wrote = ntohs (msg->size);
284 GNUNET_assert (size >= wrote);
285 GNUNET_memcpy (buf, msg, wrote);
286 GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
287 GNUNET_free (mq->msg);
288 GNUNET_free (mq);
289 if (NULL != (mq = ctx->mq_head))
290 ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
291 MESSAGE_SEND_TIMEOUT (30),
292 &transmit_ready_cb, ctx);
293 return wrote;
294}
295
296
297/**
298 * Queue a message into a clients message queue
299 *
300 * @param ctx the context associated with the client
301 * @param msg the message to queue. Will be consumed
302 */
303static void
304queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg)
305{
306 struct MessageQueue *mq;
307 struct GNUNET_SERVER_Client *client = ctx->client;
308
309 mq = GNUNET_new (struct MessageQueue);
310 mq->msg = msg;
311 LOG_DEBUG ("Queueing message of type %u, size %u for sending\n",
312 ntohs (msg->type), ntohs (msg->size));
313 GNUNET_CONTAINER_DLL_insert_tail (ctx->mq_head, ctx->mq_tail, mq);
314 if (NULL == ctx->tx)
315 ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
316 MESSAGE_SEND_TIMEOUT (30),
317 &transmit_ready_cb, ctx);
318}
319
320
321/**
322 * Function to cleanup client context data structure
323 *
324 * @param ctx the client context data structure
325 */
326static void
327cleanup_clientctx (struct ClientCtx *ctx)
328{
329 struct MessageQueue *mq;
330
331 if (NULL != ctx->client)
332 {
333 GNUNET_SERVER_client_set_user_context_ (ctx->client, NULL, 0);
334 GNUNET_SERVER_client_drop (ctx->client);
335 }
336 if (NULL != ctx->tx)
337 GNUNET_SERVER_notify_transmit_ready_cancel (ctx->tx);
338 if (NULL != (mq = ctx->mq_head))
339 {
340 GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
341 GNUNET_free (mq->msg);
342 GNUNET_free (mq);
343 }
344 GNUNET_free (ctx);
345}
346 215
347 216
348/** 217/**
@@ -356,16 +225,18 @@ remove_barrier (struct Barrier *barrier)
356{ 225{
357 struct ClientCtx *ctx; 226 struct ClientCtx *ctx;
358 227
359 GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (barrier_map, 228 GNUNET_assert (GNUNET_YES ==
360 &barrier->hash, 229 GNUNET_CONTAINER_multihashmap_remove (barrier_map,
361 barrier)); 230 &barrier->hash,
231 barrier));
362 while (NULL != (ctx = barrier->head)) 232 while (NULL != (ctx = barrier->head))
363 { 233 {
364 GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, ctx); 234 GNUNET_CONTAINER_DLL_remove (barrier->head,
365 cleanup_clientctx (ctx); 235 barrier->tail,
236 ctx);
237 GNUNET_free (ctx);
366 } 238 }
367 GNUNET_free (barrier->name); 239 GNUNET_free (barrier->name);
368 GNUNET_SERVER_client_drop (barrier->mc);
369 GNUNET_free (barrier); 240 GNUNET_free (barrier);
370} 241}
371 242
@@ -383,7 +254,9 @@ cancel_wrappers (struct Barrier *barrier)
383 while (NULL != (wrapper = barrier->whead)) 254 while (NULL != (wrapper = barrier->whead))
384 { 255 {
385 GNUNET_TESTBED_barrier_cancel (wrapper->hbarrier); 256 GNUNET_TESTBED_barrier_cancel (wrapper->hbarrier);
386 GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper); 257 GNUNET_CONTAINER_DLL_remove (barrier->whead,
258 barrier->wtail,
259 wrapper);
387 GNUNET_free (wrapper); 260 GNUNET_free (wrapper);
388 } 261 }
389} 262}
@@ -399,29 +272,33 @@ cancel_wrappers (struct Barrier *barrier)
399 * status=GNUNET_TESTBED_BARRIERSTATUS_ERROR 272 * status=GNUNET_TESTBED_BARRIERSTATUS_ERROR
400 */ 273 */
401static void 274static void
402send_client_status_msg (struct GNUNET_SERVER_Client *client, 275send_client_status_msg (struct GNUNET_SERVICE_Client *client,
403 const char *name, 276 const char *name,
404 enum GNUNET_TESTBED_BarrierStatus status, 277 enum GNUNET_TESTBED_BarrierStatus status,
405 const char *emsg) 278 const char *emsg)
406{ 279{
280 struct GNUNET_MQ_Envelope *env;
407 struct GNUNET_TESTBED_BarrierStatusMsg *msg; 281 struct GNUNET_TESTBED_BarrierStatusMsg *msg;
408 size_t name_len; 282 size_t name_len;
409 uint16_t msize; 283 size_t err_len;
410 284
411 GNUNET_assert ((NULL == emsg) || (GNUNET_TESTBED_BARRIERSTATUS_ERROR == status)); 285 GNUNET_assert ( (NULL == emsg) ||
412 name_len = strlen (name); 286 (GNUNET_TESTBED_BARRIERSTATUS_ERROR == status) );
413 msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) 287 name_len = strlen (name) + 1;
414 + (name_len + 1) 288 err_len = ((NULL == emsg) ? 0 : (strlen (emsg) + 1));
415 + ((NULL == emsg) ? 0 : (strlen (emsg) + 1)); 289 env = GNUNET_MQ_msg_extra (msg,
416 msg = GNUNET_malloc (msize); 290 name_len + err_len,
417 msg->header.size = htons (msize); 291 GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
418 msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
419 msg->status = htons (status); 292 msg->status = htons (status);
420 msg->name_len = htons ((uint16_t) name_len); 293 msg->name_len = htons ((uint16_t) name_len - 1);
421 GNUNET_memcpy (msg->data, name, name_len); 294 GNUNET_memcpy (msg->data,
422 if (NULL != emsg) 295 name,
423 GNUNET_memcpy (msg->data + name_len + 1, emsg, strlen (emsg)); 296 name_len);
424 GST_queue_message (client, &msg->header); 297 GNUNET_memcpy (msg->data + name_len,
298 emsg,
299 err_len);
300 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
301 env);
425} 302}
426 303
427 304
@@ -433,82 +310,119 @@ send_client_status_msg (struct GNUNET_SERVER_Client *client,
433 * status=GNUNET_TESTBED_BARRIERSTATUS_ERROR 310 * status=GNUNET_TESTBED_BARRIERSTATUS_ERROR
434 */ 311 */
435static void 312static void
436send_barrier_status_msg (struct Barrier *barrier, const char *emsg) 313send_barrier_status_msg (struct Barrier *barrier,
314 const char *emsg)
437{ 315{
438 GNUNET_assert (0 != barrier->status); 316 GNUNET_assert (0 != barrier->status);
439 send_client_status_msg (barrier->mc, barrier->name, barrier->status, emsg); 317 send_client_status_msg (barrier->mc,
318 barrier->name,
319 barrier->status,
320 emsg);
440} 321}
441 322
442 323
443/** 324/**
444 * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages. This 325 * Check #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages.
326 *
327 * @param cls identification of the client
328 * @param message the actual message
329 */
330static int
331check_barrier_wait (void *cls,
332 const struct GNUNET_TESTBED_BarrierWait *msg)
333{
334 return GNUNET_OK; /* always well-formed */
335}
336
337
338/**
339 * Message handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages. This
445 * message should come from peers or a shared helper service using the 340 * message should come from peers or a shared helper service using the
446 * testbed-barrier client API (@see gnunet_testbed_barrier_service.h) 341 * testbed-barrier client API (@see gnunet_testbed_barrier_service.h)
447 * 342 *
448 * This handler is queued in the main service and will handle the messages sent 343 * This handler is queued in the main service and will handle the messages sent
449 * either from the testbed driver or from a high level controller 344 * either from the testbed driver or from a high level controller
450 * 345 *
451 * @param cls NULL 346 * @param cls identification of the client
452 * @param client identification of the client
453 * @param message the actual message 347 * @param message the actual message
454 */ 348 */
455static void 349static void
456handle_barrier_wait (void *cls, struct GNUNET_SERVER_Client *client, 350handle_barrier_wait (void *cls,
457 const struct GNUNET_MessageHeader *message) 351 const struct GNUNET_TESTBED_BarrierWait *msg)
458{ 352{
459 const struct GNUNET_TESTBED_BarrierWait *msg; 353 struct ClientCtx *client_ctx = cls;
460 struct Barrier *barrier; 354 struct Barrier *barrier;
461 char *name; 355 char *name;
462 struct ClientCtx *client_ctx;
463 struct GNUNET_HashCode key; 356 struct GNUNET_HashCode key;
464 size_t name_len; 357 size_t name_len;
465 uint16_t msize; 358 uint16_t msize;
466 359
467 msize = ntohs (message->size); 360 msize = ntohs (msg->header.size);
468 if (msize <= sizeof (struct GNUNET_TESTBED_BarrierWait))
469 {
470 GNUNET_break_op (0);
471 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
472 return;
473 }
474 if (NULL == barrier_map) 361 if (NULL == barrier_map)
475 { 362 {
476 GNUNET_break (0); 363 GNUNET_break (0);
477 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 364 GNUNET_SERVICE_client_drop (client_ctx->client);
478 return; 365 return;
479 } 366 }
480 msg = (const struct GNUNET_TESTBED_BarrierWait *) message;
481 name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierWait); 367 name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierWait);
482 name = GNUNET_malloc (name_len + 1); 368 name = GNUNET_malloc (name_len + 1);
483 name[name_len] = '\0'; 369 name[name_len] = '\0';
484 GNUNET_memcpy (name, msg->name, name_len); 370 GNUNET_memcpy (name,
485 LOG_DEBUG ("Received BARRIER_WAIT for barrier `%s'\n", name); 371 msg->name,
486 GNUNET_CRYPTO_hash (name, name_len, &key); 372 name_len);
373 LOG_DEBUG ("Received BARRIER_WAIT for barrier `%s'\n",
374 name);
375 GNUNET_CRYPTO_hash (name,
376 name_len,
377 &key);
487 GNUNET_free (name); 378 GNUNET_free (name);
488 if (NULL == (barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key))) 379 if (NULL == (barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key)))
489 { 380 {
490 GNUNET_break (0); 381 GNUNET_break (0);
491 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 382 GNUNET_SERVICE_client_drop (client_ctx->client);
492 return; 383 return;
493 } 384 }
494 client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx); 385 if (NULL != client_ctx->barrier)
495 if (NULL == client_ctx)
496 { 386 {
497 client_ctx = GNUNET_new (struct ClientCtx); 387 GNUNET_break (0);
498 client_ctx->client = client; 388 GNUNET_SERVICE_client_drop (client_ctx->client);
499 GNUNET_SERVER_client_keep (client); 389 return;
500 client_ctx->barrier = barrier;
501 GNUNET_CONTAINER_DLL_insert_tail (barrier->head, barrier->tail, client_ctx);
502 GNUNET_SERVER_client_set_user_context (client, client_ctx);
503 } 390 }
391 client_ctx->barrier = barrier;
392 GNUNET_CONTAINER_DLL_insert_tail (barrier->head,
393 barrier->tail,
394 client_ctx);
504 barrier->nreached++; 395 barrier->nreached++;
505 if ((barrier->num_wbarriers_reached == barrier->num_wbarriers) 396 if ( (barrier->num_wbarriers_reached == barrier->num_wbarriers) &&
506 && (LOCAL_QUORUM_REACHED (barrier))) 397 (LOCAL_QUORUM_REACHED (barrier)) )
507 { 398 {
508 barrier->status = GNUNET_TESTBED_BARRIERSTATUS_CROSSED; 399 barrier->status = GNUNET_TESTBED_BARRIERSTATUS_CROSSED;
509 send_barrier_status_msg (barrier, NULL); 400 send_barrier_status_msg (barrier,
401 NULL);
510 } 402 }
511 GNUNET_SERVER_receive_done (client, GNUNET_OK); 403 GNUNET_SERVICE_client_continue (client_ctx->client);
404}
405
406
407/**
408 * Function called when a client connects to the testbed-barrier service.
409 *
410 * @param cls NULL
411 * @param client the connecting client
412 * @param mq queue to talk to @a client
413 * @return our `struct ClientCtx`
414 */
415static void *
416connect_cb (void *cls,
417 struct GNUNET_SERVICE_Client *client,
418 struct GNUNET_MQ_Handle *mq)
419{
420 struct ClientCtx *client_ctx;
421
422 LOG_DEBUG ("Client connected to testbed-barrier service\n");
423 client_ctx = GNUNET_new (struct ClientCtx);
424 client_ctx->client = client;
425 return client_ctx;
512} 426}
513 427
514 428
@@ -521,16 +435,22 @@ handle_barrier_wait (void *cls, struct GNUNET_SERVER_Client *client,
521 * for the last call when the server is destroyed 435 * for the last call when the server is destroyed
522 */ 436 */
523static void 437static void
524disconnect_cb (void *cls, struct GNUNET_SERVER_Client *client) 438disconnect_cb (void *cls,
439 struct GNUNET_SERVICE_Client *client,
440 void *app_ctx)
525{ 441{
526 struct ClientCtx *client_ctx; 442 struct ClientCtx *client_ctx = app_ctx;
443 struct Barrier *barrier = client_ctx->barrier;
527 444
528 if (NULL == client) 445 if (NULL != barrier)
529 return; 446 {
530 client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx); 447 GNUNET_CONTAINER_DLL_remove (barrier->head,
531 if (NULL == client_ctx) 448 barrier->tail,
532 return; 449 client_ctx);
533 cleanup_clientctx (client_ctx); 450 client_ctx->barrier = NULL;
451 }
452 GNUNET_free (client_ctx);
453 LOG_DEBUG ("Client disconnected from testbed-barrier service\n");
534} 454}
535 455
536 456
@@ -542,18 +462,23 @@ disconnect_cb (void *cls, struct GNUNET_SERVER_Client *client)
542void 462void
543GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg) 463GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg)
544{ 464{
545 static const struct GNUNET_SERVER_MessageHandler message_handlers[] = { 465 struct GNUNET_MQ_MessageHandler message_handlers[] = {
546 {&handle_barrier_wait, NULL, GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT, 0}, 466 GNUNET_MQ_hd_var_size (barrier_wait,
547 {NULL, NULL, 0, 0} 467 GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT,
468 struct GNUNET_TESTBED_BarrierWait,
469 NULL),
470 GNUNET_MQ_handler_end ()
548 }; 471 };
549 struct GNUNET_SERVER_Handle *srv; 472
550 473 LOG_DEBUG ("Launching testbed-barrier service\n");
551 barrier_map = GNUNET_CONTAINER_multihashmap_create (3, GNUNET_YES); 474 barrier_map = GNUNET_CONTAINER_multihashmap_create (3,
552 ctx = GNUNET_SERVICE_start ("testbed-barrier", cfg, 475 GNUNET_YES);
553 GNUNET_SERVICE_OPTION_MANUAL_SHUTDOWN); 476 ctx = GNUNET_SERVICE_starT ("testbed-barrier",
554 srv = GNUNET_SERVICE_get_server (ctx); 477 cfg,
555 GNUNET_SERVER_add_handlers (srv, message_handlers); 478 &connect_cb,
556 GNUNET_SERVER_disconnect_notify (srv, &disconnect_cb, NULL); 479 &disconnect_cb,
480 NULL,
481 message_handlers);
557} 482}
558 483
559 484
@@ -594,7 +519,7 @@ GST_barriers_destroy ()
594 NULL)); 519 NULL));
595 GNUNET_CONTAINER_multihashmap_destroy (barrier_map); 520 GNUNET_CONTAINER_multihashmap_destroy (barrier_map);
596 GNUNET_assert (NULL != ctx); 521 GNUNET_assert (NULL != ctx);
597 GNUNET_SERVICE_stop (ctx); 522 GNUNET_SERVICE_stoP (ctx);
598} 523}
599 524
600 525
@@ -606,13 +531,14 @@ GST_barriers_destroy ()
606 * @param cls the closure given to GNUNET_TESTBED_barrier_init() 531 * @param cls the closure given to GNUNET_TESTBED_barrier_init()
607 * @param name the name of the barrier 532 * @param name the name of the barrier
608 * @param b_ the barrier handle 533 * @param b_ the barrier handle
609 * @param status status of the barrier; GNUNET_OK if the barrier is crossed; 534 * @param status status of the barrier; #GNUNET_OK if the barrier is crossed;
610 * GNUNET_SYSERR upon error 535 * #GNUNET_SYSERR upon error
611 * @param emsg if the status were to be GNUNET_SYSERR, this parameter has the 536 * @param emsg if the status were to be #GNUNET_SYSERR, this parameter has the
612 * error messsage 537 * error messsage
613 */ 538 */
614static void 539static void
615wbarrier_status_cb (void *cls, const char *name, 540wbarrier_status_cb (void *cls,
541 const char *name,
616 struct GNUNET_TESTBED_Barrier *b_, 542 struct GNUNET_TESTBED_Barrier *b_,
617 enum GNUNET_TESTBED_BarrierStatus status, 543 enum GNUNET_TESTBED_BarrierStatus status,
618 const char *emsg) 544 const char *emsg)
@@ -622,14 +548,17 @@ wbarrier_status_cb (void *cls, const char *name,
622 548
623 GNUNET_assert (b_ == wrapper->hbarrier); 549 GNUNET_assert (b_ == wrapper->hbarrier);
624 wrapper->hbarrier = NULL; 550 wrapper->hbarrier = NULL;
625 GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper); 551 GNUNET_CONTAINER_DLL_remove (barrier->whead,
552 barrier->wtail,
553 wrapper);
626 GNUNET_free (wrapper); 554 GNUNET_free (wrapper);
627 switch (status) 555 switch (status)
628 { 556 {
629 case GNUNET_TESTBED_BARRIERSTATUS_ERROR: 557 case GNUNET_TESTBED_BARRIERSTATUS_ERROR:
630 LOG (GNUNET_ERROR_TYPE_ERROR, 558 LOG (GNUNET_ERROR_TYPE_ERROR,
631 "Initialising barrier `%s' failed at a sub-controller: %s\n", 559 "Initialising barrier `%s' failed at a sub-controller: %s\n",
632 barrier->name, (NULL != emsg) ? emsg : "NULL"); 560 barrier->name,
561 (NULL != emsg) ? emsg : "NULL");
633 cancel_wrappers (barrier); 562 cancel_wrappers (barrier);
634 if (NULL == emsg) 563 if (NULL == emsg)
635 emsg = "Initialisation failed at a sub-controller"; 564 emsg = "Initialisation failed at a sub-controller";
@@ -686,23 +615,38 @@ fwd_tout_barrier_init (void *cls)
686} 615}
687 616
688 617
618
619/**
620 * Check #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages.
621 *
622 * @param cls identification of the client
623 * @param msg the actual message
624 * @return #GNUNET_OK if @a msg is well-formed
625 */
626int
627check_barrier_init (void *cls,
628 const struct GNUNET_TESTBED_BarrierInit *msg)
629{
630 return GNUNET_OK; /* always well-formed */
631}
632
633
689/** 634/**
690 * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages. This 635 * Message handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages. This
691 * message should always come from a parent controller or the testbed API if we 636 * message should always come from a parent controller or the testbed API if we
692 * are the root controller. 637 * are the root controller.
693 * 638 *
694 * This handler is queued in the main service and will handle the messages sent 639 * This handler is queued in the main service and will handle the messages sent
695 * either from the testbed driver or from a high level controller 640 * either from the testbed driver or from a high level controller
696 * 641 *
697 * @param cls NULL 642 * @param cls identification of the client
698 * @param client identification of the client 643 * @param msg the actual message
699 * @param message the actual message
700 */ 644 */
701void 645void
702GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client, 646handle_barrier_init (void *cls,
703 const struct GNUNET_MessageHeader *message) 647 const struct GNUNET_TESTBED_BarrierInit *msg)
704{ 648{
705 const struct GNUNET_TESTBED_BarrierInit *msg; 649 struct GNUNET_SERVICE_Client *client = cls;
706 char *name; 650 char *name;
707 struct Barrier *barrier; 651 struct Barrier *barrier;
708 struct Slave *slave; 652 struct Slave *slave;
@@ -715,49 +659,45 @@ GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client,
715 if (NULL == GST_context) 659 if (NULL == GST_context)
716 { 660 {
717 GNUNET_break_op (0); 661 GNUNET_break_op (0);
718 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 662 GNUNET_SERVICE_client_drop (client);
719 return; 663 return;
720 } 664 }
721 if (client != GST_context->client) 665 if (client != GST_context->client)
722 { 666 {
723 GNUNET_break_op (0); 667 GNUNET_break_op (0);
724 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 668 GNUNET_SERVICE_client_drop (client);
725 return; 669 return;
726 } 670 }
727 msize = ntohs (message->size); 671 msize = ntohs (msg->header.size);
728 if (msize <= sizeof (struct GNUNET_TESTBED_BarrierInit))
729 {
730 GNUNET_break_op (0);
731 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
732 return;
733 }
734 msg = (const struct GNUNET_TESTBED_BarrierInit *) message;
735 name_len = (size_t) msize - sizeof (struct GNUNET_TESTBED_BarrierInit); 672 name_len = (size_t) msize - sizeof (struct GNUNET_TESTBED_BarrierInit);
736 name = GNUNET_malloc (name_len + 1); 673 name = GNUNET_malloc (name_len + 1);
737 GNUNET_memcpy (name, msg->name, name_len); 674 GNUNET_memcpy (name, msg->name, name_len);
738 GNUNET_CRYPTO_hash (name, name_len, &hash); 675 GNUNET_CRYPTO_hash (name, name_len, &hash);
739 LOG_DEBUG ("Received BARRIER_INIT for barrier `%s'\n", name); 676 LOG_DEBUG ("Received BARRIER_INIT for barrier `%s'\n",
740 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash)) 677 name);
678 if (GNUNET_YES ==
679 GNUNET_CONTAINER_multihashmap_contains (barrier_map,
680 &hash))
741 { 681 {
742 682 send_client_status_msg (client,
743 send_client_status_msg (client, name, GNUNET_TESTBED_BARRIERSTATUS_ERROR, 683 name,
684 GNUNET_TESTBED_BARRIERSTATUS_ERROR,
744 "A barrier with the same name already exists"); 685 "A barrier with the same name already exists");
745 GNUNET_free (name); 686 GNUNET_free (name);
746 GNUNET_SERVER_receive_done (client, GNUNET_OK); 687 GNUNET_SERVICE_client_continue (client);
747 return; 688 return;
748 } 689 }
749 barrier = GNUNET_new (struct Barrier); 690 barrier = GNUNET_new (struct Barrier);
750 GNUNET_memcpy (&barrier->hash, &hash, sizeof (struct GNUNET_HashCode)); 691 barrier->hash = hash;
751 barrier->quorum = msg->quorum; 692 barrier->quorum = msg->quorum;
752 barrier->name = name; 693 barrier->name = name;
753 barrier->mc = client; 694 barrier->mc = client;
754 GNUNET_SERVER_client_keep (client);
755 GNUNET_assert (GNUNET_OK == 695 GNUNET_assert (GNUNET_OK ==
756 GNUNET_CONTAINER_multihashmap_put (barrier_map, 696 GNUNET_CONTAINER_multihashmap_put (barrier_map,
757 &barrier->hash, 697 &barrier->hash,
758 barrier, 698 barrier,
759 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); 699 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
760 GNUNET_SERVER_receive_done (client, GNUNET_OK); 700 GNUNET_SERVICE_client_continue (client);
761 /* Propagate barrier init to subcontrollers */ 701 /* Propagate barrier init to subcontrollers */
762 for (cnt = 0; cnt < GST_slave_list_size; cnt++) 702 for (cnt = 0; cnt < GST_slave_list_size; cnt++)
763 { 703 {
@@ -770,7 +710,9 @@ GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client,
770 } 710 }
771 wrapper = GNUNET_new (struct WBarrier); 711 wrapper = GNUNET_new (struct WBarrier);
772 wrapper->barrier = barrier; 712 wrapper->barrier = barrier;
773 GNUNET_CONTAINER_DLL_insert_tail (barrier->whead, barrier->wtail, wrapper); 713 GNUNET_CONTAINER_DLL_insert_tail (barrier->whead,
714 barrier->wtail,
715 wrapper);
774 wrapper->hbarrier = GNUNET_TESTBED_barrier_init_ (slave->controller, 716 wrapper->hbarrier = GNUNET_TESTBED_barrier_init_ (slave->controller,
775 barrier->name, 717 barrier->name,
776 barrier->quorum, 718 barrier->quorum,
@@ -792,22 +734,36 @@ GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client,
792 734
793 735
794/** 736/**
795 * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages. This 737 * Check #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages.
738 *
739 * @param cls identification of the client
740 * @param msg the actual message
741 * @return #GNUNET_OK if @a msg is well-formed
742 */
743int
744check_barrier_cancel (void *cls,
745 const struct GNUNET_TESTBED_BarrierCancel *msg)
746{
747 return GNUNET_OK; /* all are well-formed */
748}
749
750
751/**
752 * Message handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages. This
796 * message should always come from a parent controller or the testbed API if we 753 * message should always come from a parent controller or the testbed API if we
797 * are the root controller. 754 * are the root controller.
798 * 755 *
799 * This handler is queued in the main service and will handle the messages sent 756 * This handler is queued in the main service and will handle the messages sent
800 * either from the testbed driver or from a high level controller 757 * either from the testbed driver or from a high level controller
801 * 758 *
802 * @param cls NULL 759 * @param cls identification of the client
803 * @param client identification of the client 760 * @param msg the actual message
804 * @param message the actual message
805 */ 761 */
806void 762void
807GST_handle_barrier_cancel (void *cls, struct GNUNET_SERVER_Client *client, 763handle_barrier_cancel (void *cls,
808 const struct GNUNET_MessageHeader *message) 764 const struct GNUNET_TESTBED_BarrierCancel *msg)
809{ 765{
810 const struct GNUNET_TESTBED_BarrierCancel *msg; 766 struct GNUNET_SERVICE_Client *client = cls;
811 char *name; 767 char *name;
812 struct Barrier *barrier; 768 struct Barrier *barrier;
813 struct GNUNET_HashCode hash; 769 struct GNUNET_HashCode hash;
@@ -817,119 +773,140 @@ GST_handle_barrier_cancel (void *cls, struct GNUNET_SERVER_Client *client,
817 if (NULL == GST_context) 773 if (NULL == GST_context)
818 { 774 {
819 GNUNET_break_op (0); 775 GNUNET_break_op (0);
820 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 776 GNUNET_SERVICE_client_drop (client);
821 return; 777 return;
822 } 778 }
823 if (client != GST_context->client) 779 if (client != GST_context->client)
824 { 780 {
825 GNUNET_break_op (0); 781 GNUNET_break_op (0);
826 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 782 GNUNET_SERVICE_client_drop (client);
827 return; 783 return;
828 } 784 }
829 msize = ntohs (message->size); 785 msize = ntohs (msg->header.size);
830 if (msize <= sizeof (struct GNUNET_TESTBED_BarrierCancel))
831 {
832 GNUNET_break_op (0);
833 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
834 return;
835 }
836 msg = (const struct GNUNET_TESTBED_BarrierCancel *) message;
837 name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierCancel); 786 name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierCancel);
838 name = GNUNET_malloc (name_len + 1); 787 name = GNUNET_malloc (name_len + 1);
839 GNUNET_memcpy (name, msg->name, name_len); 788 GNUNET_memcpy (name,
840 GNUNET_CRYPTO_hash (name, name_len, &hash); 789 msg->name,
841 if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash)) 790 name_len);
791 LOG_DEBUG ("Received BARRIER_CANCEL for barrier `%s'\n",
792 name);
793 GNUNET_CRYPTO_hash (name,
794 name_len,
795 &hash);
796 if (GNUNET_NO ==
797 GNUNET_CONTAINER_multihashmap_contains (barrier_map,
798 &hash))
842 { 799 {
843 GNUNET_break_op (0); 800 GNUNET_break_op (0);
844 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 801 GNUNET_SERVICE_client_drop (client);
845 return; 802 return;
846 } 803 }
847 barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &hash); 804 barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map,
805 &hash);
848 GNUNET_assert (NULL != barrier); 806 GNUNET_assert (NULL != barrier);
849 cancel_wrappers (barrier); 807 cancel_wrappers (barrier);
850 remove_barrier (barrier); 808 remove_barrier (barrier);
851 GNUNET_SERVER_receive_done (client, GNUNET_OK); 809 GNUNET_SERVICE_client_continue (client);
852} 810}
853 811
854 812
855/** 813/**
856 * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages. 814 * Check #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages.
815 *
816 * @param cls identification of the client
817 * @param msg the actual message
818 * @return #GNUNET_OK if @a msg is well-formed
819 */
820int
821check_barrier_status (void *cls,
822 const struct GNUNET_TESTBED_BarrierStatusMsg *msg)
823{
824 uint16_t msize;
825 uint16_t name_len;
826 const char *name;
827 enum GNUNET_TESTBED_BarrierStatus status;
828
829 msize = ntohs (msg->header.size) - sizeof (*msg);
830 status = ntohs (msg->status);
831 if (GNUNET_TESTBED_BARRIERSTATUS_CROSSED != status)
832 {
833 GNUNET_break_op (0); /* current we only expect BARRIER_CROSSED
834 status message this way */
835 return GNUNET_SYSERR;
836 }
837 name = msg->data;
838 name_len = ntohs (msg->name_len);
839 if ((name_len + 1) != msize)
840 {
841 GNUNET_break_op (0);
842 return GNUNET_SYSERR;
843 }
844 if ('\0' != name[name_len])
845 {
846 GNUNET_break_op (0);
847 return GNUNET_SYSERR;
848 }
849 return GNUNET_OK;
850}
851
852
853/**
854 * Message handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages.
857 * This handler is queued in the main service and will handle the messages sent 855 * This handler is queued in the main service and will handle the messages sent
858 * either from the testbed driver or from a high level controller 856 * either from the testbed driver or from a high level controller
859 * 857 *
860 * @param cls NULL 858 * @param cls identification of the client
861 * @param client identification of the client 859 * @param msg the actual message
862 * @param message the actual message
863 */ 860 */
864void 861void
865GST_handle_barrier_status (void *cls, 862handle_barrier_status (void *cls,
866 struct GNUNET_SERVER_Client *client, 863 const struct GNUNET_TESTBED_BarrierStatusMsg *msg)
867 const struct GNUNET_MessageHeader *message)
868{ 864{
869 const struct GNUNET_TESTBED_BarrierStatusMsg *msg; 865 struct GNUNET_SERVICE_Client *client = cls;
870 struct Barrier *barrier; 866 struct Barrier *barrier;
871 struct ClientCtx *client_ctx; 867 struct ClientCtx *client_ctx;
872 const char *name; 868 const char *name;
873 struct GNUNET_HashCode key; 869 struct GNUNET_HashCode key;
874 enum GNUNET_TESTBED_BarrierStatus status;
875 uint16_t msize;
876 uint16_t name_len; 870 uint16_t name_len;
871 struct GNUNET_MQ_Envelope *env;
877 872
878 if (NULL == GST_context) 873 if (NULL == GST_context)
879 { 874 {
880 GNUNET_break_op (0); 875 GNUNET_break_op (0);
881 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 876 GNUNET_SERVICE_client_drop (client);
882 return; 877 return;
883 } 878 }
884 if (client != GST_context->client) 879 if (client != GST_context->client)
885 { 880 {
886 GNUNET_break_op (0); 881 GNUNET_break_op (0);
887 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 882 GNUNET_SERVICE_client_drop (client);
888 return;
889 }
890 msize = ntohs (message->size);
891 if (msize <= sizeof (struct GNUNET_TESTBED_BarrierStatusMsg))
892 {
893 GNUNET_break_op (0);
894 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
895 return;
896 }
897 msg = (const struct GNUNET_TESTBED_BarrierStatusMsg *) message;
898 status = ntohs (msg->status);
899 if (GNUNET_TESTBED_BARRIERSTATUS_CROSSED != status)
900 {
901 GNUNET_break_op (0); /* current we only expect BARRIER_CROSSED
902 status message this way */
903 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
904 return; 883 return;
905 } 884 }
906 name = msg->data; 885 name = msg->data;
907 name_len = ntohs (msg->name_len); 886 name_len = ntohs (msg->name_len);
908 if ((sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len + 1) != msize) 887 LOG_DEBUG ("Received BARRIER_STATUS for barrier `%s'\n",
909 { 888 name);
910 GNUNET_break_op (0); 889 GNUNET_CRYPTO_hash (name,
911 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 890 name_len,
912 return; 891 &key);
913 } 892 barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map,
914 if ('\0' != name[name_len]) 893 &key);
915 {
916 GNUNET_break_op (0);
917 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
918 return;
919 }
920 GNUNET_CRYPTO_hash (name, name_len, &key);
921 barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key);
922 if (NULL == barrier) 894 if (NULL == barrier)
923 { 895 {
924 GNUNET_break_op (0); 896 GNUNET_break_op (0);
925 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 897 GNUNET_SERVICE_client_drop (client);
926 return; 898 return;
927 } 899 }
928 GNUNET_SERVER_receive_done (client, GNUNET_OK); 900 GNUNET_SERVICE_client_continue (client);
929 while (NULL != (client_ctx = barrier->head)) /* Notify peers */ 901 while (NULL != (client_ctx = barrier->head)) /* Notify peers */
930 { 902 {
931 queue_message (client_ctx, GNUNET_copy_message (message)); 903 env = GNUNET_MQ_msg_copy (&msg->header);
932 GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx); 904 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
905 env);
906 GNUNET_CONTAINER_DLL_remove (barrier->head,
907 barrier->tail,
908 client_ctx);
909 client_ctx->barrier = NULL;
933 } 910 }
934} 911}
935 912