From 719fb81dbaf034f624abaf6cc0a51c05446191f4 Mon Sep 17 00:00:00 2001 From: Sree Harsha Totakura Date: Mon, 2 Sep 2013 10:01:01 +0000 Subject: - more barrier stuff --- src/testbed/gnunet-service-testbed_barriers.c | 160 ++++++++++++++++++++------ src/testbed/testbed.h | 9 +- src/testbed/testbed_api.c | 2 +- src/testbed/testbed_api.h | 2 +- src/testbed/testbed_api_barriers.c | 12 +- 5 files changed, 141 insertions(+), 44 deletions(-) (limited to 'src') diff --git a/src/testbed/gnunet-service-testbed_barriers.c b/src/testbed/gnunet-service-testbed_barriers.c index c12075d66..e0d0e2f18 100644 --- a/src/testbed/gnunet-service-testbed_barriers.c +++ b/src/testbed/gnunet-service-testbed_barriers.c @@ -152,6 +152,11 @@ struct Barrier */ struct GNUNET_HashCode hash; + /** + * The client handle to the master controller + */ + struct GNUNET_SERVER_Client *client; + /** * The name of the barrier */ @@ -177,6 +182,16 @@ struct Barrier */ struct WBarrier *wtail; + /** + * Identifier for the timeout task + */ + GNUNET_SCHEDULER_TaskIdentifier tout_task; + + /** + * The status of this barrier + */ + enum GNUNET_TESTBED_BarrierStatus status; + /** * Number of barriers wrapped in the above DLL */ @@ -187,6 +202,11 @@ struct Barrier */ unsigned int num_wbarriers_reached; + /** + * Number of wrapped barrier initialised so far + */ + unsigned int num_wbarriers_inited; + /** * Number of peers which have reached this barrier */ @@ -287,7 +307,6 @@ queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg) } -#if 0 /** * Function to remove a barrier from the barrier map and cleanup resources * occupied by a barrier @@ -301,33 +320,63 @@ remove_barrier (struct Barrier *barrier) &barrier->hash, barrier)); GNUNET_free (barrier->name); + if (NULL != barrier->client) + GNUNET_SERVER_client_drop (barrier->client); GNUNET_free (barrier); } /** - * Function called upon timeout while waiting for a response from the - * subcontrollers to barrier init message + * Cancels all subcontroller barrier handles * - * @param - * @return + * @param barrier the local barrier */ static void -fwd_tout_barrier_init (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +cancel_wrappers (struct Barrier *barrier) { - struct ForwardedOperationContext *foctx = cls; - struct Barrier *barrier = foctx->cls; - - barrier->nslaves--; - barrier->timedout = GNUNET_YES; - if (0 == barrier->nslaves) + struct WBarrier *wrapper; + + while (NULL != (wrapper = barrier->whead)) { - GST_send_operation_fail_msg (foctx->client, foctx->operation_id, - "Timeout while contacting a slave controller"); - remove_barrier (barrier); + GNUNET_TESTBED_barrier_cancel (wrapper->hbarrier); + GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper); + GNUNET_free (wrapper); } } -#endif + + +/** + * Sends a barrier failed message + * + * @param barrier the corresponding barrier + * @param status the status of the barrier + * @param emsg the error message; should be non-NULL for + * status=BARRIER_STATUS_ERROR + */ +static void +send_barrier_status_msg (struct Barrier *barrier, + enum GNUNET_TESTBED_BarrierStatus status, + const char *emsg) +{ + struct GNUNET_TESTBED_BarrierStatusMsg *msg; + size_t name_len; + uint16_t msize; + + GNUNET_assert ((NULL == emsg) || (BARRIER_STATUS_ERROR == status)); + name_len = strlen (barrier->name) + 1; + msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + + name_len + + (NULL == emsg) ? 0 : strlen (emsg) + 1; + msg = GNUNET_malloc (msize); + msg->status = htons (status); + msg->name_len = htons (name_len); + (void) memcpy (msg->data, barrier->name, name_len); + if (NULL != emsg) + (void) memcpy (msg->data + name_len, emsg, strlen (emsg) + 1); + GST_queue_message (barrier->client, &msg->header); +} + + /** * Task for sending barrier crossed notifications to waiting client @@ -340,13 +389,13 @@ notify_task_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct Barrier *barrier = cls; struct ClientCtx *client_ctx; - struct GNUNET_TESTBED_BarrierStatus *msg; + struct GNUNET_TESTBED_BarrierStatusMsg *msg; struct GNUNET_MessageHeader *dup_msg; uint16_t name_len; uint16_t msize; name_len = strlen (barrier->name) + 1; - msize = sizeof (struct GNUNET_TESTBED_BarrierStatus) + name_len; + msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len; msg = GNUNET_malloc (msize); msg->header.size = htons (msize); msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS); @@ -505,8 +554,9 @@ GST_barriers_stop () */ static void wbarrier_status_cb (void *cls, const char *name, - struct GNUNET_TESTBED_Barrier *b_, - int status, const char *emsg) + struct GNUNET_TESTBED_Barrier *b_, + enum GNUNET_TESTBED_BarrierStatus status, + const char *emsg) { struct WBarrier *wrapper = cls; struct Barrier *barrier = wrapper->barrier; @@ -515,31 +565,58 @@ wbarrier_status_cb (void *cls, const char *name, wrapper->hbarrier = NULL; GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper); GNUNET_free (wrapper); - if (GNUNET_SYSERR == status) + if (BARRIER_STATUS_ERROR == status) { LOG (GNUNET_ERROR_TYPE_ERROR, - "Initialising barrier (%s) failed at a sub-controller: %s\n", + "Initialising barrier `%s' failed at a sub-controller: %s\n", barrier->name, (NULL != emsg) ? emsg : "NULL"); - while (NULL != (wrapper = barrier->whead)) - { - GNUNET_TESTBED_barrier_cancel (wrapper->hbarrier); - GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper); - GNUNET_free (wrapper); - } - /* Send parent controller failure message */ - GNUNET_break (0); + cancel_wrappers (barrier); + if (NULL == emsg) + emsg = "Initialisation failed at a sub-controller"; + send_barrier_status_msg (barrier, BARRIER_STATUS_ERROR, emsg); + return; } - barrier->num_wbarriers_reached++; - if ((barrier->num_wbarriers_reached == barrier->num_wbarriers) - && (LOCAL_QUORUM_REACHED (barrier))) + switch (status) { - /* Send parent controller success status message */ - GNUNET_break (0); + case BARRIER_STATUS_CROSSED: + barrier->num_wbarriers_reached++; + if ((barrier->num_wbarriers_reached == barrier->num_wbarriers) + && (LOCAL_QUORUM_REACHED (barrier))) + send_barrier_status_msg (barrier, BARRIER_STATUS_CROSSED, NULL); + break; + case BARRIER_STATUS_INITIALISED: + barrier->num_wbarriers_inited++; + if (barrier->num_wbarriers_inited == barrier->num_wbarriers) + send_barrier_status_msg (barrier, BARRIER_STATUS_INITIALISED, NULL); + break; + case BARRIER_STATUS_ERROR: + GNUNET_assert (0); } return; } +/** + * Function called upon timeout while waiting for a response from the + * subcontrollers to barrier init message + * + * @param + * @return + */ +static void +fwd_tout_barrier_init (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct Barrier *barrier = cls; + + barrier->nslaves--; + barrier->timedout = GNUNET_YES; + cancel_wrappers (barrier); + send_barrier_status_msg (barrier, BARRIER_STATUS_ERROR, + "Timedout while propagating barrier initialisation\n"); + remove_barrier (barrier); +} + + /** * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages. This * message should always come from a parent controller or the testbed API if we @@ -603,6 +680,8 @@ GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client, barrier->name = GNUNET_malloc (name_len + 1); barrier->name[name_len] = '\0'; (void) memcpy (barrier->name, name, name_len); + barrier->client = client; + GNUNET_SERVER_client_keep (client); GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (barrier_map, &barrier->hash, @@ -621,10 +700,19 @@ GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client, } wrapper = GNUNET_malloc (sizeof (struct WBarrier)); wrapper->barrier = barrier; + GNUNET_CONTAINER_DLL_insert_tail (barrier->whead, barrier->wtail, wrapper); wrapper->hbarrier = GNUNET_TESTBED_barrier_init (slave->controller, barrier->name, barrier->quorum, &wbarrier_status_cb, - wrapper); + wrapper); } + if (NULL == barrier->whead) /* No further propagation */ + send_barrier_status_msg (barrier, BARRIER_STATUS_INITIALISED, NULL); + else + barrier->tout_task = GNUNET_SCHEDULER_add_delayed (MESSAGE_SEND_TIMEOUT (30), + &fwd_tout_barrier_init, + barrier); } + +/* end of gnunet-service-testbed_barriers.c */ diff --git a/src/testbed/testbed.h b/src/testbed/testbed.h index 3b159591d..73cf8c294 100644 --- a/src/testbed/testbed.h +++ b/src/testbed/testbed.h @@ -768,6 +768,11 @@ struct GNUNET_TESTBED_ManagePeerServiceMessage }; +/**************************************/ +/* Barriers IPC messages and protocol */ +/**************************************/ + + /** * Message to initialise a barrier */ @@ -803,7 +808,7 @@ struct GNUNET_TESTBED_BarrierInit /** * Message for signalling status changes of a barrier */ -struct GNUNET_TESTBED_BarrierStatus +struct GNUNET_TESTBED_BarrierStatusMsg { /** * Type is GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS @@ -811,7 +816,7 @@ struct GNUNET_TESTBED_BarrierStatus struct GNUNET_MessageHeader header; /** - * status. 0 to signal success (barrier is crossed). 1 for error. + * status. Use enumerated values of enum BarrierStatus */ uint16_t status; diff --git a/src/testbed/testbed_api.c b/src/testbed/testbed_api.c index 0a109aa7f..d5486b1c3 100644 --- a/src/testbed/testbed_api.c +++ b/src/testbed/testbed_api.c @@ -1163,7 +1163,7 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) status = GNUNET_TESTBED_handle_barrier_status_ (c, (const struct - GNUNET_TESTBED_BarrierStatus *) + GNUNET_TESTBED_BarrierStatusMsg *) msg); break; default: diff --git a/src/testbed/testbed_api.h b/src/testbed/testbed_api.h index f658c08dd..39a2f711a 100644 --- a/src/testbed/testbed_api.h +++ b/src/testbed/testbed_api.h @@ -508,7 +508,7 @@ GNUNET_TESTBED_controller_link_ (void *op_cls, */ int GNUNET_TESTBED_handle_barrier_status_ (struct GNUNET_TESTBED_Controller *c, - const struct GNUNET_TESTBED_BarrierStatus + const struct GNUNET_TESTBED_BarrierStatusMsg *msg); diff --git a/src/testbed/testbed_api_barriers.c b/src/testbed/testbed_api_barriers.c index 231e00600..ab468a088 100644 --- a/src/testbed/testbed_api_barriers.c +++ b/src/testbed/testbed_api_barriers.c @@ -102,7 +102,7 @@ barrier_remove (struct GNUNET_TESTBED_Barrier *barrier) */ int GNUNET_TESTBED_handle_barrier_status_ (struct GNUNET_TESTBED_Controller *c, - const struct GNUNET_TESTBED_BarrierStatus + const struct GNUNET_TESTBED_BarrierStatusMsg *msg) { struct GNUNET_TESTBED_Barrier *barrier; @@ -119,17 +119,17 @@ GNUNET_TESTBED_handle_barrier_status_ (struct GNUNET_TESTBED_Controller *c, msize = ntohs (msg->header.size); name = msg->data; name_len = ntohs (msg->name_len); - if ( (sizeof (struct GNUNET_TESTBED_BarrierStatus) + name_len + 1 > msize) + if ( (sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len + 1 > msize) || ('\0' != name[name_len]) ) { GNUNET_break_op (0); return GNUNET_SYSERR; } status = ntohs (msg->status); - if (0 != status) + if (BARRIER_STATUS_ERROR == status) { status = -1; - emsg_len = msize - (sizeof (struct GNUNET_TESTBED_BarrierStatus) + name_len + emsg_len = msize - (sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len + 1); if (0 == emsg_len) { @@ -150,6 +150,8 @@ GNUNET_TESTBED_handle_barrier_status_ (struct GNUNET_TESTBED_Controller *c, goto cleanup; GNUNET_assert (NULL != barrier->cb); barrier->cb (barrier->cls, name, barrier, status, emsg); + if (BARRIER_STATUS_INITIALISED == status) + return GNUNET_OK; /* just initialised; skip cleanup */ cleanup: GNUNET_free_non_null (emsg); @@ -219,3 +221,5 @@ GNUNET_TESTBED_barrier_cancel (struct GNUNET_TESTBED_Barrier *barrier) { barrier_remove (barrier); } + +/* end of testbed_api_barriers.c */ -- cgit v1.2.3