/* This file is part of GNUnet. Copyright (C) 2008--2016 GNUnet e.V. GNUnet is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . SPDX-License-Identifier: AGPL3.0-or-later */ /** * @file testbed/gnunet-service-testbed_barriers.c * @brief barrier handling at the testbed controller * @author Sree Harsha Totakura */ #include "gnunet-service-testbed.h" #include "gnunet-service-testbed_barriers.h" #include "testbed_api.h" /** * timeout for outgoing message transmissions in seconds */ #define MESSAGE_SEND_TIMEOUT(s) \ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, s) /** * Test to see if local peers have reached the required quorum of a barrier */ #define LOCAL_QUORUM_REACHED(barrier) \ ((barrier->quorum * GST_num_local_peers) <= (barrier->nreached * 100)) #ifdef LOG #undef LOG #endif /** * Logging shorthand */ #define LOG(kind, ...) \ GNUNET_log_from (kind, "testbed-barriers", __VA_ARGS__) /** * Barrier */ struct Barrier; /** * Context to be associated with each client */ struct ClientCtx { /** * The barrier this client is waiting for */ struct Barrier *barrier; /** * DLL next ptr */ struct ClientCtx *next; /** * DLL prev ptr */ struct ClientCtx *prev; /** * The client handle */ struct GNUNET_SERVICE_Client *client; }; /** * Wrapper around Barrier handle */ struct WBarrier { /** * DLL next pointer */ struct WBarrier *next; /** * DLL prev pointer */ struct WBarrier *prev; /** * The local barrier associated with the creation of this wrapper */ struct Barrier *barrier; /** * Handle to the slave controller where this wrapper creates a barrier */ struct GNUNET_TESTBED_Controller *controller; /** * The barrier handle from API */ struct GNUNET_TESTBED_Barrier *hbarrier; /** * Has this barrier been crossed? */ uint8_t reached; }; /** * Barrier */ struct Barrier { /** * The hashcode of the barrier name */ struct GNUNET_HashCode hash; /** * The client handle to the master controller */ struct GNUNET_SERVICE_Client *mc; /** * The name of the barrier */ char *name; /** * DLL head for the list of clients waiting for this barrier */ struct ClientCtx *head; /** * DLL tail for the list of clients waiting for this barrier */ struct ClientCtx *tail; /** * DLL head for the list of barrier handles */ struct WBarrier *whead; /** * DLL tail for the list of barrier handles */ struct WBarrier *wtail; /** * Identifier for the timeout task */ struct GNUNET_SCHEDULER_Task *tout_task; /** * The status of this barrier */ enum GNUNET_TESTBED_BarrierStatus status; /** * Number of barriers wrapped in the above DLL */ unsigned int num_wbarriers; /** * Number of wrapped barriers reached so far */ unsigned int num_wbarriers_reached; /** * Number of wrapped barrier initialised so far */ unsigned int num_wbarriers_inited; /** * Number of peers which have reached this barrier */ unsigned int nreached; /** * Number of slaves we have initialised this barrier */ unsigned int nslaves; /** * Quorum percentage to be reached */ uint8_t quorum; }; /** * Hashtable handle for storing initialised barriers */ static struct GNUNET_CONTAINER_MultiHashMap *barrier_map; /** * Service context */ static struct GNUNET_SERVICE_Handle *ctx; /** * Function to remove a barrier from the barrier map and cleanup resources * occupied by a barrier * * @param barrier the barrier handle */ static void remove_barrier (struct Barrier *barrier) { struct ClientCtx *ctx; GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (barrier_map, &barrier->hash, barrier)); while (NULL != (ctx = barrier->head)) { GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, ctx); ctx->barrier = NULL; } GNUNET_free (barrier->name); GNUNET_free (barrier); } /** * Cancels all subcontroller barrier handles * * @param barrier the local barrier */ static void cancel_wrappers (struct Barrier *barrier) { struct WBarrier *wrapper; while (NULL != (wrapper = barrier->whead)) { GNUNET_TESTBED_barrier_cancel (wrapper->hbarrier); GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper); GNUNET_free (wrapper); } } /** * Send a status message about a barrier to the given client * * @param client the client to send the message to * @param name the barrier name * @param status the status of the barrier * @param emsg the error message; should be non-NULL for * status=GNUNET_TESTBED_BARRIERSTATUS_ERROR */ static void send_client_status_msg (struct GNUNET_SERVICE_Client *client, const char *name, enum GNUNET_TESTBED_BarrierStatus status, const char *emsg) { struct GNUNET_MQ_Envelope *env; struct GNUNET_TESTBED_BarrierStatusMsg *msg; size_t name_len; size_t err_len; GNUNET_assert ((NULL == emsg) || (GNUNET_TESTBED_BARRIERSTATUS_ERROR == status)); name_len = strlen (name) + 1; err_len = ((NULL == emsg) ? 0 : (strlen (emsg) + 1)); env = GNUNET_MQ_msg_extra (msg, name_len + err_len, GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS); msg->status = htons (status); msg->name_len = htons ((uint16_t) name_len - 1); GNUNET_memcpy (msg->data, name, name_len); GNUNET_memcpy (msg->data + name_len, emsg, err_len); GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env); } /** * Sends a barrier failed message * * @param barrier the corresponding barrier * @param emsg the error message; should be non-NULL for * status=GNUNET_TESTBED_BARRIERSTATUS_ERROR */ static void send_barrier_status_msg (struct Barrier *barrier, const char *emsg) { GNUNET_assert (0 != barrier->status); send_client_status_msg (barrier->mc, barrier->name, barrier->status, emsg); } /** * Check #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages. * * @param cls identification of the client * @param message the actual message */ static int check_barrier_wait (void *cls, const struct GNUNET_TESTBED_BarrierWait *msg) { return GNUNET_OK; /* always well-formed */ } /** * Message handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages. This * message should come from peers or a shared helper service using the * testbed-barrier client API (@see gnunet_testbed_barrier_service.h) * * This handler is queued in the main service and will handle the messages sent * either from the testbed driver or from a high level controller * * @param cls identification of the client * @param message the actual message */ static void handle_barrier_wait (void *cls, const struct GNUNET_TESTBED_BarrierWait *msg) { struct ClientCtx *client_ctx = cls; struct Barrier *barrier; char *name; struct GNUNET_HashCode key; size_t name_len; uint16_t msize; msize = ntohs (msg->header.size); if (NULL == barrier_map) { GNUNET_break (0); GNUNET_SERVICE_client_drop (client_ctx->client); return; } name_len = msize - sizeof(struct GNUNET_TESTBED_BarrierWait); name = GNUNET_malloc (name_len + 1); name[name_len] = '\0'; GNUNET_memcpy (name, msg->name, name_len); LOG_DEBUG ("Received BARRIER_WAIT for barrier `%s'\n", name); GNUNET_CRYPTO_hash (name, name_len, &key); GNUNET_free (name); if (NULL == (barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key))) { GNUNET_break (0); GNUNET_SERVICE_client_drop (client_ctx->client); return; } if (NULL != client_ctx->barrier) { GNUNET_break (0); GNUNET_SERVICE_client_drop (client_ctx->client); return; } client_ctx->barrier = barrier; GNUNET_CONTAINER_DLL_insert_tail (barrier->head, barrier->tail, client_ctx); barrier->nreached++; if ((barrier->num_wbarriers_reached == barrier->num_wbarriers) && (LOCAL_QUORUM_REACHED (barrier))) { barrier->status = GNUNET_TESTBED_BARRIERSTATUS_CROSSED; send_barrier_status_msg (barrier, NULL); } GNUNET_SERVICE_client_continue (client_ctx->client); } /** * Function called when a client connects to the testbed-barrier service. * * @param cls NULL * @param client the connecting client * @param mq queue to talk to @a client * @return our `struct ClientCtx` */ static void * connect_cb (void *cls, struct GNUNET_SERVICE_Client *client, struct GNUNET_MQ_Handle *mq) { struct ClientCtx *client_ctx; LOG_DEBUG ("Client connected to testbed-barrier service\n"); client_ctx = GNUNET_new (struct ClientCtx); client_ctx->client = client; return client_ctx; } /** * Functions with this signature are called whenever a client * is disconnected on the network level. * * @param cls closure * @param client identification of the client; NULL * for the last call when the server is destroyed */ static void disconnect_cb (void *cls, struct GNUNET_SERVICE_Client *client, void *app_ctx) { struct ClientCtx *client_ctx = app_ctx; struct Barrier *barrier = client_ctx->barrier; if (NULL != barrier) { GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx); client_ctx->barrier = NULL; } GNUNET_free (client_ctx); LOG_DEBUG ("Client disconnected from testbed-barrier service\n"); } /** * Function to initialise barrriers component * * @param cfg the configuration to use for initialisation */ void GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg) { struct GNUNET_MQ_MessageHandler message_handlers[] = { GNUNET_MQ_hd_var_size (barrier_wait, GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT, struct GNUNET_TESTBED_BarrierWait, NULL), GNUNET_MQ_handler_end () }; LOG_DEBUG ("Launching testbed-barrier service\n"); barrier_map = GNUNET_CONTAINER_multihashmap_create (3, GNUNET_YES); ctx = GNUNET_SERVICE_start ("testbed-barrier", cfg, &connect_cb, &disconnect_cb, NULL, message_handlers); } /** * Iterator over hash map entries. * * @param cls closure * @param key current key code * @param value value in the hash map * @return #GNUNET_YES if we should continue to * iterate, * #GNUNET_NO if not. */ static int barrier_destroy_iterator (void *cls, const struct GNUNET_HashCode *key, void *value) { struct Barrier *barrier = value; GNUNET_assert (NULL != barrier); cancel_wrappers (barrier); remove_barrier (barrier); return GNUNET_YES; } /** * Function to stop the barrier service */ void GST_barriers_destroy () { GNUNET_assert (NULL != barrier_map); GNUNET_assert (GNUNET_SYSERR != GNUNET_CONTAINER_multihashmap_iterate (barrier_map, & barrier_destroy_iterator, NULL)); GNUNET_CONTAINER_multihashmap_destroy (barrier_map); GNUNET_assert (NULL != ctx); GNUNET_SERVICE_stop (ctx); } /** * Functions of this type are to be given as callback argument to * GNUNET_TESTBED_barrier_init(). The callback will be called when status * information is available for the barrier. * * @param cls the closure given to GNUNET_TESTBED_barrier_init() * @param name the name of the barrier * @param b_ the barrier handle * @param status status of the barrier; #GNUNET_OK if the barrier is crossed; * #GNUNET_SYSERR upon error * @param emsg if the status were to be #GNUNET_SYSERR, this parameter has the * error messsage */ static void wbarrier_status_cb (void *cls, const char *name, struct GNUNET_TESTBED_Barrier *b_, enum GNUNET_TESTBED_BarrierStatus status, const char *emsg) { struct WBarrier *wrapper = cls; struct Barrier *barrier = wrapper->barrier; GNUNET_assert (b_ == wrapper->hbarrier); switch (status) { case GNUNET_TESTBED_BARRIERSTATUS_ERROR: LOG (GNUNET_ERROR_TYPE_ERROR, "Initialising barrier `%s' failed at a sub-controller: %s\n", barrier->name, (NULL != emsg) ? emsg : "NULL"); cancel_wrappers (barrier); if (NULL == emsg) emsg = "Initialisation failed at a sub-controller"; barrier->status = GNUNET_TESTBED_BARRIERSTATUS_ERROR; send_barrier_status_msg (barrier, emsg); return; case GNUNET_TESTBED_BARRIERSTATUS_CROSSED: if (GNUNET_TESTBED_BARRIERSTATUS_INITIALISED != barrier->status) { GNUNET_break_op (0); return; } barrier->num_wbarriers_reached++; if ((barrier->num_wbarriers_reached == barrier->num_wbarriers) && (LOCAL_QUORUM_REACHED (barrier))) { barrier->status = GNUNET_TESTBED_BARRIERSTATUS_CROSSED; send_barrier_status_msg (barrier, NULL); } return; case GNUNET_TESTBED_BARRIERSTATUS_INITIALISED: if (0 != barrier->status) { GNUNET_break_op (0); return; } barrier->num_wbarriers_inited++; if (barrier->num_wbarriers_inited == barrier->num_wbarriers) { barrier->status = GNUNET_TESTBED_BARRIERSTATUS_INITIALISED; send_barrier_status_msg (barrier, NULL); } return; } } /** * Function called upon timeout while waiting for a response from the * subcontrollers to barrier init message * * @param cls barrier */ static void fwd_tout_barrier_init (void *cls) { struct Barrier *barrier = cls; cancel_wrappers (barrier); barrier->status = GNUNET_TESTBED_BARRIERSTATUS_ERROR; send_barrier_status_msg (barrier, "Timedout while propagating barrier initialisation\n"); remove_barrier (barrier); } /** * Check #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages. * * @param cls identification of the client * @param msg the actual message * @return #GNUNET_OK if @a msg is well-formed */ int check_barrier_init (void *cls, const struct GNUNET_TESTBED_BarrierInit *msg) { return GNUNET_OK; /* always well-formed */ } /** * Message handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages. This * message should always come from a parent controller or the testbed API if we * are the root controller. * * This handler is queued in the main service and will handle the messages sent * either from the testbed driver or from a high level controller * * @param cls identification of the client * @param msg the actual message */ void handle_barrier_init (void *cls, const struct GNUNET_TESTBED_BarrierInit *msg) { struct GNUNET_SERVICE_Client *client = cls; char *name; struct Barrier *barrier; struct Slave *slave; struct WBarrier *wrapper; struct GNUNET_HashCode hash; size_t name_len; unsigned int cnt; uint16_t msize; if (NULL == GST_context) { GNUNET_break_op (0); GNUNET_SERVICE_client_drop (client); return; } if (client != GST_context->client) { GNUNET_break_op (0); GNUNET_SERVICE_client_drop (client); return; } msize = ntohs (msg->header.size); name_len = (size_t) msize - sizeof(struct GNUNET_TESTBED_BarrierInit); name = GNUNET_malloc (name_len + 1); GNUNET_memcpy (name, msg->name, name_len); GNUNET_CRYPTO_hash (name, name_len, &hash); LOG_DEBUG ("Received BARRIER_INIT for barrier `%s'\n", name); if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash)) { send_client_status_msg (client, name, GNUNET_TESTBED_BARRIERSTATUS_ERROR, "A barrier with the same name already exists"); GNUNET_free (name); GNUNET_SERVICE_client_continue (client); return; } barrier = GNUNET_new (struct Barrier); barrier->hash = hash; barrier->quorum = msg->quorum; barrier->name = name; barrier->mc = client; GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (barrier_map, &barrier->hash, barrier, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); GNUNET_SERVICE_client_continue (client); /* Propagate barrier init to subcontrollers */ for (cnt = 0; cnt < GST_slave_list_size; cnt++) { if (NULL == (slave = GST_slave_list[cnt])) continue; if (NULL == slave->controller) { GNUNET_break (0); /* May happen when we are connecting to the controller */ continue; } wrapper = GNUNET_new (struct WBarrier); wrapper->barrier = barrier; wrapper->controller = slave->controller; GNUNET_CONTAINER_DLL_insert_tail (barrier->whead, barrier->wtail, wrapper); barrier->num_wbarriers++; wrapper->hbarrier = GNUNET_TESTBED_barrier_init_ (wrapper->controller, barrier->name, barrier->quorum, &wbarrier_status_cb, wrapper, GNUNET_NO); } if (NULL == barrier->whead) /* No further propagation */ { barrier->status = GNUNET_TESTBED_BARRIERSTATUS_INITIALISED; LOG_DEBUG ( "Sending GNUNET_TESTBED_BARRIERSTATUS_INITIALISED for barrier `%s'\n", barrier->name); send_barrier_status_msg (barrier, NULL); } else barrier->tout_task = GNUNET_SCHEDULER_add_delayed (MESSAGE_SEND_TIMEOUT ( 30), &fwd_tout_barrier_init, barrier); } /** * Check #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages. * * @param cls identification of the client * @param msg the actual message * @return #GNUNET_OK if @a msg is well-formed */ int check_barrier_cancel (void *cls, const struct GNUNET_TESTBED_BarrierCancel *msg) { return GNUNET_OK; /* all are well-formed */ } /** * Message handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages. This * message should always come from a parent controller or the testbed API if we * are the root controller. * * This handler is queued in the main service and will handle the messages sent * either from the testbed driver or from a high level controller * * @param cls identification of the client * @param msg the actual message */ void handle_barrier_cancel (void *cls, const struct GNUNET_TESTBED_BarrierCancel *msg) { struct GNUNET_SERVICE_Client *client = cls; char *name; struct Barrier *barrier; struct GNUNET_HashCode hash; size_t name_len; uint16_t msize; if (NULL == GST_context) { GNUNET_break_op (0); GNUNET_SERVICE_client_drop (client); return; } if (client != GST_context->client) { GNUNET_break_op (0); GNUNET_SERVICE_client_drop (client); return; } msize = ntohs (msg->header.size); name_len = msize - sizeof(struct GNUNET_TESTBED_BarrierCancel); name = GNUNET_malloc (name_len + 1); GNUNET_memcpy (name, msg->name, name_len); LOG_DEBUG ("Received BARRIER_CANCEL for barrier `%s'\n", name); GNUNET_CRYPTO_hash (name, name_len, &hash); if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash)) { GNUNET_break_op (0); GNUNET_SERVICE_client_drop (client); return; } barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &hash); GNUNET_assert (NULL != barrier); cancel_wrappers (barrier); remove_barrier (barrier); GNUNET_SERVICE_client_continue (client); } /** * Check #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages. * * @param cls identification of the client * @param msg the actual message * @return #GNUNET_OK if @a msg is well-formed */ int check_barrier_status (void *cls, const struct GNUNET_TESTBED_BarrierStatusMsg *msg) { uint16_t msize; uint16_t name_len; const char *name; enum GNUNET_TESTBED_BarrierStatus status; msize = ntohs (msg->header.size) - sizeof(*msg); status = ntohs (msg->status); if (GNUNET_TESTBED_BARRIERSTATUS_CROSSED != status) { GNUNET_break_op (0); /* current we only expect BARRIER_CROSSED status message this way */ return GNUNET_SYSERR; } name = msg->data; name_len = ntohs (msg->name_len); if ((name_len + 1) != msize) { GNUNET_break_op (0); return GNUNET_SYSERR; } if ('\0' != name[name_len]) { GNUNET_break_op (0); return GNUNET_SYSERR; } return GNUNET_OK; } /** * Message handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages. * This handler is queued in the main service and will handle the messages sent * either from the testbed driver or from a high level controller * * @param cls identification of the client * @param msg the actual message */ void handle_barrier_status (void *cls, const struct GNUNET_TESTBED_BarrierStatusMsg *msg) { struct GNUNET_SERVICE_Client *client = cls; struct Barrier *barrier; struct ClientCtx *client_ctx; struct WBarrier *wrapper; const char *name; struct GNUNET_HashCode key; uint16_t name_len; struct GNUNET_MQ_Envelope *env; if (NULL == GST_context) { GNUNET_break_op (0); GNUNET_SERVICE_client_drop (client); return; } if (client != GST_context->client) { GNUNET_break_op (0); GNUNET_SERVICE_client_drop (client); return; } name = msg->data; name_len = ntohs (msg->name_len); LOG_DEBUG ("Received BARRIER_STATUS for barrier `%s'\n", name); GNUNET_CRYPTO_hash (name, name_len, &key); barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key); if (NULL == barrier) { GNUNET_break_op (0); GNUNET_SERVICE_client_drop (client); return; } GNUNET_SERVICE_client_continue (client); for (client_ctx = barrier->head; NULL != client_ctx; client_ctx = client_ctx->next) /* Notify peers */ { env = GNUNET_MQ_msg_copy (&msg->header); GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client_ctx->client), env); } /** * The wrapper barriers do not echo the barrier status, so we have to do it * here */ for (wrapper = barrier->whead; NULL != wrapper; wrapper = wrapper->next) { GNUNET_TESTBED_queue_message_ (wrapper->controller, GNUNET_copy_message (&msg->header)); } } /* end of gnunet-service-testbed_barriers.c */