From 9335b7094d8a11449c322b2ca3f5a6cb7a257dad Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 27 Jun 2016 16:52:08 +0000 Subject: -use more of MQ API --- src/testbed/testbed_api.c | 100 +++++++++++++++++++++++++++------------------- 1 file changed, 58 insertions(+), 42 deletions(-) (limited to 'src/testbed/testbed_api.c') diff --git a/src/testbed/testbed_api.c b/src/testbed/testbed_api.c index aad5055ef..b74b48b69 100644 --- a/src/testbed/testbed_api.c +++ b/src/testbed/testbed_api.c @@ -1288,7 +1288,6 @@ handle_barrier_status (void *cls, * * @param controller the handle to the controller * @param msg the message to queue - * @deprecated */ void GNUNET_TESTBED_queue_message_ (struct GNUNET_TESTBED_Controller *controller, @@ -1328,17 +1327,27 @@ GNUNET_TESTBED_queue_message_ (struct GNUNET_TESTBED_Controller *controller, * operation */ struct OperationContext * -GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller - *controller, uint64_t operation_id, +GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller *controller, + uint64_t operation_id, const struct GNUNET_MessageHeader *msg, GNUNET_CLIENT_MessageHandler cc, void *cc_cls) { struct OperationContext *opc; struct ForwardedOperationData *data; - struct GNUNET_MessageHeader *dup_msg; - uint16_t msize; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_MessageHeader *m2; + uint16_t type = ntohs (msg->type); + uint16_t size = ntohs (msg->size); + env = GNUNET_MQ_msg_extra (m2, + size - sizeof (*m2), + type); + memcpy (m2, + msg, + size); + GNUNET_MQ_send (controller->mq, + env); data = GNUNET_new (struct ForwardedOperationData); data->cc = cc; data->cc_cls = cc_cls; @@ -1347,11 +1356,8 @@ GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller opc->type = OP_FORWARDED; opc->data = data; opc->id = operation_id; - msize = ntohs (msg->size); - dup_msg = GNUNET_malloc (msize); - (void) memcpy (dup_msg, msg, msize); - GNUNET_TESTBED_queue_message_ (opc->c, dup_msg); - GNUNET_TESTBED_insert_opc_ (controller, opc); + GNUNET_TESTBED_insert_opc_ (controller, + opc); return opc; } @@ -1365,7 +1371,8 @@ GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller void GNUNET_TESTBED_forward_operation_msg_cancel_ (struct OperationContext *opc) { - GNUNET_TESTBED_remove_opc_ (opc->c, opc); + GNUNET_TESTBED_remove_opc_ (opc->c, + opc); GNUNET_free (opc->data); GNUNET_free (opc); } @@ -1561,11 +1568,13 @@ GNUNET_TESTBED_controller_connect (struct GNUNET_TESTBED_Host *host, GNUNET_MQ_handler_end () }; struct GNUNET_TESTBED_InitMessage *msg; + struct GNUNET_MQ_Envelope *env; const struct GNUNET_CONFIGURATION_Handle *cfg; const char *controller_hostname; unsigned long long max_parallel_operations; unsigned long long max_parallel_service_connections; unsigned long long max_parallel_topology_config_operations; + size_t slen; GNUNET_assert (NULL != (cfg = GNUNET_TESTBED_host_get_cfg_ (host))); if (GNUNET_OK != @@ -1626,18 +1635,17 @@ GNUNET_TESTBED_controller_connect (struct GNUNET_TESTBED_Host *host, controller_hostname = GNUNET_TESTBED_host_get_hostname (host); if (NULL == controller_hostname) controller_hostname = "127.0.0.1"; - msg = - GNUNET_malloc (sizeof (struct GNUNET_TESTBED_InitMessage) + - strlen (controller_hostname) + 1); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_INIT); - msg->header.size = - htons (sizeof (struct GNUNET_TESTBED_InitMessage) + - strlen (controller_hostname) + 1); + slen = strlen (controller_hostname) + 1; + env = GNUNET_MQ_msg_extra (msg, + slen, + GNUNET_MESSAGE_TYPE_TESTBED_INIT); msg->host_id = htonl (GNUNET_TESTBED_host_get_id_ (host)); msg->event_mask = GNUNET_htonll (controller->event_mask); - strcpy ((char *) &msg[1], controller_hostname); - GNUNET_TESTBED_queue_message_ (controller, - (struct GNUNET_MessageHeader *) msg); + memcpy (&msg[1], + controller_hostname, + slen); + GNUNET_MQ_send (controller->mq, + env); return controller; } @@ -2182,16 +2190,17 @@ static void opstart_shutdown_peers (void *cls) { struct OperationContext *opc = cls; + struct GNUNET_MQ_Envelope *env; struct GNUNET_TESTBED_ShutdownPeersMessage *msg; opc->state = OPC_STATE_STARTED; - msg = GNUNET_new (struct GNUNET_TESTBED_ShutdownPeersMessage); - msg->header.size = - htons (sizeof (struct GNUNET_TESTBED_ShutdownPeersMessage)); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_SHUTDOWN_PEERS); + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_TESTBED_SHUTDOWN_PEERS); msg->operation_id = GNUNET_htonll (opc->id); - GNUNET_TESTBED_insert_opc_ (opc->c, opc); - GNUNET_TESTBED_queue_message_ (opc->c, &msg->header); + GNUNET_TESTBED_insert_opc_ (opc->c, + opc); + GNUNET_MQ_send (opc->c->mq, + env); } @@ -2330,10 +2339,10 @@ GNUNET_TESTBED_barrier_init_ (struct GNUNET_TESTBED_Controller *controller, int echo) { struct GNUNET_TESTBED_BarrierInit *msg; + struct GNUNET_MQ_Envelope *env; struct GNUNET_TESTBED_Barrier *barrier; struct GNUNET_HashCode key; size_t name_len; - uint16_t msize; GNUNET_assert (quorum <= 100); GNUNET_assert (NULL != cb); @@ -2362,13 +2371,16 @@ GNUNET_TESTBED_barrier_init_ (struct GNUNET_TESTBED_Controller *controller, &barrier->key, barrier, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); - msize = name_len + sizeof (struct GNUNET_TESTBED_BarrierInit); - msg = GNUNET_malloc (msize); - msg->header.size = htons (msize); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT); + + env = GNUNET_MQ_msg_extra (msg, + name_len, + GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT); msg->quorum = (uint8_t) quorum; - (void) memcpy (msg->name, barrier->name, name_len); - GNUNET_TESTBED_queue_message_ (barrier->c, &msg->header); + memcpy (msg->name, + barrier->name, + name_len); + GNUNET_MQ_send (barrier->c->mq, + env); return barrier; } @@ -2406,15 +2418,19 @@ GNUNET_TESTBED_barrier_init (struct GNUNET_TESTBED_Controller *controller, void GNUNET_TESTBED_barrier_cancel (struct GNUNET_TESTBED_Barrier *barrier) { + struct GNUNET_MQ_Envelope *env; struct GNUNET_TESTBED_BarrierCancel *msg; - uint16_t msize; - - msize = sizeof (struct GNUNET_TESTBED_BarrierCancel) + strlen (barrier->name); - msg = GNUNET_malloc (msize); - msg->header.size = htons (msize); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL); - (void) memcpy (msg->name, barrier->name, strlen (barrier->name)); - GNUNET_TESTBED_queue_message_ (barrier->c, &msg->header); + size_t slen; + + slen = strlen (barrier->name); + env = GNUNET_MQ_msg_extra (msg, + slen, + GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL); + memcpy (msg->name, + barrier->name, + slen); + GNUNET_MQ_send (barrier->c->mq, + env); GNUNET_TESTBED_barrier_remove_ (barrier); } -- cgit v1.2.3