From 9335b7094d8a11449c322b2ca3f5a6cb7a257dad Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 27 Jun 2016 16:52:08 +0000 Subject: -use more of MQ API --- src/testbed/gnunet-service-testbed_oc.c | 1 - src/testbed/testbed_api.c | 100 +++++++++++++++++------------- src/testbed/testbed_api_peers.c | 104 +++++++++++++++++++------------- 3 files changed, 119 insertions(+), 86 deletions(-) (limited to 'src/testbed') diff --git a/src/testbed/gnunet-service-testbed_oc.c b/src/testbed/gnunet-service-testbed_oc.c index 44f408764..de462da7a 100644 --- a/src/testbed/gnunet-service-testbed_oc.c +++ b/src/testbed/gnunet-service-testbed_oc.c @@ -1722,7 +1722,6 @@ rocc_hello_sent_cb (void *cls) GST_CONNECTIONPOOL_SERVICE_ATS_CONNECTIVITY, &occ_cache_get_handle_ats_rocc_cb, rocc, NULL, NULL, NULL); - } diff --git a/src/testbed/testbed_api.c b/src/testbed/testbed_api.c index aad5055ef..b74b48b69 100644 --- a/src/testbed/testbed_api.c +++ b/src/testbed/testbed_api.c @@ -1288,7 +1288,6 @@ handle_barrier_status (void *cls, * * @param controller the handle to the controller * @param msg the message to queue - * @deprecated */ void GNUNET_TESTBED_queue_message_ (struct GNUNET_TESTBED_Controller *controller, @@ -1328,17 +1327,27 @@ GNUNET_TESTBED_queue_message_ (struct GNUNET_TESTBED_Controller *controller, * operation */ struct OperationContext * -GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller - *controller, uint64_t operation_id, +GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller *controller, + uint64_t operation_id, const struct GNUNET_MessageHeader *msg, GNUNET_CLIENT_MessageHandler cc, void *cc_cls) { struct OperationContext *opc; struct ForwardedOperationData *data; - struct GNUNET_MessageHeader *dup_msg; - uint16_t msize; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_MessageHeader *m2; + uint16_t type = ntohs (msg->type); + uint16_t size = ntohs (msg->size); + env = GNUNET_MQ_msg_extra (m2, + size - sizeof (*m2), + type); + memcpy (m2, + msg, + size); + GNUNET_MQ_send (controller->mq, + env); data = GNUNET_new (struct ForwardedOperationData); data->cc = cc; data->cc_cls = cc_cls; @@ -1347,11 +1356,8 @@ GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller opc->type = OP_FORWARDED; opc->data = data; opc->id = operation_id; - msize = ntohs (msg->size); - dup_msg = GNUNET_malloc (msize); - (void) memcpy (dup_msg, msg, msize); - GNUNET_TESTBED_queue_message_ (opc->c, dup_msg); - GNUNET_TESTBED_insert_opc_ (controller, opc); + GNUNET_TESTBED_insert_opc_ (controller, + opc); return opc; } @@ -1365,7 +1371,8 @@ GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller void GNUNET_TESTBED_forward_operation_msg_cancel_ (struct OperationContext *opc) { - GNUNET_TESTBED_remove_opc_ (opc->c, opc); + GNUNET_TESTBED_remove_opc_ (opc->c, + opc); GNUNET_free (opc->data); GNUNET_free (opc); } @@ -1561,11 +1568,13 @@ GNUNET_TESTBED_controller_connect (struct GNUNET_TESTBED_Host *host, GNUNET_MQ_handler_end () }; struct GNUNET_TESTBED_InitMessage *msg; + struct GNUNET_MQ_Envelope *env; const struct GNUNET_CONFIGURATION_Handle *cfg; const char *controller_hostname; unsigned long long max_parallel_operations; unsigned long long max_parallel_service_connections; unsigned long long max_parallel_topology_config_operations; + size_t slen; GNUNET_assert (NULL != (cfg = GNUNET_TESTBED_host_get_cfg_ (host))); if (GNUNET_OK != @@ -1626,18 +1635,17 @@ GNUNET_TESTBED_controller_connect (struct GNUNET_TESTBED_Host *host, controller_hostname = GNUNET_TESTBED_host_get_hostname (host); if (NULL == controller_hostname) controller_hostname = "127.0.0.1"; - msg = - GNUNET_malloc (sizeof (struct GNUNET_TESTBED_InitMessage) + - strlen (controller_hostname) + 1); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_INIT); - msg->header.size = - htons (sizeof (struct GNUNET_TESTBED_InitMessage) + - strlen (controller_hostname) + 1); + slen = strlen (controller_hostname) + 1; + env = GNUNET_MQ_msg_extra (msg, + slen, + GNUNET_MESSAGE_TYPE_TESTBED_INIT); msg->host_id = htonl (GNUNET_TESTBED_host_get_id_ (host)); msg->event_mask = GNUNET_htonll (controller->event_mask); - strcpy ((char *) &msg[1], controller_hostname); - GNUNET_TESTBED_queue_message_ (controller, - (struct GNUNET_MessageHeader *) msg); + memcpy (&msg[1], + controller_hostname, + slen); + GNUNET_MQ_send (controller->mq, + env); return controller; } @@ -2182,16 +2190,17 @@ static void opstart_shutdown_peers (void *cls) { struct OperationContext *opc = cls; + struct GNUNET_MQ_Envelope *env; struct GNUNET_TESTBED_ShutdownPeersMessage *msg; opc->state = OPC_STATE_STARTED; - msg = GNUNET_new (struct GNUNET_TESTBED_ShutdownPeersMessage); - msg->header.size = - htons (sizeof (struct GNUNET_TESTBED_ShutdownPeersMessage)); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_SHUTDOWN_PEERS); + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_TESTBED_SHUTDOWN_PEERS); msg->operation_id = GNUNET_htonll (opc->id); - GNUNET_TESTBED_insert_opc_ (opc->c, opc); - GNUNET_TESTBED_queue_message_ (opc->c, &msg->header); + GNUNET_TESTBED_insert_opc_ (opc->c, + opc); + GNUNET_MQ_send (opc->c->mq, + env); } @@ -2330,10 +2339,10 @@ GNUNET_TESTBED_barrier_init_ (struct GNUNET_TESTBED_Controller *controller, int echo) { struct GNUNET_TESTBED_BarrierInit *msg; + struct GNUNET_MQ_Envelope *env; struct GNUNET_TESTBED_Barrier *barrier; struct GNUNET_HashCode key; size_t name_len; - uint16_t msize; GNUNET_assert (quorum <= 100); GNUNET_assert (NULL != cb); @@ -2362,13 +2371,16 @@ GNUNET_TESTBED_barrier_init_ (struct GNUNET_TESTBED_Controller *controller, &barrier->key, barrier, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); - msize = name_len + sizeof (struct GNUNET_TESTBED_BarrierInit); - msg = GNUNET_malloc (msize); - msg->header.size = htons (msize); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT); + + env = GNUNET_MQ_msg_extra (msg, + name_len, + GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT); msg->quorum = (uint8_t) quorum; - (void) memcpy (msg->name, barrier->name, name_len); - GNUNET_TESTBED_queue_message_ (barrier->c, &msg->header); + memcpy (msg->name, + barrier->name, + name_len); + GNUNET_MQ_send (barrier->c->mq, + env); return barrier; } @@ -2406,15 +2418,19 @@ GNUNET_TESTBED_barrier_init (struct GNUNET_TESTBED_Controller *controller, void GNUNET_TESTBED_barrier_cancel (struct GNUNET_TESTBED_Barrier *barrier) { + struct GNUNET_MQ_Envelope *env; struct GNUNET_TESTBED_BarrierCancel *msg; - uint16_t msize; - - msize = sizeof (struct GNUNET_TESTBED_BarrierCancel) + strlen (barrier->name); - msg = GNUNET_malloc (msize); - msg->header.size = htons (msize); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL); - (void) memcpy (msg->name, barrier->name, strlen (barrier->name)); - GNUNET_TESTBED_queue_message_ (barrier->c, &msg->header); + size_t slen; + + slen = strlen (barrier->name); + env = GNUNET_MQ_msg_extra (msg, + slen, + GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL); + memcpy (msg->name, + barrier->name, + slen); + GNUNET_MQ_send (barrier->c->mq, + env); GNUNET_TESTBED_barrier_remove_ (barrier); } diff --git a/src/testbed/testbed_api_peers.c b/src/testbed/testbed_api_peers.c index 884d4ffca..c30f960c8 100644 --- a/src/testbed/testbed_api_peers.c +++ b/src/testbed/testbed_api_peers.c @@ -99,30 +99,36 @@ opstart_peer_create (void *cls) struct OperationContext *opc = cls; struct PeerCreateData *data = opc->data; struct GNUNET_TESTBED_PeerCreateMessage *msg; + struct GNUNET_MQ_Envelope *env; char *config; char *xconfig; size_t c_size; size_t xc_size; - uint16_t msize; GNUNET_assert (OP_PEER_CREATE == opc->type); GNUNET_assert (NULL != data); GNUNET_assert (NULL != data->peer); opc->state = OPC_STATE_STARTED; - config = GNUNET_CONFIGURATION_serialize (data->cfg, &c_size); - xc_size = GNUNET_TESTBED_compress_config_ (config, c_size, &xconfig); + config = GNUNET_CONFIGURATION_serialize (data->cfg, + &c_size); + xc_size = GNUNET_TESTBED_compress_config_ (config, + c_size, + &xconfig); GNUNET_free (config); - msize = xc_size + sizeof (struct GNUNET_TESTBED_PeerCreateMessage); - msg = GNUNET_realloc (xconfig, msize); - memmove (&msg[1], msg, xc_size); - msg->header.size = htons (msize); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER); + env = GNUNET_MQ_msg_extra (msg, + xc_size, + GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER); msg->operation_id = GNUNET_htonll (opc->id); msg->host_id = htonl (GNUNET_TESTBED_host_get_id_ (data->peer->host)); msg->peer_id = htonl (data->peer->unique_id); msg->config_size = htons ((uint16_t) c_size); + memcpy (&msg[1], + xconfig, + xc_size); + GNUNET_MQ_send (opc->c->mq, + env); + GNUNET_free (xconfig); GNUNET_TESTBED_insert_opc_ (opc->c, opc); - GNUNET_TESTBED_queue_message_ (opc->c, &msg->header); } @@ -163,17 +169,18 @@ opstart_peer_destroy (void *cls) struct OperationContext *opc = cls; struct GNUNET_TESTBED_Peer *peer = opc->data; struct GNUNET_TESTBED_PeerDestroyMessage *msg; + struct GNUNET_MQ_Envelope *env; GNUNET_assert (OP_PEER_DESTROY == opc->type); GNUNET_assert (NULL != peer); opc->state = OPC_STATE_STARTED; - msg = GNUNET_new (struct GNUNET_TESTBED_PeerDestroyMessage); - msg->header.size = htons (sizeof (struct GNUNET_TESTBED_PeerDestroyMessage)); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_DESTROY_PEER); + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_TESTBED_DESTROY_PEER); msg->peer_id = htonl (peer->unique_id); msg->operation_id = GNUNET_htonll (opc->id); GNUNET_TESTBED_insert_opc_ (opc->c, opc); - GNUNET_TESTBED_queue_message_ (peer->controller, &msg->header); + GNUNET_MQ_send (peer->controller->mq, + env); } @@ -211,6 +218,7 @@ opstart_peer_start (void *cls) { struct OperationContext *opc = cls; struct GNUNET_TESTBED_PeerStartMessage *msg; + struct GNUNET_MQ_Envelope *env; struct PeerEventData *data; struct GNUNET_TESTBED_Peer *peer; @@ -219,13 +227,13 @@ opstart_peer_start (void *cls) GNUNET_assert (NULL != (peer = data->peer)); GNUNET_assert ((TESTBED_PS_CREATED == peer->state) || (TESTBED_PS_STOPPED == peer->state)); opc->state = OPC_STATE_STARTED; - msg = GNUNET_new (struct GNUNET_TESTBED_PeerStartMessage); - msg->header.size = htons (sizeof (struct GNUNET_TESTBED_PeerStartMessage)); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_START_PEER); + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_TESTBED_START_PEER); msg->peer_id = htonl (peer->unique_id); msg->operation_id = GNUNET_htonll (opc->id); GNUNET_TESTBED_insert_opc_ (opc->c, opc); - GNUNET_TESTBED_queue_message_ (peer->controller, &msg->header); + GNUNET_MQ_send (peer->controller->mq, + env); } @@ -266,18 +274,19 @@ opstart_peer_stop (void *cls) struct GNUNET_TESTBED_PeerStopMessage *msg; struct PeerEventData *data; struct GNUNET_TESTBED_Peer *peer; + struct GNUNET_MQ_Envelope *env; GNUNET_assert (NULL != (data = opc->data)); GNUNET_assert (NULL != (peer = data->peer)); GNUNET_assert (TESTBED_PS_STARTED == peer->state); opc->state = OPC_STATE_STARTED; - msg = GNUNET_new (struct GNUNET_TESTBED_PeerStopMessage); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_STOP_PEER); - msg->header.size = htons (sizeof (struct GNUNET_TESTBED_PeerStopMessage)); + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_TESTBED_STOP_PEER); msg->peer_id = htonl (peer->unique_id); msg->operation_id = GNUNET_htonll (opc->id); GNUNET_TESTBED_insert_opc_ (opc->c, opc); - GNUNET_TESTBED_queue_message_ (peer->controller, &msg->header); + GNUNET_MQ_send (peer->controller->mq, + env); } @@ -404,22 +413,23 @@ static void opstart_overlay_connect (void *cls) { struct OperationContext *opc = cls; + struct GNUNET_MQ_Envelope *env; struct GNUNET_TESTBED_OverlayConnectMessage *msg; struct OverlayConnectData *data; opc->state = OPC_STATE_STARTED; data = opc->data; GNUNET_assert (NULL != data); - msg = GNUNET_new (struct GNUNET_TESTBED_OverlayConnectMessage); - msg->header.size = - htons (sizeof (struct GNUNET_TESTBED_OverlayConnectMessage)); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_OVERLAY_CONNECT); + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_TESTBED_OVERLAY_CONNECT); msg->peer1 = htonl (data->p1->unique_id); msg->peer2 = htonl (data->p2->unique_id); msg->operation_id = GNUNET_htonll (opc->id); msg->peer2_host_id = htonl (GNUNET_TESTBED_host_get_id_ (data->p2->host)); - GNUNET_TESTBED_insert_opc_ (opc->c, opc); - GNUNET_TESTBED_queue_message_ (opc->c, &msg->header); + GNUNET_TESTBED_insert_opc_ (opc->c, + opc); + GNUNET_MQ_send (opc->c->mq, + env); } @@ -460,31 +470,34 @@ opstart_peer_reconfigure (void *cls) { struct OperationContext *opc = cls; struct PeerReconfigureData *data = opc->data; + struct GNUNET_MQ_Envelope *env; struct GNUNET_TESTBED_PeerReconfigureMessage *msg; char *xconfig; size_t xc_size; - uint16_t msize; opc->state = OPC_STATE_STARTED; GNUNET_assert (NULL != data); - xc_size = GNUNET_TESTBED_compress_config_ (data->config, data->cfg_size, + xc_size = GNUNET_TESTBED_compress_config_ (data->config, + data->cfg_size, &xconfig); GNUNET_free (data->config); data->config = NULL; - GNUNET_assert (xc_size <= UINT16_MAX); - msize = (uint16_t) xc_size + - sizeof (struct GNUNET_TESTBED_PeerReconfigureMessage); - msg = GNUNET_realloc (xconfig, msize); - (void) memmove (&msg[1], msg, xc_size); - msg->header.size = htons (msize); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_RECONFIGURE_PEER); + GNUNET_assert (xc_size < UINT16_MAX - sizeof (*msg)); + env = GNUNET_MQ_msg_extra (msg, + xc_size, + GNUNET_MESSAGE_TYPE_TESTBED_RECONFIGURE_PEER); msg->peer_id = htonl (data->peer->unique_id); msg->operation_id = GNUNET_htonll (opc->id); msg->config_size = htons (data->cfg_size); + memcpy (&msg[1], + xconfig, + xc_size); + GNUNET_free (xconfig); GNUNET_free (data); opc->data = NULL; GNUNET_TESTBED_insert_opc_ (opc->c, opc); - GNUNET_TESTBED_queue_message_ (opc->c, &msg->header); + GNUNET_MQ_send (opc->c->mq, + env); } @@ -873,22 +886,27 @@ opstart_manage_service (void *cls) { struct OperationContext *opc = cls; struct ManageServiceData *data = opc->data; + struct GNUNET_MQ_Envelope *env; struct GNUNET_TESTBED_ManagePeerServiceMessage *msg; + size_t xlen; GNUNET_assert (NULL != data); - msg = GNUNET_malloc (data->msize); - msg->header.size = htons (data->msize); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_MANAGE_PEER_SERVICE); + xlen = data->msize - sizeof (struct GNUNET_TESTBED_ManagePeerServiceMessage); + env = GNUNET_MQ_msg_extra (msg, + xlen, + GNUNET_MESSAGE_TYPE_TESTBED_MANAGE_PEER_SERVICE); msg->peer_id = htonl (data->peer->unique_id); msg->operation_id = GNUNET_htonll (opc->id); msg->start = (uint8_t) data->start; - (void) memcpy (&msg[1], data->service_name, data->msize - - sizeof (struct GNUNET_TESTBED_ManagePeerServiceMessage)); + memcpy (&msg[1], + data->service_name, + xlen); GNUNET_free (data->service_name); data->service_name = NULL; opc->state = OPC_STATE_STARTED; GNUNET_TESTBED_insert_opc_ (opc->c, opc); - GNUNET_TESTBED_queue_message_ (opc->c, &msg->header); + GNUNET_MQ_send (opc->c->mq, + env); } -- cgit v1.2.3