From 91a95b29f1c6f729a1db0de4eeb78648836ea671 Mon Sep 17 00:00:00 2001 From: Sree Harsha Totakura Date: Sun, 21 Oct 2012 17:25:57 +0000 Subject: tracking forwarded operations --- src/testbed/gnunet-service-testbed.c | 93 +++++++++++++++++++++++++++++------- 1 file changed, 76 insertions(+), 17 deletions(-) (limited to 'src/testbed') diff --git a/src/testbed/gnunet-service-testbed.c b/src/testbed/gnunet-service-testbed.c index b0334b898..ac0ff7562 100644 --- a/src/testbed/gnunet-service-testbed.c +++ b/src/testbed/gnunet-service-testbed.c @@ -665,6 +665,16 @@ struct RequestOverlayConnectContext */ struct ForwardedOperationContext { + /** + * The next pointer for DLL + */ + struct ForwardedOperationContext *next; + + /** + * The prev pointer for DLL + */ + struct ForwardedOperationContext *prev; + /** * The generated operation context */ @@ -690,6 +700,11 @@ struct ForwardedOperationContext */ uint64_t operation_id; + /** + * The type of the operation which is forwarded + */ + enum OperationType type; + }; @@ -825,6 +840,16 @@ static struct RequestOverlayConnectContext *roccq_head; */ static struct RequestOverlayConnectContext *roccq_tail; +/** + * DLL head for forwarded operation contexts + */ +static struct ForwardedOperationContext *fopcq_head; + +/** + * DLL tail for forwarded operation contexts + */ +static struct ForwardedOperationContext *fopcq_tail; + /** * Array of hosts */ @@ -1373,11 +1398,11 @@ forwarded_operation_reply_relay (void *cls, msize = ntohs (msg->size); LOG_DEBUG ("Relaying message with type: %u, size: %u\n", ntohs (msg->type), msize); - dup_msg = GNUNET_malloc (msize); - (void) memcpy (dup_msg, msg, msize); + dup_msg = GNUNET_copy_message (msg); queue_message (fopc->client, dup_msg); GNUNET_SERVER_client_drop (fopc->client); GNUNET_SCHEDULER_cancel (fopc->timeout_task); + GNUNET_CONTAINER_DLL_remove (fopcq_head, fopcq_tail, fopc); GNUNET_free (fopc); } @@ -1398,6 +1423,7 @@ forwarded_operation_timeout (void *cls, LOG (GNUNET_ERROR_TYPE_WARNING, "A forwarded operation has timed out\n"); send_operation_fail_msg (fopc->client, fopc->operation_id, "Timeout"); GNUNET_SERVER_client_drop (fopc->client); + GNUNET_CONTAINER_DLL_remove (fopcq_head, fopcq_tail, fopc); GNUNET_free (fopc); } @@ -1446,6 +1472,7 @@ lcf_forwarded_operation_timeout (void *cls, struct LCFContext *lcf = cls; GNUNET_assert (NULL != lcf->fopc); + lcf->fopc->timeout_task = GNUNET_SCHEDULER_NO_TASK; forwarded_operation_timeout (lcf->fopc, tc); lcf->fopc = NULL; GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == lcf_proc_task_id); @@ -1502,6 +1529,7 @@ lcf_proc_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) lcf->fopc = GNUNET_malloc (sizeof (struct ForwardedOperationContext)); lcf->fopc->client = lcf->client; lcf->fopc->operation_id = lcf->operation_id; + lcf->fopc->type = OP_LINK_CONTROLLERS; lcf->fopc->opc = GNUNET_TESTBED_forward_operation_msg_ (lcf->gateway->controller, lcf->operation_id, @@ -1511,6 +1539,7 @@ lcf_proc_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) lcf->fopc->timeout_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT, &lcf_forwarded_operation_timeout, lcf); + GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, lcf->fopc); lcf->state = FINISHED; break; case FINISHED: @@ -1621,6 +1650,7 @@ process_next_focc (struct RegisteredHostContext *rhc) fopc->client = rhc->client; fopc->operation_id = focc->operation_id; fopc->cls = rhc; + fopc->type = OP_OVERLAY_CONNECT; fopc->opc = GNUNET_TESTBED_forward_operation_msg_ (rhc->gateway->controller, focc->operation_id, focc->orig_msg, @@ -1631,6 +1661,7 @@ process_next_focc (struct RegisteredHostContext *rhc) fopc->timeout_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT, &forwarded_overlay_connect_timeout, fopc); + GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fopc); } @@ -2204,20 +2235,15 @@ static void peer_create_success_cb (void *cls, const struct GNUNET_MessageHeader *msg) { struct ForwardedOperationContext *fopc = cls; - struct GNUNET_MessageHeader *dup_msg; struct Peer *remote_peer; - GNUNET_SCHEDULER_cancel (fopc->timeout_task); if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_TESTBED_PEERCREATESUCCESS) { GNUNET_assert (NULL != fopc->cls); remote_peer = fopc->cls; peer_list_add (remote_peer); } - dup_msg = GNUNET_copy_message (msg); - queue_message (fopc->client, dup_msg); - GNUNET_SERVER_client_drop (fopc->client); - GNUNET_free (fopc); + forwarded_operation_reply_relay (fopc, msg); } @@ -2259,10 +2285,8 @@ static void peer_destroy_success_cb (void *cls, const struct GNUNET_MessageHeader *msg) { struct ForwardedOperationContext *fopc = cls; - struct GNUNET_MessageHeader *dup_msg; struct Peer *remote_peer; - GNUNET_SCHEDULER_cancel (fopc->timeout_task); if (GNUNET_MESSAGE_TYPE_TESTBED_GENERICOPSUCCESS == ntohs (msg->type)) { remote_peer = fopc->cls; @@ -2271,10 +2295,7 @@ peer_destroy_success_cb (void *cls, const struct GNUNET_MessageHeader *msg) if (0 == remote_peer->reference_cnt) destroy_peer (remote_peer); } - dup_msg = GNUNET_copy_message (msg); - queue_message (fopc->client, dup_msg); - GNUNET_SERVER_client_drop (fopc->client); - GNUNET_free (fopc); + forwarded_operation_reply_relay (fopc, msg); } @@ -2410,6 +2431,7 @@ handle_peer_create (void *cls, struct GNUNET_SERVER_Client *client, fo_ctxt->client = client; fo_ctxt->operation_id = GNUNET_ntohll (msg->operation_id); fo_ctxt->cls = peer; //slave_list[route->dest]->controller; + fo_ctxt->type = OP_PEER_CREATE; fo_ctxt->opc = GNUNET_TESTBED_forward_operation_msg_ (slave_list [route->dest]->controller, fo_ctxt->operation_id, @@ -2418,6 +2440,7 @@ handle_peer_create (void *cls, struct GNUNET_SERVER_Client *client, fo_ctxt->timeout_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT, &peer_create_forward_timeout, fo_ctxt); + GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fo_ctxt); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -2459,6 +2482,7 @@ handle_peer_destroy (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_SERVER_client_keep (client); fopc->client = client; fopc->cls = peer; + fopc->type = OP_PEER_DESTROY; fopc->operation_id = GNUNET_ntohll (msg->operation_id); fopc->opc = GNUNET_TESTBED_forward_operation_msg_ (peer->details.remote.slave->controller, @@ -2468,6 +2492,7 @@ handle_peer_destroy (void *cls, struct GNUNET_SERVER_Client *client, fopc->timeout_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT, &forwarded_operation_timeout, fopc); + GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fopc); GNUNET_SERVER_receive_done (client, GNUNET_OK); return; } @@ -2516,6 +2541,7 @@ handle_peer_start (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_SERVER_client_keep (client); fopc->client = client; fopc->operation_id = GNUNET_ntohll (msg->operation_id); + fopc->type = OP_PEER_START; fopc->opc = GNUNET_TESTBED_forward_operation_msg_ (peer->details.remote.slave->controller, fopc->operation_id, &msg->header, @@ -2524,6 +2550,7 @@ handle_peer_start (void *cls, struct GNUNET_SERVER_Client *client, fopc->timeout_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT, &forwarded_operation_timeout, fopc); + GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fopc); GNUNET_SERVER_receive_done (client, GNUNET_OK); return; } @@ -2580,6 +2607,7 @@ handle_peer_stop (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_SERVER_client_keep (client); fopc->client = client; fopc->operation_id = GNUNET_ntohll (msg->operation_id); + fopc->type = OP_PEER_STOP; fopc->opc = GNUNET_TESTBED_forward_operation_msg_ (peer->details.remote.slave->controller, fopc->operation_id, &msg->header, @@ -2588,6 +2616,7 @@ handle_peer_stop (void *cls, struct GNUNET_SERVER_Client *client, fopc->timeout_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT, &forwarded_operation_timeout, fopc); + GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fopc); GNUNET_SERVER_receive_done (client, GNUNET_OK); return; } @@ -2650,6 +2679,7 @@ handle_peer_get_config (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_SERVER_client_keep (client); fopc->client = client; fopc->operation_id = GNUNET_ntohll (msg->operation_id); + fopc->type = OP_PEER_INFO; fopc->opc = GNUNET_TESTBED_forward_operation_msg_ (peer->details.remote.slave->controller, fopc->operation_id, &msg->header, @@ -2657,7 +2687,8 @@ handle_peer_get_config (void *cls, struct GNUNET_SERVER_Client *client, fopc); fopc->timeout_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT, &forwarded_operation_timeout, - fopc); + fopc); + GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fopc); GNUNET_SERVER_receive_done (client, GNUNET_OK); return; } @@ -3273,6 +3304,7 @@ handle_overlay_connect (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_SERVER_client_keep (client); fopc->client = client; fopc->operation_id = operation_id; + fopc->type = OP_OVERLAY_CONNECT; fopc->opc = GNUNET_TESTBED_forward_operation_msg_ (peer->details.remote.slave->controller, operation_id, message, @@ -3281,6 +3313,7 @@ handle_overlay_connect (void *cls, struct GNUNET_SERVER_Client *client, fopc->timeout_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT, &forwarded_operation_timeout, fopc); + GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fopc); GNUNET_SERVER_receive_done (client, GNUNET_OK); return; } @@ -3566,6 +3599,7 @@ handle_slave_get_config (void *cls, struct GNUNET_SERVER_Client *client, op_id = GNUNET_ntohll (msg->operation_id); if ((slave_list_size <= slave_id) || (NULL == slave_list[slave_id])) { + /* FIXME: Add forwardings for this type of message here.. */ send_operation_fail_msg (client, op_id, "Slave not found"); GNUNET_SERVER_receive_done (client, GNUNET_OK); return; @@ -3670,6 +3704,7 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) struct LCFContextQueue *lcfq; struct OverlayConnectContext *occ; struct RequestOverlayConnectContext *rocc; + struct ForwardedOperationContext *fopc; uint32_t id; shutdown_task_id = GNUNET_SCHEDULER_NO_TASK; @@ -3677,6 +3712,32 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) (void) GNUNET_CONTAINER_multihashmap_iterate (ss_map, &ss_map_free_iterator, NULL); GNUNET_CONTAINER_multihashmap_destroy (ss_map); + /* cleanup any remaining forwarded operations */ + while (NULL != (fopc = fopcq_head)) + { + GNUNET_CONTAINER_DLL_remove (fopcq_head, fopcq_tail, fopc); + GNUNET_TESTBED_forward_operation_msg_cancel_ (fopc->opc); + if (GNUNET_SCHEDULER_NO_TASK != fopc->timeout_task) + GNUNET_SCHEDULER_cancel (fopc->timeout_task); + GNUNET_SERVER_client_drop (fopc->client); + switch (fopc->type) + { + case OP_PEER_CREATE: + GNUNET_free (fopc->cls); + break; + case OP_PEER_START: + case OP_PEER_STOP: + case OP_PEER_DESTROY: + case OP_PEER_INFO: + case OP_OVERLAY_CONNECT: + case OP_LINK_CONTROLLERS: + case OP_GET_SLAVE_CONFIG: + break; + case OP_FORWARDED: + GNUNET_assert (0); + }; + GNUNET_free (fopc); + } if (NULL != lcfq_head) { if (GNUNET_SCHEDULER_NO_TASK != lcf_proc_task_id) @@ -3688,8 +3749,6 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == lcf_proc_task_id); for (lcfq = lcfq_head; NULL != lcfq; lcfq = lcfq_head) { - if (NULL != lcfq->lcf->fopc) - GNUNET_TESTBED_forward_operation_msg_cancel_ (lcfq->lcf->fopc->opc); GNUNET_free (lcfq->lcf->msg); GNUNET_free (lcfq->lcf); GNUNET_CONTAINER_DLL_remove (lcfq_head, lcfq_tail, lcfq); -- cgit v1.2.3