From 6f3ff8ff3db35d2a5c02e8cc88a912e9e0106d7c Mon Sep 17 00:00:00 2001 From: Sree Harsha Totakura Date: Mon, 8 Apr 2013 09:31:53 +0000 Subject: - restructure --- src/testbed/Makefile.am | 3 +- src/testbed/gnunet-service-testbed.c | 1112 +-------------------------- src/testbed/gnunet-service-testbed.h | 159 ++++ src/testbed/gnunet-service-testbed_oc.c | 9 +- src/testbed/gnunet-service-testbed_peers.c | 1138 ++++++++++++++++++++++++++++ 5 files changed, 1322 insertions(+), 1099 deletions(-) create mode 100644 src/testbed/gnunet-service-testbed_peers.c diff --git a/src/testbed/Makefile.am b/src/testbed/Makefile.am index d980e8b6e..97e304ff3 100644 --- a/src/testbed/Makefile.am +++ b/src/testbed/Makefile.am @@ -30,8 +30,9 @@ bin_PROGRAMS = \ gnunet-testbed-profiler gnunet_service_testbed_SOURCES = \ - gnunet-service-testbed.c \ gnunet-service-testbed.h \ + gnunet-service-testbed.c \ + gnunet-service-testbed_peers.c \ gnunet-service-testbed_cache.c \ gnunet-service-testbed_oc.c \ gnunet-service-testbed_cpustatus.c diff --git a/src/testbed/gnunet-service-testbed.c b/src/testbed/gnunet-service-testbed.c index a1c7f02be..05668284b 100644 --- a/src/testbed/gnunet-service-testbed.c +++ b/src/testbed/gnunet-service-testbed.c @@ -25,28 +25,6 @@ */ #include "gnunet-service-testbed.h" -#include "gnunet_arm_service.h" - -#include - - -/** - * Context data for GNUNET_MESSAGE_TYPE_TESTBED_SHUTDOWN_PEERS handler - */ -struct HandlerContext_ShutdownPeers -{ - /** - * The number of slave we expect to hear from since we forwarded the - * GNUNET_MESSAGE_TYPE_TESTBED_SHUTDOWN_PEERS message to them - */ - unsigned int nslaves; - - /** - * Did we observe a timeout with respect to this operation at any of the - * slaves - */ - int timeout; -}; /***********/ @@ -68,11 +46,6 @@ struct Context *GST_context; */ struct Slave **GST_slave_list; -/** - * A list of peers we know about - */ -struct Peer **GST_peer_list; - /** * Array of hosts */ @@ -276,30 +249,6 @@ GST_queue_message (struct GNUNET_SERVER_Client *client, } -/** - * Similar to GNUNET_array_grow(); however instead of calling GNUNET_array_grow() - * several times we call it only once. The array is also made to grow in steps - * of LIST_GROW_STEP. - * - * @param ptr the array pointer to grow - * @param size the size of array - * @param accommodate_size the size which the array has to accommdate; after - * this call the array will be big enough to accommdate sizes upto - * accommodate_size - */ -#define array_grow_large_enough(ptr, size, accommodate_size) \ - do \ - { \ - unsigned int growth_size; \ - GNUNET_assert (size <= accommodate_size); \ - growth_size = size; \ - while (growth_size <= accommodate_size) \ - growth_size += LIST_GROW_STEP; \ - GNUNET_array_grow (ptr, size, growth_size); \ - GNUNET_assert (size > accommodate_size); \ - } while (0) - - /** * Function to add a host to the current list of known hosts * @@ -314,7 +263,7 @@ host_list_add (struct GNUNET_TESTBED_Host *host) host_id = GNUNET_TESTBED_host_get_id_ (host); if (GST_host_list_size <= host_id) - array_grow_large_enough (GST_host_list, GST_host_list_size, host_id); + GST_array_grow_large_enough (GST_host_list, GST_host_list_size, host_id); if (NULL != GST_host_list[host_id]) { LOG_DEBUG ("A host with id: %u already exists\n", host_id); @@ -334,7 +283,7 @@ static void route_list_add (struct Route *route) { if (route->dest >= route_list_size) - array_grow_large_enough (route_list, route_list_size, route->dest); + GST_array_grow_large_enough (route_list, route_list_size, route->dest); GNUNET_assert (NULL == route_list[route->dest]); route_list[route->dest] = route; } @@ -349,60 +298,13 @@ static void slave_list_add (struct Slave *slave) { if (slave->host_id >= GST_slave_list_size) - array_grow_large_enough (GST_slave_list, GST_slave_list_size, - slave->host_id); + GST_array_grow_large_enough (GST_slave_list, GST_slave_list_size, + slave->host_id); GNUNET_assert (NULL == GST_slave_list[slave->host_id]); GST_slave_list[slave->host_id] = slave; } -/** - * Adds a peer to the peer array - * - * @param peer the peer to add - */ -static void -peer_list_add (struct Peer *peer) -{ - if (peer->id >= GST_peer_list_size) - array_grow_large_enough (GST_peer_list, GST_peer_list_size, peer->id); - GNUNET_assert (NULL == GST_peer_list[peer->id]); - GST_peer_list[peer->id] = peer; -} - - -/** - * Removes a the give peer from the peer array - * - * @param peer the peer to be removed - */ -static void -peer_list_remove (struct Peer *peer) -{ - unsigned int orig_size; - uint32_t id; - - GST_peer_list[peer->id] = NULL; - orig_size = GST_peer_list_size; - while (GST_peer_list_size >= LIST_GROW_STEP) - { - for (id = GST_peer_list_size - 1; - (id >= GST_peer_list_size - LIST_GROW_STEP) && (id != UINT32_MAX); - id--) - if (NULL != GST_peer_list[id]) - break; - if (id != ((GST_peer_list_size - LIST_GROW_STEP) - 1)) - break; - GST_peer_list_size -= LIST_GROW_STEP; - } - if (orig_size == GST_peer_list_size) - return; - GST_peer_list = - GNUNET_realloc (GST_peer_list, - sizeof (struct Peer *) * GST_peer_list_size); -} - - /** * Finds the route with directly connected host as destination through which * the destination host can be reached @@ -476,9 +378,9 @@ GST_send_operation_fail_msg (struct GNUNET_SERVER_Client *client, * @param client the client to send the message to * @param operation_id the id of the operation which was successful */ -static void -send_operation_success_msg (struct GNUNET_SERVER_Client *client, - uint64_t operation_id) +void +GST_send_operation_success_msg (struct GNUNET_SERVER_Client *client, + uint64_t operation_id) { struct GNUNET_TESTBED_GenericOperationSuccessEventMessage *msg; uint16_t msize; @@ -1414,530 +1316,6 @@ handle_link_controllers (void *cls, struct GNUNET_SERVER_Client *client, } -/** - * The task to be executed if the forwarded peer create operation has been - * timed out - * - * @param cls the FowardedOperationContext - * @param tc the TaskContext from the scheduler - */ -static void -peer_create_forward_timeout (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct ForwardedOperationContext *fopc = cls; - - GNUNET_free (fopc->cls); - GST_forwarded_operation_timeout (fopc, tc); -} - - -/** - * Callback to be called when forwarded peer create operation is successfull. We - * have to relay the reply msg back to the client - * - * @param cls ForwardedOperationContext - * @param msg the peer create success message - */ -static void -peer_create_success_cb (void *cls, const struct GNUNET_MessageHeader *msg) -{ - struct ForwardedOperationContext *fopc = cls; - struct Peer *remote_peer; - - if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER_SUCCESS) - { - GNUNET_assert (NULL != fopc->cls); - remote_peer = fopc->cls; - peer_list_add (remote_peer); - } - GST_forwarded_operation_reply_relay (fopc, msg); -} - - -/** - * Function to destroy a peer - * - * @param peer the peer structure to destroy - */ -void -GST_destroy_peer (struct Peer *peer) -{ - GNUNET_break (0 == peer->reference_cnt); - if (GNUNET_YES == peer->is_remote) - { - peer_list_remove (peer); - GNUNET_free (peer); - return; - } - if (GNUNET_YES == peer->details.local.is_running) - { - GNUNET_TESTING_peer_stop (peer->details.local.peer); - peer->details.local.is_running = GNUNET_NO; - } - GNUNET_TESTING_peer_destroy (peer->details.local.peer); - GNUNET_CONFIGURATION_destroy (peer->details.local.cfg); - peer_list_remove (peer); - GNUNET_free (peer); -} - - -/** - * Callback to be called when forwarded peer destroy operation is successfull. We - * have to relay the reply msg back to the client - * - * @param cls ForwardedOperationContext - * @param msg the peer create success message - */ -static void -peer_destroy_success_cb (void *cls, const struct GNUNET_MessageHeader *msg) -{ - struct ForwardedOperationContext *fopc = cls; - struct Peer *remote_peer; - - if (GNUNET_MESSAGE_TYPE_TESTBED_GENERIC_OPERATION_SUCCESS == - ntohs (msg->type)) - { - remote_peer = fopc->cls; - GNUNET_assert (NULL != remote_peer); - remote_peer->destroy_flag = GNUNET_YES; - if (0 == remote_peer->reference_cnt) - GST_destroy_peer (remote_peer); - } - GST_forwarded_operation_reply_relay (fopc, msg); -} - - - -/** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_CREATEPEER messages - * - * @param cls NULL - * @param client identification of the client - * @param message the actual message - */ -static void -handle_peer_create (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) -{ - const struct GNUNET_TESTBED_PeerCreateMessage *msg; - struct GNUNET_TESTBED_PeerCreateSuccessEventMessage *reply; - struct GNUNET_CONFIGURATION_Handle *cfg; - struct ForwardedOperationContext *fo_ctxt; - struct Route *route; - struct Peer *peer; - char *config; - size_t dest_size; - int ret; - uint32_t config_size; - uint32_t host_id; - uint32_t peer_id; - uint16_t msize; - - - msize = ntohs (message->size); - if (msize <= sizeof (struct GNUNET_TESTBED_PeerCreateMessage)) - { - GNUNET_break (0); /* We need configuration */ - GNUNET_SERVER_receive_done (client, GNUNET_OK); - return; - } - msg = (const struct GNUNET_TESTBED_PeerCreateMessage *) message; - host_id = ntohl (msg->host_id); - peer_id = ntohl (msg->peer_id); - if (UINT32_MAX == peer_id) - { - GST_send_operation_fail_msg (client, GNUNET_ntohll (msg->operation_id), - "Cannot create peer with given ID"); - GNUNET_SERVER_receive_done (client, GNUNET_OK); - return; - } - if (host_id == GST_context->host_id) - { - char *emsg; - - /* We are responsible for this peer */ - msize -= sizeof (struct GNUNET_TESTBED_PeerCreateMessage); - config_size = ntohl (msg->config_size); - config = GNUNET_malloc (config_size); - dest_size = config_size; - if (Z_OK != - (ret = - uncompress ((Bytef *) config, (uLongf *) & dest_size, - (const Bytef *) &msg[1], (uLong) msize))) - { - GNUNET_break (0); /* uncompression error */ - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } - if (config_size != dest_size) - { - GNUNET_break (0); /* Uncompressed config size mismatch */ - GNUNET_free (config); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } - cfg = GNUNET_CONFIGURATION_create (); - if (GNUNET_OK != - GNUNET_CONFIGURATION_deserialize (cfg, config, config_size, GNUNET_NO)) - { - GNUNET_break (0); /* Configuration parsing error */ - GNUNET_free (config); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } - GNUNET_free (config); - GNUNET_CONFIGURATION_set_value_number (cfg, "TESTBED", "PEERID", - (unsigned long long) peer_id); - peer = GNUNET_malloc (sizeof (struct Peer)); - peer->is_remote = GNUNET_NO; - peer->details.local.cfg = cfg; - peer->id = peer_id; - LOG_DEBUG ("Creating peer with id: %u\n", (unsigned int) peer->id); - peer->details.local.peer = - GNUNET_TESTING_peer_configure (GST_context->system, - peer->details.local.cfg, peer->id, - NULL /* Peer id */ , - &emsg); - if (NULL == peer->details.local.peer) - { - LOG (GNUNET_ERROR_TYPE_WARNING, "Configuring peer failed: %s\n", emsg); - GNUNET_free (emsg); - GNUNET_free (peer); - GNUNET_break (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } - peer->details.local.is_running = GNUNET_NO; - peer_list_add (peer); - reply = - GNUNET_malloc (sizeof - (struct GNUNET_TESTBED_PeerCreateSuccessEventMessage)); - reply->header.size = - htons (sizeof (struct GNUNET_TESTBED_PeerCreateSuccessEventMessage)); - reply->header.type = - htons (GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER_SUCCESS); - reply->peer_id = msg->peer_id; - reply->operation_id = msg->operation_id; - GST_queue_message (client, &reply->header); - GNUNET_SERVER_receive_done (client, GNUNET_OK); - return; - } - - /* Forward peer create request */ - route = GST_find_dest_route (host_id); - if (NULL == route) - { - GNUNET_break (0); - GNUNET_SERVER_receive_done (client, GNUNET_OK); - return; - } - - peer = GNUNET_malloc (sizeof (struct Peer)); - peer->is_remote = GNUNET_YES; - peer->id = peer_id; - peer->details.remote.slave = GST_slave_list[route->dest]; - peer->details.remote.remote_host_id = host_id; - fo_ctxt = GNUNET_malloc (sizeof (struct ForwardedOperationContext)); - GNUNET_SERVER_client_keep (client); - fo_ctxt->client = client; - fo_ctxt->operation_id = GNUNET_ntohll (msg->operation_id); - fo_ctxt->cls = peer; //GST_slave_list[route->dest]->controller; - fo_ctxt->type = OP_PEER_CREATE; - fo_ctxt->opc = - GNUNET_TESTBED_forward_operation_msg_ (GST_slave_list - [route->dest]->controller, - fo_ctxt->operation_id, - &msg->header, - peer_create_success_cb, fo_ctxt); - fo_ctxt->timeout_task = - GNUNET_SCHEDULER_add_delayed (GST_timeout, &peer_create_forward_timeout, - fo_ctxt); - GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fo_ctxt); - GNUNET_SERVER_receive_done (client, GNUNET_OK); -} - - -/** - * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_DESTROYPEER messages - * - * @param cls NULL - * @param client identification of the client - * @param message the actual message - */ -static void -handle_peer_destroy (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) -{ - const struct GNUNET_TESTBED_PeerDestroyMessage *msg; - struct ForwardedOperationContext *fopc; - struct Peer *peer; - uint32_t peer_id; - - msg = (const struct GNUNET_TESTBED_PeerDestroyMessage *) message; - peer_id = ntohl (msg->peer_id); - LOG_DEBUG ("Received peer destory on peer: %u and operation id: %ul\n", - peer_id, GNUNET_ntohll (msg->operation_id)); - if ((GST_peer_list_size <= peer_id) || (NULL == GST_peer_list[peer_id])) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Asked to destroy a non existent peer with id: %u\n", peer_id); - GST_send_operation_fail_msg (client, GNUNET_ntohll (msg->operation_id), - "Peer doesn't exist"); - GNUNET_SERVER_receive_done (client, GNUNET_OK); - return; - } - peer = GST_peer_list[peer_id]; - if (GNUNET_YES == peer->is_remote) - { - /* Forward the destory message to sub controller */ - fopc = GNUNET_malloc (sizeof (struct ForwardedOperationContext)); - GNUNET_SERVER_client_keep (client); - fopc->client = client; - fopc->cls = peer; - fopc->type = OP_PEER_DESTROY; - fopc->operation_id = GNUNET_ntohll (msg->operation_id); - fopc->opc = - GNUNET_TESTBED_forward_operation_msg_ (peer->details.remote. - slave->controller, - fopc->operation_id, &msg->header, - &peer_destroy_success_cb, fopc); - fopc->timeout_task = - GNUNET_SCHEDULER_add_delayed (GST_timeout, &GST_forwarded_operation_timeout, - fopc); - GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fopc); - GNUNET_SERVER_receive_done (client, GNUNET_OK); - return; - } - peer->destroy_flag = GNUNET_YES; - if (0 == peer->reference_cnt) - GST_destroy_peer (peer); - else - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Delaying peer destroy as peer is currently in use\n"); - send_operation_success_msg (client, GNUNET_ntohll (msg->operation_id)); - GNUNET_SERVER_receive_done (client, GNUNET_OK); -} - - -/** - * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_DESTROYPEER messages - * - * @param cls NULL - * @param client identification of the client - * @param message the actual message - */ -static void -handle_peer_start (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) -{ - const struct GNUNET_TESTBED_PeerStartMessage *msg; - struct GNUNET_TESTBED_PeerEventMessage *reply; - struct ForwardedOperationContext *fopc; - struct Peer *peer; - uint32_t peer_id; - - msg = (const struct GNUNET_TESTBED_PeerStartMessage *) message; - peer_id = ntohl (msg->peer_id); - if ((peer_id >= GST_peer_list_size) || (NULL == GST_peer_list[peer_id])) - { - GNUNET_break (0); - LOG (GNUNET_ERROR_TYPE_ERROR, - "Asked to start a non existent peer with id: %u\n", peer_id); - GNUNET_SERVER_receive_done (client, GNUNET_OK); - return; - } - peer = GST_peer_list[peer_id]; - if (GNUNET_YES == peer->is_remote) - { - fopc = GNUNET_malloc (sizeof (struct ForwardedOperationContext)); - GNUNET_SERVER_client_keep (client); - fopc->client = client; - fopc->operation_id = GNUNET_ntohll (msg->operation_id); - fopc->type = OP_PEER_START; - fopc->opc = - GNUNET_TESTBED_forward_operation_msg_ (peer->details.remote. - slave->controller, - fopc->operation_id, &msg->header, - &GST_forwarded_operation_reply_relay, - fopc); - fopc->timeout_task = - GNUNET_SCHEDULER_add_delayed (GST_timeout, &GST_forwarded_operation_timeout, - fopc); - GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fopc); - GNUNET_SERVER_receive_done (client, GNUNET_OK); - return; - } - if (GNUNET_OK != GNUNET_TESTING_peer_start (peer->details.local.peer)) - { - GST_send_operation_fail_msg (client, GNUNET_ntohll (msg->operation_id), - "Failed to start"); - GNUNET_SERVER_receive_done (client, GNUNET_OK); - return; - } - peer->details.local.is_running = GNUNET_YES; - reply = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_PeerEventMessage)); - reply->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_PEER_EVENT); - reply->header.size = htons (sizeof (struct GNUNET_TESTBED_PeerEventMessage)); - reply->event_type = htonl (GNUNET_TESTBED_ET_PEER_START); - reply->host_id = htonl (GST_context->host_id); - reply->peer_id = msg->peer_id; - reply->operation_id = msg->operation_id; - GST_queue_message (client, &reply->header); - GNUNET_SERVER_receive_done (client, GNUNET_OK); -} - - -/** - * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_DESTROYPEER messages - * - * @param cls NULL - * @param client identification of the client - * @param message the actual message - */ -static void -handle_peer_stop (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) -{ - const struct GNUNET_TESTBED_PeerStopMessage *msg; - struct GNUNET_TESTBED_PeerEventMessage *reply; - struct ForwardedOperationContext *fopc; - struct Peer *peer; - uint32_t peer_id; - - msg = (const struct GNUNET_TESTBED_PeerStopMessage *) message; - peer_id = ntohl (msg->peer_id); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PEER_STOP for peer %u\n", peer_id); - if ((peer_id >= GST_peer_list_size) || (NULL == GST_peer_list[peer_id])) - { - GST_send_operation_fail_msg (client, GNUNET_ntohll (msg->operation_id), - "Peer not found"); - GNUNET_SERVER_receive_done (client, GNUNET_OK); - return; - } - peer = GST_peer_list[peer_id]; - if (GNUNET_YES == peer->is_remote) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Forwarding PEER_STOP for peer %u\n", - peer_id); - fopc = GNUNET_malloc (sizeof (struct ForwardedOperationContext)); - GNUNET_SERVER_client_keep (client); - fopc->client = client; - fopc->operation_id = GNUNET_ntohll (msg->operation_id); - fopc->type = OP_PEER_STOP; - fopc->opc = - GNUNET_TESTBED_forward_operation_msg_ (peer->details.remote. - slave->controller, - fopc->operation_id, &msg->header, - &GST_forwarded_operation_reply_relay, - fopc); - fopc->timeout_task = - GNUNET_SCHEDULER_add_delayed (GST_timeout, &GST_forwarded_operation_timeout, - fopc); - GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fopc); - GNUNET_SERVER_receive_done (client, GNUNET_OK); - return; - } - if (GNUNET_OK != GNUNET_TESTING_peer_kill (peer->details.local.peer)) - { - LOG (GNUNET_ERROR_TYPE_WARNING, "Stopping peer %u failed\n", peer_id); - GST_send_operation_fail_msg (client, GNUNET_ntohll (msg->operation_id), - "Peer not running"); - GNUNET_SERVER_receive_done (client, GNUNET_OK); - return; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, "Peer %u successfully stopped\n", peer_id); - peer->details.local.is_running = GNUNET_NO; - reply = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_PeerEventMessage)); - reply->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_PEER_EVENT); - reply->header.size = htons (sizeof (struct GNUNET_TESTBED_PeerEventMessage)); - reply->event_type = htonl (GNUNET_TESTBED_ET_PEER_STOP); - reply->host_id = htonl (GST_context->host_id); - reply->peer_id = msg->peer_id; - reply->operation_id = msg->operation_id; - GST_queue_message (client, &reply->header); - GNUNET_SERVER_receive_done (client, GNUNET_OK); - GNUNET_TESTING_peer_wait (peer->details.local.peer); -} - - -/** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_GETPEERCONFIG messages - * - * @param cls NULL - * @param client identification of the client - * @param message the actual message - */ -static void -handle_peer_get_config (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) -{ - const struct GNUNET_TESTBED_PeerGetConfigurationMessage *msg; - struct GNUNET_TESTBED_PeerConfigurationInformationMessage *reply; - struct Peer *peer; - char *config; - char *xconfig; - size_t c_size; - size_t xc_size; - uint32_t peer_id; - uint16_t msize; - - msg = (const struct GNUNET_TESTBED_PeerGetConfigurationMessage *) message; - peer_id = ntohl (msg->peer_id); - if ((peer_id >= GST_peer_list_size) || (NULL == GST_peer_list[peer_id])) - { - GST_send_operation_fail_msg (client, GNUNET_ntohll (msg->operation_id), - "Peer not found"); - GNUNET_SERVER_receive_done (client, GNUNET_OK); - return; - } - peer = GST_peer_list[peer_id]; - if (GNUNET_YES == peer->is_remote) - { - struct ForwardedOperationContext *fopc; - - LOG_DEBUG ("Forwarding PEER_GET_CONFIG for peer: %u\n", peer_id); - fopc = GNUNET_malloc (sizeof (struct ForwardedOperationContext)); - GNUNET_SERVER_client_keep (client); - fopc->client = client; - fopc->operation_id = GNUNET_ntohll (msg->operation_id); - fopc->type = OP_PEER_INFO; - fopc->opc = - GNUNET_TESTBED_forward_operation_msg_ (peer->details.remote. - slave->controller, - fopc->operation_id, &msg->header, - &GST_forwarded_operation_reply_relay, - fopc); - fopc->timeout_task = - GNUNET_SCHEDULER_add_delayed (GST_timeout, &GST_forwarded_operation_timeout, - fopc); - GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fopc); - GNUNET_SERVER_receive_done (client, GNUNET_OK); - return; - } - LOG_DEBUG ("Received PEER_GET_CONFIG for peer: %u\n", peer_id); - config = - GNUNET_CONFIGURATION_serialize (GST_peer_list[peer_id]->details.local.cfg, - &c_size); - xc_size = GNUNET_TESTBED_compress_config_ (config, c_size, &xconfig); - GNUNET_free (config); - msize = - xc_size + - sizeof (struct GNUNET_TESTBED_PeerConfigurationInformationMessage); - reply = GNUNET_realloc (xconfig, msize); - (void) memmove (&reply[1], reply, xc_size); - reply->header.size = htons (msize); - reply->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONFIGURATION); - reply->peer_id = msg->peer_id; - reply->operation_id = msg->operation_id; - GNUNET_TESTING_peer_get_identity (GST_peer_list[peer_id]->details.local.peer, - &reply->peer_identity); - reply->config_size = htons ((uint16_t) c_size); - GST_queue_message (client, &reply->header); - GNUNET_SERVER_receive_done (client, GNUNET_OK); -} - - /** * Handler for GNUNET_MESSAGE_TYPE_TESTBED_GETSLAVECONFIG messages * @@ -1991,286 +1369,12 @@ handle_slave_get_config (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_SERVER_receive_done (client, GNUNET_OK); } -struct ManageServiceContext -{ - - struct ManageServiceContext *next; - - struct ManageServiceContext *prev; - - struct GNUNET_ARM_Handle *ah; - - struct Peer *peer; - - struct GNUNET_SERVER_Client *client; - - uint64_t op_id; - - uint8_t start; - - uint8_t expired; - -}; - -static struct ManageServiceContext *mctx_head; - -static struct ManageServiceContext *mctx_tail; - -static void -cleanup_mctx (struct ManageServiceContext *mctx) -{ - mctx->expired = GNUNET_YES; - GNUNET_CONTAINER_DLL_remove (mctx_head, mctx_tail, mctx); - GNUNET_SERVER_client_drop (mctx->client); - GNUNET_ARM_disconnect_and_free (mctx->ah); - GNUNET_assert (0 < mctx->peer->reference_cnt); - mctx->peer->reference_cnt--; - if ( (GNUNET_YES == mctx->peer->destroy_flag) - && (0 == mctx->peer->reference_cnt) ) - GST_destroy_peer (mctx->peer); - GNUNET_free (mctx); -} - -static void -free_mctxq () -{ - while (NULL != mctx_head) - cleanup_mctx (mctx_head); -} - -static const char * -arm_req_string (enum GNUNET_ARM_RequestStatus rs) -{ - switch (rs) - { - case GNUNET_ARM_REQUEST_SENT_OK: - return _("Message was sent successfully"); - case GNUNET_ARM_REQUEST_CONFIGURATION_ERROR: - return _("Misconfiguration (can't connect to the ARM service)"); - case GNUNET_ARM_REQUEST_DISCONNECTED: - return _("We disconnected from ARM before we could send a request"); - case GNUNET_ARM_REQUEST_BUSY: - return _("ARM API is busy"); - case GNUNET_ARM_REQUEST_TOO_LONG: - return _("Request doesn't fit into a message"); - case GNUNET_ARM_REQUEST_TIMEOUT: - return _("Request timed out"); - } - return _("Unknown request status"); -} - -static const char * -arm_ret_string (enum GNUNET_ARM_Result result) -{ - switch (result) - { - case GNUNET_ARM_RESULT_STOPPED: - return _("%s is stopped"); - case GNUNET_ARM_RESULT_STARTING: - return _("%s is starting"); - case GNUNET_ARM_RESULT_STOPPING: - return _("%s is stopping"); - case GNUNET_ARM_RESULT_IS_STARTING_ALREADY: - return _("%s is starting already"); - case GNUNET_ARM_RESULT_IS_STOPPING_ALREADY: - return _("%s is stopping already"); - case GNUNET_ARM_RESULT_IS_STARTED_ALREADY: - return _("%s is started already"); - case GNUNET_ARM_RESULT_IS_STOPPED_ALREADY: - return _("%s is stopped already"); - case GNUNET_ARM_RESULT_IS_NOT_KNOWN: - return _("%s service is not known to ARM"); - case GNUNET_ARM_RESULT_START_FAILED: - return _("%s service failed to start"); - case GNUNET_ARM_RESULT_IN_SHUTDOWN: - return _("%s service can't be started because ARM is shutting down"); - } - return _("%.s Unknown result code."); -} - -static void -service_manage_result_cb (void *cls, struct GNUNET_ARM_Handle *arm, - enum GNUNET_ARM_RequestStatus rs, - const char *service, enum GNUNET_ARM_Result result) -{ - struct ManageServiceContext *mctx = cls; - char *emsg; - - emsg = NULL; - if (GNUNET_YES == mctx->expired) - return; - if (GNUNET_ARM_REQUEST_SENT_OK != rs) - { - GNUNET_asprintf (&emsg, "Error communicating with Peer %u's ARM: %s", - mctx->peer->id, arm_req_string (rs)); - goto ret; - } - if (1 == mctx->start) - goto service_start_check; - if (! ((GNUNET_ARM_RESULT_STOPPED == result) - || (GNUNET_ARM_RESULT_STOPPING == result) - || (GNUNET_ARM_RESULT_IS_STOPPING_ALREADY == result) - || (GNUNET_ARM_RESULT_IS_STOPPED_ALREADY == result)) ) - { - /* stopping a service failed */ - GNUNET_asprintf (&emsg, arm_ret_string (result), service); - goto ret; - } - /* service stopped successfully */ - goto ret; - - service_start_check: - if (! ((GNUNET_ARM_RESULT_STARTING == result) - || (GNUNET_ARM_RESULT_IS_STARTING_ALREADY == result) - || (GNUNET_ARM_RESULT_IS_STARTED_ALREADY == result)) ) - { - /* starting a service failed */ - GNUNET_asprintf (&emsg, arm_ret_string (result), service); - goto ret; - } - /* service started successfully */ - - ret: - if (NULL != emsg) - { - LOG_DEBUG ("%s\n", emsg); - GST_send_operation_fail_msg (mctx->client, mctx->op_id, emsg); - } - else - send_operation_success_msg (mctx->client, mctx->op_id); - GNUNET_free_non_null (emsg); - cleanup_mctx (mctx); -} - -static void -handle_manage_peer_service (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) -{ - const struct GNUNET_TESTBED_ManagePeerServiceMessage *msg; - const char* service; - struct Peer *peer; - char *emsg; - struct GNUNET_ARM_Handle *ah; - struct ManageServiceContext *mctx; - struct ForwardedOperationContext *fopc; - uint64_t op_id; - uint32_t peer_id; - uint16_t msize; - - - msize = ntohs (message->size); - if (msize <= sizeof (struct GNUNET_TESTBED_ManagePeerServiceMessage)) - { - GNUNET_break_op (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } - msg = (const struct GNUNET_TESTBED_ManagePeerServiceMessage *) message; - service = (const char *) &msg[1]; - if ('\0' != service[msize - sizeof - (struct GNUNET_TESTBED_ManagePeerServiceMessage) - 1]) - { - GNUNET_break_op (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } - if (1 < msg->start) - { - GNUNET_break_op (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } - peer_id = ntohl (msg->peer_id); - op_id = GNUNET_ntohll (msg->operation_id); - LOG_DEBUG ("Received request to manage service %s on peer %u\n", - service, (unsigned int) peer_id); - if ((GST_peer_list_size <= peer_id) - || (NULL == (peer = GST_peer_list[peer_id]))) - { - GNUNET_asprintf (&emsg, "Asked to manage service of a non existent peer " - "with id: %u", peer_id); - goto err_ret; - } - if (0 == strcasecmp ("arm", service)) - { - emsg = GNUNET_strdup ("Cannot start/stop peer's ARM service. " - "Use peer start/stop for that"); - goto err_ret; - } - if (GNUNET_YES == peer->is_remote) - { - /* Forward the destory message to sub controller */ - fopc = GNUNET_malloc (sizeof (struct ForwardedOperationContext)); - GNUNET_SERVER_client_keep (client); - fopc->client = client; - fopc->cls = peer; - fopc->type = OP_MANAGE_SERVICE; - fopc->operation_id = op_id; - fopc->opc = - GNUNET_TESTBED_forward_operation_msg_ (peer->details.remote. - slave->controller, - fopc->operation_id, &msg->header, - &GST_forwarded_operation_reply_relay, - fopc); - fopc->timeout_task = - GNUNET_SCHEDULER_add_delayed (GST_timeout, &GST_forwarded_operation_timeout, - fopc); - GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fopc); - GNUNET_SERVER_receive_done (client, GNUNET_OK); - return; - } - if ((0 != peer->reference_cnt) - && ( (0 == strcasecmp ("core", service)) - || (0 == strcasecmp ("transport", service)) ) ) - { - GNUNET_asprintf (&emsg, "Cannot stop %s service of peer with id: %u " - "since it is required by existing operations", - service, peer_id); - goto err_ret; - } - ah = GNUNET_ARM_connect (peer->details.local.cfg, NULL, NULL); - if (NULL == ah) - { - GNUNET_asprintf (&emsg, - "Cannot connect to ARM service of peer with id: %u", - peer_id); - goto err_ret; - } - mctx = GNUNET_malloc (sizeof (struct ManageServiceContext)); - mctx->peer = peer; - peer->reference_cnt++; - mctx->op_id = op_id; - mctx->ah = ah; - GNUNET_SERVER_client_keep (client); - mctx->client = client; - mctx->start = msg->start; - GNUNET_CONTAINER_DLL_insert_tail (mctx_head, mctx_tail, mctx); - if (1 == mctx->start) - GNUNET_ARM_request_service_start (mctx->ah, service, - GNUNET_OS_INHERIT_STD_ERR, - GST_timeout, - service_manage_result_cb, - mctx); - else - GNUNET_ARM_request_service_stop (mctx->ah, service, - GST_timeout, - service_manage_result_cb, - mctx); - GNUNET_SERVER_receive_done (client, GNUNET_OK); - return; - - err_ret: - LOG (GNUNET_ERROR_TYPE_ERROR, "%s\n", emsg); - GST_send_operation_fail_msg (client, op_id, emsg); - GNUNET_free (emsg); - GNUNET_SERVER_receive_done (client, GNUNET_OK); -} /** * Clears the forwarded operations queue */ -static void -clear_fopcq () +void +GST_clear_fopcq () { struct ForwardedOperationContext *fopc; @@ -2313,184 +1417,6 @@ clear_fopcq () } -/** - * Task run upon timeout of forwarded SHUTDOWN_PEERS operation - * - * @param cls the ForwardedOperationContext - * @param tc the scheduler task context - */ -static void -shutdown_peers_timeout_cb (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct ForwardedOperationContext *fo_ctxt = cls; - struct HandlerContext_ShutdownPeers *hc; - - fo_ctxt->timeout_task = GNUNET_SCHEDULER_NO_TASK; - hc = fo_ctxt->cls; - hc->timeout = GNUNET_YES; - GNUNET_assert (0 < hc->nslaves); - hc->nslaves--; - if (0 == hc->nslaves) - GST_send_operation_fail_msg (fo_ctxt->client, fo_ctxt->operation_id, - "Timeout at a slave controller"); - GNUNET_TESTBED_forward_operation_msg_cancel_ (fo_ctxt->opc); - GNUNET_SERVER_client_drop (fo_ctxt->client); - GNUNET_CONTAINER_DLL_remove (fopcq_head, fopcq_tail, fo_ctxt); - GNUNET_free (fo_ctxt); -} - - -/** - * The reply msg handler forwarded SHUTDOWN_PEERS operation. Checks if a - * success reply is received from all clients and then sends the success message - * to the client - * - * @param cls ForwardedOperationContext - * @param msg the message to relay - */ -static void -shutdown_peers_reply_cb (void *cls, - const struct GNUNET_MessageHeader *msg) -{ - struct ForwardedOperationContext *fo_ctxt = cls; - struct HandlerContext_ShutdownPeers *hc; - - hc = fo_ctxt->cls; - GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != fo_ctxt->timeout_task); - GNUNET_SCHEDULER_cancel (fo_ctxt->timeout_task); - fo_ctxt->timeout_task = GNUNET_SCHEDULER_NO_TASK; - GNUNET_assert (0 < hc->nslaves); - hc->nslaves--; - if (GNUNET_MESSAGE_TYPE_TESTBED_GENERIC_OPERATION_SUCCESS != - ntohs (msg->type)) - hc->timeout = GNUNET_YES; - if (0 == hc->nslaves) - { - if (GNUNET_YES == hc->timeout) - GST_send_operation_fail_msg (fo_ctxt->client, fo_ctxt->operation_id, - "Timeout at a slave controller"); - else - send_operation_success_msg (fo_ctxt->client, fo_ctxt->operation_id); - } - GNUNET_SERVER_client_drop (fo_ctxt->client); - GNUNET_CONTAINER_DLL_remove (fopcq_head, fopcq_tail, fo_ctxt); - GNUNET_free (fo_ctxt); -} - - -/** - * Stops and destroys all peers - */ -static void -destroy_peers () -{ - struct Peer *peer; - unsigned int id; - - if (NULL == GST_peer_list) - return; - for (id = 0; id < GST_peer_list_size; id++) - { - peer = GST_peer_list[id]; - if (NULL == peer) - continue; - /* If destroy flag is set it means that this peer should have been - * destroyed by a context which we destroy before */ - GNUNET_break (GNUNET_NO == peer->destroy_flag); - /* counter should be zero as we free all contexts before */ - GNUNET_break (0 == peer->reference_cnt); - if ((GNUNET_NO == peer->is_remote) && - (GNUNET_YES == peer->details.local.is_running)) - GNUNET_TESTING_peer_kill (peer->details.local.peer); - } - for (id = 0; id < GST_peer_list_size; id++) - { - peer = GST_peer_list[id]; - if (NULL == peer) - continue; - if (GNUNET_NO == peer->is_remote) - { - if (GNUNET_YES == peer->details.local.is_running) - GNUNET_TESTING_peer_wait (peer->details.local.peer); - GNUNET_TESTING_peer_destroy (peer->details.local.peer); - GNUNET_CONFIGURATION_destroy (peer->details.local.cfg); - } - GNUNET_free (peer); - } - GNUNET_free_non_null (GST_peer_list); - GST_peer_list = NULL; - GST_peer_list_size = 0; -} - - -/** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_SHUTDOWN_PEERS messages - * - * @param cls NULL - * @param client identification of the client - * @param message the actual message - */ -static void -handle_shutdown_peers (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) -{ - const struct GNUNET_TESTBED_ShutdownPeersMessage *msg; - struct HandlerContext_ShutdownPeers *hc; - struct Slave *slave; - struct ForwardedOperationContext *fo_ctxt; - uint64_t op_id; - unsigned int cnt; - - msg = (const struct GNUNET_TESTBED_ShutdownPeersMessage *) message; - LOG_DEBUG ("Received SHUTDOWN_PEERS\n"); - /* Stop and destroy all peers */ - free_mctxq (); - GST_free_occq (); - GST_free_roccq (); - clear_fopcq (); - /* Forward to all slaves which we have started */ - op_id = GNUNET_ntohll (msg->operation_id); - hc = GNUNET_malloc (sizeof (struct HandlerContext_ShutdownPeers)); - /* FIXME: have a better implementation where we track which slaves are - started by this controller */ - for (cnt = 0; cnt < GST_slave_list_size; cnt++) - { - slave = GST_slave_list[cnt]; - if (NULL == slave) - continue; - if (NULL == slave->controller_proc) /* We didn't start the slave */ - continue; - LOG_DEBUG ("Forwarding SHUTDOWN_PEERS\n"); - hc->nslaves++; - fo_ctxt = GNUNET_malloc (sizeof (struct ForwardedOperationContext)); - GNUNET_SERVER_client_keep (client); - fo_ctxt->client = client; - fo_ctxt->operation_id = op_id; - fo_ctxt->cls = hc; - fo_ctxt->type = OP_SHUTDOWN_PEERS; - fo_ctxt->opc = - GNUNET_TESTBED_forward_operation_msg_ (slave->controller, - fo_ctxt->operation_id, - &msg->header, - shutdown_peers_reply_cb, - fo_ctxt); - fo_ctxt->timeout_task = - GNUNET_SCHEDULER_add_delayed (GST_timeout, &shutdown_peers_timeout_cb, - fo_ctxt); - GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fo_ctxt); - } - LOG_DEBUG ("Shutting down peers\n"); - destroy_peers (); - if (0 == hc->nslaves) - { - send_operation_success_msg (client, op_id); - GNUNET_free (hc); - } - GNUNET_SERVER_receive_done (client, GNUNET_OK); -} - - /** * Iterator over hash map entries. * @@ -2568,7 +1494,7 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) NULL); GNUNET_CONTAINER_multihashmap_destroy (ss_map); /* cleanup any remaining forwarded operations */ - clear_fopcq (); + GST_clear_fopcq (); if (NULL != lcfq_head) { if (GNUNET_SCHEDULER_NO_TASK != lcf_proc_task_id) @@ -2587,11 +1513,11 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_CONTAINER_DLL_remove (lcfq_head, lcfq_tail, lcfq); GNUNET_free (lcfq); } - free_mctxq (); + GST_free_mctxq (); GST_free_occq (); GST_free_roccq (); /* Clear peer list */ - destroy_peers (); + GST_destroy_peers (); /* Clear host list */ for (id = 0; id < GST_host_list_size; id++) if (NULL != GST_host_list[id]) @@ -2699,14 +1625,14 @@ testbed_run (void *cls, struct GNUNET_SERVER_Handle *server, GNUNET_MESSAGE_TYPE_TESTBED_SHARE_SERVICE, 0}, {&handle_link_controllers, NULL, GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS, 0}, - {&handle_peer_create, NULL, GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER, 0}, - {&handle_peer_destroy, NULL, GNUNET_MESSAGE_TYPE_TESTBED_DESTROY_PEER, + {&GST_handle_peer_create, NULL, GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER, 0}, + {&GST_handle_peer_destroy, NULL, GNUNET_MESSAGE_TYPE_TESTBED_DESTROY_PEER, sizeof (struct GNUNET_TESTBED_PeerDestroyMessage)}, - {&handle_peer_start, NULL, GNUNET_MESSAGE_TYPE_TESTBED_START_PEER, + {&GST_handle_peer_start, NULL, GNUNET_MESSAGE_TYPE_TESTBED_START_PEER, sizeof (struct GNUNET_TESTBED_PeerStartMessage)}, - {&handle_peer_stop, NULL, GNUNET_MESSAGE_TYPE_TESTBED_STOP_PEER, + {&GST_handle_peer_stop, NULL, GNUNET_MESSAGE_TYPE_TESTBED_STOP_PEER, sizeof (struct GNUNET_TESTBED_PeerStopMessage)}, - {&handle_peer_get_config, NULL, + {&GST_handle_peer_get_config, NULL, GNUNET_MESSAGE_TYPE_TESTBED_GET_PEER_CONFIGURATION, sizeof (struct GNUNET_TESTBED_PeerGetConfigurationMessage)}, {&GST_handle_overlay_connect, NULL, @@ -2714,12 +1640,12 @@ testbed_run (void *cls, struct GNUNET_SERVER_Handle *server, sizeof (struct GNUNET_TESTBED_OverlayConnectMessage)}, {&GST_handle_remote_overlay_connect, NULL, GNUNET_MESSAGE_TYPE_TESTBED_REMOTE_OVERLAY_CONNECT, 0}, - {&handle_manage_peer_service, NULL, + {&GST_handle_manage_peer_service, NULL, GNUNET_MESSAGE_TYPE_TESTBED_MANAGE_PEER_SERVICE, 0}, {&handle_slave_get_config, NULL, GNUNET_MESSAGE_TYPE_TESTBED_GET_SLAVE_CONFIGURATION, sizeof (struct GNUNET_TESTBED_SlaveGetConfigurationMessage)}, - {&handle_shutdown_peers, NULL, GNUNET_MESSAGE_TYPE_TESTBED_SHUTDOWN_PEERS, + {&GST_handle_shutdown_peers, NULL, GNUNET_MESSAGE_TYPE_TESTBED_SHUTDOWN_PEERS, sizeof (struct GNUNET_TESTBED_ShutdownPeersMessage)}, {NULL} }; diff --git a/src/testbed/gnunet-service-testbed.h b/src/testbed/gnunet-service-testbed.h index 700bbfcd8..aa3ab0b83 100644 --- a/src/testbed/gnunet-service-testbed.h +++ b/src/testbed/gnunet-service-testbed.h @@ -599,6 +599,25 @@ struct LCFContextQueue }; +/** + * Context data for GNUNET_MESSAGE_TYPE_TESTBED_SHUTDOWN_PEERS handler + */ +struct HandlerContext_ShutdownPeers +{ + /** + * The number of slave we expect to hear from since we forwarded the + * GNUNET_MESSAGE_TYPE_TESTBED_SHUTDOWN_PEERS message to them + */ + unsigned int nslaves; + + /** + * Did we observe a timeout with respect to this operation at any of the + * slaves + */ + int timeout; +}; + + /** * Our configuration */ @@ -665,6 +684,30 @@ extern unsigned int GST_slave_list_size; extern char *GST_stats_dir; +/** + * Similar to GNUNET_array_grow(); however instead of calling GNUNET_array_grow() + * several times we call it only once. The array is also made to grow in steps + * of LIST_GROW_STEP. + * + * @param ptr the array pointer to grow + * @param size the size of array + * @param accommodate_size the size which the array has to accommdate; after + * this call the array will be big enough to accommdate sizes upto + * accommodate_size + */ +#define GST_array_grow_large_enough(ptr, size, accommodate_size) \ + do \ + { \ + unsigned int growth_size; \ + GNUNET_assert (size <= accommodate_size); \ + growth_size = size; \ + while (growth_size <= accommodate_size) \ + growth_size += LIST_GROW_STEP; \ + GNUNET_array_grow (ptr, size, growth_size); \ + GNUNET_assert (size > accommodate_size); \ + } while (0) + + /** * Queues a message in send queue for sending to the service * @@ -685,6 +728,13 @@ void GST_destroy_peer (struct Peer *peer); +/** + * Stops and destroys all peers + */ +void +GST_destroy_peers (); + + /** * Finds the route with directly connected host as destination through which * the destination host can be reached @@ -746,6 +796,13 @@ GST_forwarded_operation_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); +/** + * Clears the forwarded operations queue + */ +void +GST_clear_fopcq (); + + /** * Send operation failure message to client * @@ -758,6 +815,17 @@ GST_send_operation_fail_msg (struct GNUNET_SERVER_Client *client, uint64_t operation_id, const char *emsg); +/** + * Function to send generic operation success message to given client + * + * @param client the client to send the message to + * @param operation_id the id of the operation which was successful + */ +void +GST_send_operation_success_msg (struct GNUNET_SERVER_Client *client, + uint64_t operation_id); + + /** * Handler for GNUNET_MESSAGE_TYPE_TESTBED_REQUESTCONNECT messages * @@ -771,6 +839,97 @@ GST_handle_remote_overlay_connect (void *cls, const struct GNUNET_MessageHeader *message); +/** + * Handler for GNUNET_MESSAGE_TYPE_TESTBED_CREATEPEER messages + * + * @param cls NULL + * @param client identification of the client + * @param message the actual message + */ +void +GST_handle_peer_create (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message); + + +/** + * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_DESTROYPEER messages + * + * @param cls NULL + * @param client identification of the client + * @param message the actual message + */ +void +GST_handle_peer_destroy (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message); + + +/** + * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_DESTROYPEER messages + * + * @param cls NULL + * @param client identification of the client + * @param message the actual message + */ +void +GST_handle_peer_start (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message); + + +/** + * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_DESTROYPEER messages + * + * @param cls NULL + * @param client identification of the client + * @param message the actual message + */ +void +GST_handle_peer_stop (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message); + + +/** + * Handler for GNUNET_MESSAGE_TYPE_TESTBED_GETPEERCONFIG messages + * + * @param cls NULL + * @param client identification of the client + * @param message the actual message + */ +void +GST_handle_peer_get_config (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message); + + +/** + * Handler for GNUNET_MESSAGE_TYPE_TESTBED_SHUTDOWN_PEERS messages + * + * @param cls NULL + * @param client identification of the client + * @param message the actual message + */ +void +GST_handle_shutdown_peers (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message); + + +/** + * Handler for GNUNET_TESTBED_ManagePeerServiceMessage message + * + * @param cls NULL + * @param client identification of client + * @param message the actual message + */ +void +GST_handle_manage_peer_service (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message); + + +/** + * Frees the ManageServiceContext queue + */ +void +GST_free_mctxq (); + + /** * Processes a forwarded overlay connect context in the queue of the given RegisteredHostContext * diff --git a/src/testbed/gnunet-service-testbed_oc.c b/src/testbed/gnunet-service-testbed_oc.c index 7fdf14586..3199d71af 100644 --- a/src/testbed/gnunet-service-testbed_oc.c +++ b/src/testbed/gnunet-service-testbed_oc.c @@ -942,7 +942,6 @@ occ_cache_get_handle_core_cb (void *cls, struct GNUNET_CORE_Handle *ch, occ->peer->details.local.cfg, p1_transport_connect_cache_callback, occ, NULL, NULL, NULL); - return; } @@ -998,13 +997,12 @@ registeredhost_registration_completion (void *cls, const char *emsg) const struct GNUNET_CONFIGURATION_Handle *cfg; uint32_t peer2_host_id; - /* if (NULL != rhc->focc_dll_head) */ - /* TESTBED_process_next_focc (rhc); */ peer2_host_id = GNUNET_TESTBED_host_get_id_ (rhc->reg_host); GNUNET_assert (RHC_INIT == rhc->state); GNUNET_assert (NULL == rhc->sub_op); - if ((NULL == rhc->gateway2) || ((peer2_host_id < GST_host_list_size) /* Check if we have the needed config */ - && (NULL != GST_host_list[peer2_host_id]))) + if ((NULL == rhc->gateway2) || + ( (peer2_host_id < GST_host_list_size) /* Check if we have the needed config */ + && (NULL != GST_host_list[peer2_host_id]) ) ) { rhc->state = RHC_LINK; cfg = @@ -1233,6 +1231,7 @@ GST_handle_overlay_connect (void *cls, struct GNUNET_SERVER_Client *client, if ((peer2_host_id >= GST_slave_list_size) || (NULL == GST_slave_list[peer2_host_id])) { + GNUNET_break (0); LOG (GNUNET_ERROR_TYPE_WARNING, "0x%llx: Configuration of peer2's controller missing for connecting peers" "%u and %u\n", operation_id, p1, p2); diff --git a/src/testbed/gnunet-service-testbed_peers.c b/src/testbed/gnunet-service-testbed_peers.c new file mode 100644 index 000000000..65cfe342c --- /dev/null +++ b/src/testbed/gnunet-service-testbed_peers.c @@ -0,0 +1,1138 @@ +/* + This file is part of GNUnet. + (C) 2008--2013 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 2, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + + +/** + * @file testbed/gnunet-service-testbed_peers.c + * @brief implementation of TESTBED service that deals with peer management + * @author Sree Harsha Totakura + */ + +#include "gnunet-service-testbed.h" +#include "gnunet_arm_service.h" +#include + + +/** + * A list of peers we know about + */ +struct Peer **GST_peer_list; + + +/** + * Context information to manage peers' services + */ +struct ManageServiceContext +{ + /** + * DLL next ptr + */ + struct ManageServiceContext *next; + + /** + * DLL prev ptr + */ + struct ManageServiceContext *prev; + + /** + * The ARM handle of the peer + */ + struct GNUNET_ARM_Handle *ah; + + /** + * peer whose service has to be managed + */ + struct Peer *peer; + + /** + * The client which requested to manage the peer's service + */ + struct GNUNET_SERVER_Client *client; + + /** + * The operation id of the associated request + */ + uint64_t op_id; + + /** + * 1 if the service at the peer has to be started; 0 if it has to be stopped + */ + uint8_t start; + + /** + * Is this context expired? Do not work on this context if it is set to + * GNUNET_YES + */ + uint8_t expired; +}; + + +/** + * DLL head for queue of manage service requests + */ +static struct ManageServiceContext *mctx_head; + +/** + * DLL tail for queue of manage service requests + */ +static struct ManageServiceContext *mctx_tail; + + +/** + * Adds a peer to the peer array + * + * @param peer the peer to add + */ +static void +peer_list_add (struct Peer *peer) +{ + if (peer->id >= GST_peer_list_size) + GST_array_grow_large_enough (GST_peer_list, GST_peer_list_size, peer->id); + GNUNET_assert (NULL == GST_peer_list[peer->id]); + GST_peer_list[peer->id] = peer; +} + + +/** + * Removes a the give peer from the peer array + * + * @param peer the peer to be removed + */ +static void +peer_list_remove (struct Peer *peer) +{ + unsigned int orig_size; + uint32_t id; + + GST_peer_list[peer->id] = NULL; + orig_size = GST_peer_list_size; + while (GST_peer_list_size >= LIST_GROW_STEP) + { + for (id = GST_peer_list_size - 1; + (id >= GST_peer_list_size - LIST_GROW_STEP) && (id != UINT32_MAX); + id--) + if (NULL != GST_peer_list[id]) + break; + if (id != ((GST_peer_list_size - LIST_GROW_STEP) - 1)) + break; + GST_peer_list_size -= LIST_GROW_STEP; + } + if (orig_size == GST_peer_list_size) + return; + GST_peer_list = + GNUNET_realloc (GST_peer_list, + sizeof (struct Peer *) * GST_peer_list_size); +} + + +/** + * The task to be executed if the forwarded peer create operation has been + * timed out + * + * @param cls the FowardedOperationContext + * @param tc the TaskContext from the scheduler + */ +static void +peer_create_forward_timeout (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct ForwardedOperationContext *fopc = cls; + + GNUNET_free (fopc->cls); + GST_forwarded_operation_timeout (fopc, tc); +} + + +/** + * Callback to be called when forwarded peer create operation is successfull. We + * have to relay the reply msg back to the client + * + * @param cls ForwardedOperationContext + * @param msg the peer create success message + */ +static void +peer_create_success_cb (void *cls, const struct GNUNET_MessageHeader *msg) +{ + struct ForwardedOperationContext *fopc = cls; + struct Peer *remote_peer; + + if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER_SUCCESS) + { + GNUNET_assert (NULL != fopc->cls); + remote_peer = fopc->cls; + peer_list_add (remote_peer); + } + GST_forwarded_operation_reply_relay (fopc, msg); +} + + +/** + * Function to destroy a peer + * + * @param peer the peer structure to destroy + */ +void +GST_destroy_peer (struct Peer *peer) +{ + GNUNET_break (0 == peer->reference_cnt); + if (GNUNET_YES == peer->is_remote) + { + peer_list_remove (peer); + GNUNET_free (peer); + return; + } + if (GNUNET_YES == peer->details.local.is_running) + { + GNUNET_TESTING_peer_stop (peer->details.local.peer); + peer->details.local.is_running = GNUNET_NO; + } + GNUNET_TESTING_peer_destroy (peer->details.local.peer); + GNUNET_CONFIGURATION_destroy (peer->details.local.cfg); + peer_list_remove (peer); + GNUNET_free (peer); +} + + +/** + * Callback to be called when forwarded peer destroy operation is successfull. We + * have to relay the reply msg back to the client + * + * @param cls ForwardedOperationContext + * @param msg the peer create success message + */ +static void +peer_destroy_success_cb (void *cls, const struct GNUNET_MessageHeader *msg) +{ + struct ForwardedOperationContext *fopc = cls; + struct Peer *remote_peer; + + if (GNUNET_MESSAGE_TYPE_TESTBED_GENERIC_OPERATION_SUCCESS == + ntohs (msg->type)) + { + remote_peer = fopc->cls; + GNUNET_assert (NULL != remote_peer); + remote_peer->destroy_flag = GNUNET_YES; + if (0 == remote_peer->reference_cnt) + GST_destroy_peer (remote_peer); + } + GST_forwarded_operation_reply_relay (fopc, msg); +} + + +/** + * Handler for GNUNET_MESSAGE_TYPE_TESTBED_CREATEPEER messages + * + * @param cls NULL + * @param client identification of the client + * @param message the actual message + */ +void +GST_handle_peer_create (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) +{ + const struct GNUNET_TESTBED_PeerCreateMessage *msg; + struct GNUNET_TESTBED_PeerCreateSuccessEventMessage *reply; + struct GNUNET_CONFIGURATION_Handle *cfg; + struct ForwardedOperationContext *fo_ctxt; + struct Route *route; + struct Peer *peer; + char *config; + size_t dest_size; + int ret; + uint32_t config_size; + uint32_t host_id; + uint32_t peer_id; + uint16_t msize; + + + msize = ntohs (message->size); + if (msize <= sizeof (struct GNUNET_TESTBED_PeerCreateMessage)) + { + GNUNET_break (0); /* We need configuration */ + GNUNET_SERVER_receive_done (client, GNUNET_OK); + return; + } + msg = (const struct GNUNET_TESTBED_PeerCreateMessage *) message; + host_id = ntohl (msg->host_id); + peer_id = ntohl (msg->peer_id); + if (UINT32_MAX == peer_id) + { + GST_send_operation_fail_msg (client, GNUNET_ntohll (msg->operation_id), + "Cannot create peer with given ID"); + GNUNET_SERVER_receive_done (client, GNUNET_OK); + return; + } + if (host_id == GST_context->host_id) + { + char *emsg; + + /* We are responsible for this peer */ + msize -= sizeof (struct GNUNET_TESTBED_PeerCreateMessage); + config_size = ntohl (msg->config_size); + config = GNUNET_malloc (config_size); + dest_size = config_size; + if (Z_OK != + (ret = + uncompress ((Bytef *) config, (uLongf *) & dest_size, + (const Bytef *) &msg[1], (uLong) msize))) + { + GNUNET_break (0); /* uncompression error */ + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + if (config_size != dest_size) + { + GNUNET_break (0); /* Uncompressed config size mismatch */ + GNUNET_free (config); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + cfg = GNUNET_CONFIGURATION_create (); + if (GNUNET_OK != + GNUNET_CONFIGURATION_deserialize (cfg, config, config_size, GNUNET_NO)) + { + GNUNET_break (0); /* Configuration parsing error */ + GNUNET_free (config); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + GNUNET_free (config); + GNUNET_CONFIGURATION_set_value_number (cfg, "TESTBED", "PEERID", + (unsigned long long) peer_id); + peer = GNUNET_malloc (sizeof (struct Peer)); + peer->is_remote = GNUNET_NO; + peer->details.local.cfg = cfg; + peer->id = peer_id; + LOG_DEBUG ("Creating peer with id: %u\n", (unsigned int) peer->id); + peer->details.local.peer = + GNUNET_TESTING_peer_configure (GST_context->system, + peer->details.local.cfg, peer->id, + NULL /* Peer id */ , + &emsg); + if (NULL == peer->details.local.peer) + { + LOG (GNUNET_ERROR_TYPE_WARNING, "Configuring peer failed: %s\n", emsg); + GNUNET_free (emsg); + GNUNET_free (peer); + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + peer->details.local.is_running = GNUNET_NO; + peer_list_add (peer); + reply = + GNUNET_malloc (sizeof + (struct GNUNET_TESTBED_PeerCreateSuccessEventMessage)); + reply->header.size = + htons (sizeof (struct GNUNET_TESTBED_PeerCreateSuccessEventMessage)); + reply->header.type = + htons (GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER_SUCCESS); + reply->peer_id = msg->peer_id; + reply->operation_id = msg->operation_id; + GST_queue_message (client, &reply->header); + GNUNET_SERVER_receive_done (client, GNUNET_OK); + return; + } + + /* Forward peer create request */ + route = GST_find_dest_route (host_id); + if (NULL == route) + { + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_OK); + return; + } + + peer = GNUNET_malloc (sizeof (struct Peer)); + peer->is_remote = GNUNET_YES; + peer->id = peer_id; + peer->details.remote.slave = GST_slave_list[route->dest]; + peer->details.remote.remote_host_id = host_id; + fo_ctxt = GNUNET_malloc (sizeof (struct ForwardedOperationContext)); + GNUNET_SERVER_client_keep (client); + fo_ctxt->client = client; + fo_ctxt->operation_id = GNUNET_ntohll (msg->operation_id); + fo_ctxt->cls = peer; //GST_slave_list[route->dest]->controller; + fo_ctxt->type = OP_PEER_CREATE; + fo_ctxt->opc = + GNUNET_TESTBED_forward_operation_msg_ (GST_slave_list + [route->dest]->controller, + fo_ctxt->operation_id, + &msg->header, + peer_create_success_cb, fo_ctxt); + fo_ctxt->timeout_task = + GNUNET_SCHEDULER_add_delayed (GST_timeout, &peer_create_forward_timeout, + fo_ctxt); + GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fo_ctxt); + GNUNET_SERVER_receive_done (client, GNUNET_OK); +} + + +/** + * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_DESTROYPEER messages + * + * @param cls NULL + * @param client identification of the client + * @param message the actual message + */ +void +GST_handle_peer_destroy (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) +{ + const struct GNUNET_TESTBED_PeerDestroyMessage *msg; + struct ForwardedOperationContext *fopc; + struct Peer *peer; + uint32_t peer_id; + + msg = (const struct GNUNET_TESTBED_PeerDestroyMessage *) message; + peer_id = ntohl (msg->peer_id); + LOG_DEBUG ("Received peer destory on peer: %u and operation id: %ul\n", + peer_id, GNUNET_ntohll (msg->operation_id)); + if ((GST_peer_list_size <= peer_id) || (NULL == GST_peer_list[peer_id])) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Asked to destroy a non existent peer with id: %u\n", peer_id); + GST_send_operation_fail_msg (client, GNUNET_ntohll (msg->operation_id), + "Peer doesn't exist"); + GNUNET_SERVER_receive_done (client, GNUNET_OK); + return; + } + peer = GST_peer_list[peer_id]; + if (GNUNET_YES == peer->is_remote) + { + /* Forward the destory message to sub controller */ + fopc = GNUNET_malloc (sizeof (struct ForwardedOperationContext)); + GNUNET_SERVER_client_keep (client); + fopc->client = client; + fopc->cls = peer; + fopc->type = OP_PEER_DESTROY; + fopc->operation_id = GNUNET_ntohll (msg->operation_id); + fopc->opc = + GNUNET_TESTBED_forward_operation_msg_ (peer->details.remote. + slave->controller, + fopc->operation_id, &msg->header, + &peer_destroy_success_cb, fopc); + fopc->timeout_task = + GNUNET_SCHEDULER_add_delayed (GST_timeout, &GST_forwarded_operation_timeout, + fopc); + GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fopc); + GNUNET_SERVER_receive_done (client, GNUNET_OK); + return; + } + peer->destroy_flag = GNUNET_YES; + if (0 == peer->reference_cnt) + GST_destroy_peer (peer); + else + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Delaying peer destroy as peer is currently in use\n"); + GST_send_operation_success_msg (client, GNUNET_ntohll (msg->operation_id)); + GNUNET_SERVER_receive_done (client, GNUNET_OK); +} + + +/** + * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_DESTROYPEER messages + * + * @param cls NULL + * @param client identification of the client + * @param message the actual message + */ +void +GST_handle_peer_start (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) +{ + const struct GNUNET_TESTBED_PeerStartMessage *msg; + struct GNUNET_TESTBED_PeerEventMessage *reply; + struct ForwardedOperationContext *fopc; + struct Peer *peer; + uint32_t peer_id; + + msg = (const struct GNUNET_TESTBED_PeerStartMessage *) message; + peer_id = ntohl (msg->peer_id); + if ((peer_id >= GST_peer_list_size) || (NULL == GST_peer_list[peer_id])) + { + GNUNET_break (0); + LOG (GNUNET_ERROR_TYPE_ERROR, + "Asked to start a non existent peer with id: %u\n", peer_id); + GNUNET_SERVER_receive_done (client, GNUNET_OK); + return; + } + peer = GST_peer_list[peer_id]; + if (GNUNET_YES == peer->is_remote) + { + fopc = GNUNET_malloc (sizeof (struct ForwardedOperationContext)); + GNUNET_SERVER_client_keep (client); + fopc->client = client; + fopc->operation_id = GNUNET_ntohll (msg->operation_id); + fopc->type = OP_PEER_START; + fopc->opc = + GNUNET_TESTBED_forward_operation_msg_ (peer->details.remote. + slave->controller, + fopc->operation_id, &msg->header, + &GST_forwarded_operation_reply_relay, + fopc); + fopc->timeout_task = + GNUNET_SCHEDULER_add_delayed (GST_timeout, &GST_forwarded_operation_timeout, + fopc); + GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fopc); + GNUNET_SERVER_receive_done (client, GNUNET_OK); + return; + } + if (GNUNET_OK != GNUNET_TESTING_peer_start (peer->details.local.peer)) + { + GST_send_operation_fail_msg (client, GNUNET_ntohll (msg->operation_id), + "Failed to start"); + GNUNET_SERVER_receive_done (client, GNUNET_OK); + return; + } + peer->details.local.is_running = GNUNET_YES; + reply = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_PeerEventMessage)); + reply->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_PEER_EVENT); + reply->header.size = htons (sizeof (struct GNUNET_TESTBED_PeerEventMessage)); + reply->event_type = htonl (GNUNET_TESTBED_ET_PEER_START); + reply->host_id = htonl (GST_context->host_id); + reply->peer_id = msg->peer_id; + reply->operation_id = msg->operation_id; + GST_queue_message (client, &reply->header); + GNUNET_SERVER_receive_done (client, GNUNET_OK); +} + + +/** + * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_DESTROYPEER messages + * + * @param cls NULL + * @param client identification of the client + * @param message the actual message + */ +void +GST_handle_peer_stop (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) +{ + const struct GNUNET_TESTBED_PeerStopMessage *msg; + struct GNUNET_TESTBED_PeerEventMessage *reply; + struct ForwardedOperationContext *fopc; + struct Peer *peer; + uint32_t peer_id; + + msg = (const struct GNUNET_TESTBED_PeerStopMessage *) message; + peer_id = ntohl (msg->peer_id); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PEER_STOP for peer %u\n", peer_id); + if ((peer_id >= GST_peer_list_size) || (NULL == GST_peer_list[peer_id])) + { + GST_send_operation_fail_msg (client, GNUNET_ntohll (msg->operation_id), + "Peer not found"); + GNUNET_SERVER_receive_done (client, GNUNET_OK); + return; + } + peer = GST_peer_list[peer_id]; + if (GNUNET_YES == peer->is_remote) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "Forwarding PEER_STOP for peer %u\n", + peer_id); + fopc = GNUNET_malloc (sizeof (struct ForwardedOperationContext)); + GNUNET_SERVER_client_keep (client); + fopc->client = client; + fopc->operation_id = GNUNET_ntohll (msg->operation_id); + fopc->type = OP_PEER_STOP; + fopc->opc = + GNUNET_TESTBED_forward_operation_msg_ (peer->details.remote. + slave->controller, + fopc->operation_id, &msg->header, + &GST_forwarded_operation_reply_relay, + fopc); + fopc->timeout_task = + GNUNET_SCHEDULER_add_delayed (GST_timeout, &GST_forwarded_operation_timeout, + fopc); + GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fopc); + GNUNET_SERVER_receive_done (client, GNUNET_OK); + return; + } + if (GNUNET_OK != GNUNET_TESTING_peer_kill (peer->details.local.peer)) + { + LOG (GNUNET_ERROR_TYPE_WARNING, "Stopping peer %u failed\n", peer_id); + GST_send_operation_fail_msg (client, GNUNET_ntohll (msg->operation_id), + "Peer not running"); + GNUNET_SERVER_receive_done (client, GNUNET_OK); + return; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, "Peer %u successfully stopped\n", peer_id); + peer->details.local.is_running = GNUNET_NO; + reply = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_PeerEventMessage)); + reply->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_PEER_EVENT); + reply->header.size = htons (sizeof (struct GNUNET_TESTBED_PeerEventMessage)); + reply->event_type = htonl (GNUNET_TESTBED_ET_PEER_STOP); + reply->host_id = htonl (GST_context->host_id); + reply->peer_id = msg->peer_id; + reply->operation_id = msg->operation_id; + GST_queue_message (client, &reply->header); + GNUNET_SERVER_receive_done (client, GNUNET_OK); + GNUNET_TESTING_peer_wait (peer->details.local.peer); +} + + +/** + * Handler for GNUNET_MESSAGE_TYPE_TESTBED_GETPEERCONFIG messages + * + * @param cls NULL + * @param client identification of the client + * @param message the actual message + */ +void +GST_handle_peer_get_config (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) +{ + const struct GNUNET_TESTBED_PeerGetConfigurationMessage *msg; + struct GNUNET_TESTBED_PeerConfigurationInformationMessage *reply; + struct Peer *peer; + char *config; + char *xconfig; + size_t c_size; + size_t xc_size; + uint32_t peer_id; + uint16_t msize; + + msg = (const struct GNUNET_TESTBED_PeerGetConfigurationMessage *) message; + peer_id = ntohl (msg->peer_id); + if ((peer_id >= GST_peer_list_size) || (NULL == GST_peer_list[peer_id])) + { + GST_send_operation_fail_msg (client, GNUNET_ntohll (msg->operation_id), + "Peer not found"); + GNUNET_SERVER_receive_done (client, GNUNET_OK); + return; + } + peer = GST_peer_list[peer_id]; + if (GNUNET_YES == peer->is_remote) + { + struct ForwardedOperationContext *fopc; + + LOG_DEBUG ("Forwarding PEER_GET_CONFIG for peer: %u\n", peer_id); + fopc = GNUNET_malloc (sizeof (struct ForwardedOperationContext)); + GNUNET_SERVER_client_keep (client); + fopc->client = client; + fopc->operation_id = GNUNET_ntohll (msg->operation_id); + fopc->type = OP_PEER_INFO; + fopc->opc = + GNUNET_TESTBED_forward_operation_msg_ (peer->details.remote. + slave->controller, + fopc->operation_id, &msg->header, + &GST_forwarded_operation_reply_relay, + fopc); + fopc->timeout_task = + GNUNET_SCHEDULER_add_delayed (GST_timeout, &GST_forwarded_operation_timeout, + fopc); + GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fopc); + GNUNET_SERVER_receive_done (client, GNUNET_OK); + return; + } + LOG_DEBUG ("Received PEER_GET_CONFIG for peer: %u\n", peer_id); + config = + GNUNET_CONFIGURATION_serialize (GST_peer_list[peer_id]->details.local.cfg, + &c_size); + xc_size = GNUNET_TESTBED_compress_config_ (config, c_size, &xconfig); + GNUNET_free (config); + msize = + xc_size + + sizeof (struct GNUNET_TESTBED_PeerConfigurationInformationMessage); + reply = GNUNET_realloc (xconfig, msize); + (void) memmove (&reply[1], reply, xc_size); + reply->header.size = htons (msize); + reply->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONFIGURATION); + reply->peer_id = msg->peer_id; + reply->operation_id = msg->operation_id; + GNUNET_TESTING_peer_get_identity (GST_peer_list[peer_id]->details.local.peer, + &reply->peer_identity); + reply->config_size = htons ((uint16_t) c_size); + GST_queue_message (client, &reply->header); + GNUNET_SERVER_receive_done (client, GNUNET_OK); +} + + +/** + * Cleanup the context information created for managing a peer's service + * + * @param mctx the ManageServiceContext + */ +static void +cleanup_mctx (struct ManageServiceContext *mctx) +{ + mctx->expired = GNUNET_YES; + GNUNET_CONTAINER_DLL_remove (mctx_head, mctx_tail, mctx); + GNUNET_SERVER_client_drop (mctx->client); + GNUNET_ARM_disconnect_and_free (mctx->ah); + GNUNET_assert (0 < mctx->peer->reference_cnt); + mctx->peer->reference_cnt--; + if ( (GNUNET_YES == mctx->peer->destroy_flag) + && (0 == mctx->peer->reference_cnt) ) + GST_destroy_peer (mctx->peer); + GNUNET_free (mctx); +} + + +/** + * Frees the ManageServiceContext queue + */ +void +GST_free_mctxq () +{ + while (NULL != mctx_head) + cleanup_mctx (mctx_head); +} + + +/** + * Returns a string interpretation of 'rs' + * + * @param rs the request status from ARM + * @return a string interpretation of the request status + */ +static const char * +arm_req_string (enum GNUNET_ARM_RequestStatus rs) +{ + switch (rs) + { + case GNUNET_ARM_REQUEST_SENT_OK: + return _("Message was sent successfully"); + case GNUNET_ARM_REQUEST_CONFIGURATION_ERROR: + return _("Misconfiguration (can't connect to the ARM service)"); + case GNUNET_ARM_REQUEST_DISCONNECTED: + return _("We disconnected from ARM before we could send a request"); + case GNUNET_ARM_REQUEST_BUSY: + return _("ARM API is busy"); + case GNUNET_ARM_REQUEST_TOO_LONG: + return _("Request doesn't fit into a message"); + case GNUNET_ARM_REQUEST_TIMEOUT: + return _("Request timed out"); + } + return _("Unknown request status"); +} + + +/** + * Returns a string interpretation of the 'result' + * + * @param result the arm result + * @return a string interpretation + */ +static const char * +arm_ret_string (enum GNUNET_ARM_Result result) +{ + switch (result) + { + case GNUNET_ARM_RESULT_STOPPED: + return _("%s is stopped"); + case GNUNET_ARM_RESULT_STARTING: + return _("%s is starting"); + case GNUNET_ARM_RESULT_STOPPING: + return _("%s is stopping"); + case GNUNET_ARM_RESULT_IS_STARTING_ALREADY: + return _("%s is starting already"); + case GNUNET_ARM_RESULT_IS_STOPPING_ALREADY: + return _("%s is stopping already"); + case GNUNET_ARM_RESULT_IS_STARTED_ALREADY: + return _("%s is started already"); + case GNUNET_ARM_RESULT_IS_STOPPED_ALREADY: + return _("%s is stopped already"); + case GNUNET_ARM_RESULT_IS_NOT_KNOWN: + return _("%s service is not known to ARM"); + case GNUNET_ARM_RESULT_START_FAILED: + return _("%s service failed to start"); + case GNUNET_ARM_RESULT_IN_SHUTDOWN: + return _("%s service can't be started because ARM is shutting down"); + } + return _("%.s Unknown result code."); +} + + +/** + * Function called in response to a start/stop request. + * Will be called when request was not sent successfully, + * or when a reply comes. If the request was not sent successfully, + * 'rs' will indicate that, and 'service' and 'result' will be undefined. + * + * @param cls ManageServiceContext + * @param arm handle to the arm connection + * @param rs status of the request + * @param service service name + * @param result result of the operation + */ +static void +service_manage_result_cb (void *cls, struct GNUNET_ARM_Handle *arm, + enum GNUNET_ARM_RequestStatus rs, + const char *service, enum GNUNET_ARM_Result result) +{ + struct ManageServiceContext *mctx = cls; + char *emsg; + + emsg = NULL; + if (GNUNET_YES == mctx->expired) + return; + if (GNUNET_ARM_REQUEST_SENT_OK != rs) + { + GNUNET_asprintf (&emsg, "Error communicating with Peer %u's ARM: %s", + mctx->peer->id, arm_req_string (rs)); + goto ret; + } + if (1 == mctx->start) + goto service_start_check; + if (! ((GNUNET_ARM_RESULT_STOPPED == result) + || (GNUNET_ARM_RESULT_STOPPING == result) + || (GNUNET_ARM_RESULT_IS_STOPPING_ALREADY == result) + || (GNUNET_ARM_RESULT_IS_STOPPED_ALREADY == result)) ) + { + /* stopping a service failed */ + GNUNET_asprintf (&emsg, arm_ret_string (result), service); + goto ret; + } + /* service stopped successfully */ + goto ret; + + service_start_check: + if (! ((GNUNET_ARM_RESULT_STARTING == result) + || (GNUNET_ARM_RESULT_IS_STARTING_ALREADY == result) + || (GNUNET_ARM_RESULT_IS_STARTED_ALREADY == result)) ) + { + /* starting a service failed */ + GNUNET_asprintf (&emsg, arm_ret_string (result), service); + goto ret; + } + /* service started successfully */ + + ret: + if (NULL != emsg) + { + LOG_DEBUG ("%s\n", emsg); + GST_send_operation_fail_msg (mctx->client, mctx->op_id, emsg); + } + else + GST_send_operation_success_msg (mctx->client, mctx->op_id); + GNUNET_free_non_null (emsg); + cleanup_mctx (mctx); +} + + +/** + * Handler for GNUNET_TESTBED_ManagePeerServiceMessage message + * + * @param cls NULL + * @param client identification of client + * @param message the actual message + */ +void +GST_handle_manage_peer_service (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) +{ + const struct GNUNET_TESTBED_ManagePeerServiceMessage *msg; + const char* service; + struct Peer *peer; + char *emsg; + struct GNUNET_ARM_Handle *ah; + struct ManageServiceContext *mctx; + struct ForwardedOperationContext *fopc; + uint64_t op_id; + uint32_t peer_id; + uint16_t msize; + + + msize = ntohs (message->size); + if (msize <= sizeof (struct GNUNET_TESTBED_ManagePeerServiceMessage)) + { + GNUNET_break_op (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + msg = (const struct GNUNET_TESTBED_ManagePeerServiceMessage *) message; + service = (const char *) &msg[1]; + if ('\0' != service[msize - sizeof + (struct GNUNET_TESTBED_ManagePeerServiceMessage) - 1]) + { + GNUNET_break_op (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + if (1 < msg->start) + { + GNUNET_break_op (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + peer_id = ntohl (msg->peer_id); + op_id = GNUNET_ntohll (msg->operation_id); + LOG_DEBUG ("Received request to manage service %s on peer %u\n", + service, (unsigned int) peer_id); + if ((GST_peer_list_size <= peer_id) + || (NULL == (peer = GST_peer_list[peer_id]))) + { + GNUNET_asprintf (&emsg, "Asked to manage service of a non existent peer " + "with id: %u", peer_id); + goto err_ret; + } + if (0 == strcasecmp ("arm", service)) + { + emsg = GNUNET_strdup ("Cannot start/stop peer's ARM service. " + "Use peer start/stop for that"); + goto err_ret; + } + if (GNUNET_YES == peer->is_remote) + { + /* Forward the destory message to sub controller */ + fopc = GNUNET_malloc (sizeof (struct ForwardedOperationContext)); + GNUNET_SERVER_client_keep (client); + fopc->client = client; + fopc->cls = peer; + fopc->type = OP_MANAGE_SERVICE; + fopc->operation_id = op_id; + fopc->opc = + GNUNET_TESTBED_forward_operation_msg_ (peer->details.remote. + slave->controller, + fopc->operation_id, &msg->header, + &GST_forwarded_operation_reply_relay, + fopc); + fopc->timeout_task = + GNUNET_SCHEDULER_add_delayed (GST_timeout, &GST_forwarded_operation_timeout, + fopc); + GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fopc); + GNUNET_SERVER_receive_done (client, GNUNET_OK); + return; + } + if ((0 != peer->reference_cnt) + && ( (0 == strcasecmp ("core", service)) + || (0 == strcasecmp ("transport", service)) ) ) + { + GNUNET_asprintf (&emsg, "Cannot stop %s service of peer with id: %u " + "since it is required by existing operations", + service, peer_id); + goto err_ret; + } + ah = GNUNET_ARM_connect (peer->details.local.cfg, NULL, NULL); + if (NULL == ah) + { + GNUNET_asprintf (&emsg, + "Cannot connect to ARM service of peer with id: %u", + peer_id); + goto err_ret; + } + mctx = GNUNET_malloc (sizeof (struct ManageServiceContext)); + mctx->peer = peer; + peer->reference_cnt++; + mctx->op_id = op_id; + mctx->ah = ah; + GNUNET_SERVER_client_keep (client); + mctx->client = client; + mctx->start = msg->start; + GNUNET_CONTAINER_DLL_insert_tail (mctx_head, mctx_tail, mctx); + if (1 == mctx->start) + GNUNET_ARM_request_service_start (mctx->ah, service, + GNUNET_OS_INHERIT_STD_ERR, + GST_timeout, + service_manage_result_cb, + mctx); + else + GNUNET_ARM_request_service_stop (mctx->ah, service, + GST_timeout, + service_manage_result_cb, + mctx); + GNUNET_SERVER_receive_done (client, GNUNET_OK); + return; + + err_ret: + LOG (GNUNET_ERROR_TYPE_ERROR, "%s\n", emsg); + GST_send_operation_fail_msg (client, op_id, emsg); + GNUNET_free (emsg); + GNUNET_SERVER_receive_done (client, GNUNET_OK); +} + + +/** + * Stops and destroys all peers + */ +void +GST_destroy_peers () +{ + struct Peer *peer; + unsigned int id; + + if (NULL == GST_peer_list) + return; + for (id = 0; id < GST_peer_list_size; id++) + { + peer = GST_peer_list[id]; + if (NULL == peer) + continue; + /* If destroy flag is set it means that this peer should have been + * destroyed by a context which we destroy before */ + GNUNET_break (GNUNET_NO == peer->destroy_flag); + /* counter should be zero as we free all contexts before */ + GNUNET_break (0 == peer->reference_cnt); + if ((GNUNET_NO == peer->is_remote) && + (GNUNET_YES == peer->details.local.is_running)) + GNUNET_TESTING_peer_kill (peer->details.local.peer); + } + for (id = 0; id < GST_peer_list_size; id++) + { + peer = GST_peer_list[id]; + if (NULL == peer) + continue; + if (GNUNET_NO == peer->is_remote) + { + if (GNUNET_YES == peer->details.local.is_running) + GNUNET_TESTING_peer_wait (peer->details.local.peer); + GNUNET_TESTING_peer_destroy (peer->details.local.peer); + GNUNET_CONFIGURATION_destroy (peer->details.local.cfg); + } + GNUNET_free (peer); + } + GNUNET_free_non_null (GST_peer_list); + GST_peer_list = NULL; + GST_peer_list_size = 0; +} + + +/** + * Task run upon timeout of forwarded SHUTDOWN_PEERS operation + * + * @param cls the ForwardedOperationContext + * @param tc the scheduler task context + */ +static void +shutdown_peers_timeout_cb (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct ForwardedOperationContext *fo_ctxt = cls; + struct HandlerContext_ShutdownPeers *hc; + + fo_ctxt->timeout_task = GNUNET_SCHEDULER_NO_TASK; + hc = fo_ctxt->cls; + hc->timeout = GNUNET_YES; + GNUNET_assert (0 < hc->nslaves); + hc->nslaves--; + if (0 == hc->nslaves) + GST_send_operation_fail_msg (fo_ctxt->client, fo_ctxt->operation_id, + "Timeout at a slave controller"); + GNUNET_TESTBED_forward_operation_msg_cancel_ (fo_ctxt->opc); + GNUNET_SERVER_client_drop (fo_ctxt->client); + GNUNET_CONTAINER_DLL_remove (fopcq_head, fopcq_tail, fo_ctxt); + GNUNET_free (fo_ctxt); +} + + +/** + * The reply msg handler forwarded SHUTDOWN_PEERS operation. Checks if a + * success reply is received from all clients and then sends the success message + * to the client + * + * @param cls ForwardedOperationContext + * @param msg the message to relay + */ +static void +shutdown_peers_reply_cb (void *cls, + const struct GNUNET_MessageHeader *msg) +{ + struct ForwardedOperationContext *fo_ctxt = cls; + struct HandlerContext_ShutdownPeers *hc; + + hc = fo_ctxt->cls; + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != fo_ctxt->timeout_task); + GNUNET_SCHEDULER_cancel (fo_ctxt->timeout_task); + fo_ctxt->timeout_task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_assert (0 < hc->nslaves); + hc->nslaves--; + if (GNUNET_MESSAGE_TYPE_TESTBED_GENERIC_OPERATION_SUCCESS != + ntohs (msg->type)) + hc->timeout = GNUNET_YES; + if (0 == hc->nslaves) + { + if (GNUNET_YES == hc->timeout) + GST_send_operation_fail_msg (fo_ctxt->client, fo_ctxt->operation_id, + "Timeout at a slave controller"); + else + GST_send_operation_success_msg (fo_ctxt->client, fo_ctxt->operation_id); + } + GNUNET_SERVER_client_drop (fo_ctxt->client); + GNUNET_CONTAINER_DLL_remove (fopcq_head, fopcq_tail, fo_ctxt); + GNUNET_free (fo_ctxt); +} + + +/** + * Handler for GNUNET_MESSAGE_TYPE_TESTBED_SHUTDOWN_PEERS messages + * + * @param cls NULL + * @param client identification of the client + * @param message the actual message + */ +void +GST_handle_shutdown_peers (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) +{ + const struct GNUNET_TESTBED_ShutdownPeersMessage *msg; + struct HandlerContext_ShutdownPeers *hc; + struct Slave *slave; + struct ForwardedOperationContext *fo_ctxt; + uint64_t op_id; + unsigned int cnt; + + msg = (const struct GNUNET_TESTBED_ShutdownPeersMessage *) message; + LOG_DEBUG ("Received SHUTDOWN_PEERS\n"); + /* Stop and destroy all peers */ + GST_free_mctxq (); + GST_free_occq (); + GST_free_roccq (); + GST_clear_fopcq (); + /* Forward to all slaves which we have started */ + op_id = GNUNET_ntohll (msg->operation_id); + hc = GNUNET_malloc (sizeof (struct HandlerContext_ShutdownPeers)); + /* FIXME: have a better implementation where we track which slaves are + started by this controller */ + for (cnt = 0; cnt < GST_slave_list_size; cnt++) + { + slave = GST_slave_list[cnt]; + if (NULL == slave) + continue; + if (NULL == slave->controller_proc) /* We didn't start the slave */ + continue; + LOG_DEBUG ("Forwarding SHUTDOWN_PEERS\n"); + hc->nslaves++; + fo_ctxt = GNUNET_malloc (sizeof (struct ForwardedOperationContext)); + GNUNET_SERVER_client_keep (client); + fo_ctxt->client = client; + fo_ctxt->operation_id = op_id; + fo_ctxt->cls = hc; + fo_ctxt->type = OP_SHUTDOWN_PEERS; + fo_ctxt->opc = + GNUNET_TESTBED_forward_operation_msg_ (slave->controller, + fo_ctxt->operation_id, + &msg->header, + shutdown_peers_reply_cb, + fo_ctxt); + fo_ctxt->timeout_task = + GNUNET_SCHEDULER_add_delayed (GST_timeout, &shutdown_peers_timeout_cb, + fo_ctxt); + GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fo_ctxt); + } + LOG_DEBUG ("Shutting down peers\n"); + GST_destroy_peers (); + if (0 == hc->nslaves) + { + GST_send_operation_success_msg (client, op_id); + GNUNET_free (hc); + } + GNUNET_SERVER_receive_done (client, GNUNET_OK); +} -- cgit v1.2.3