From 845316cb543af7e4c77709acce4df79f3e0dc162 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Thu, 3 Jan 2013 00:43:57 +0000 Subject: implemented the modified consensus api, started implementing p2p protocol for consensus --- src/consensus/consensus_api.c | 481 ++++++++++++++---------------------------- 1 file changed, 154 insertions(+), 327 deletions(-) (limited to 'src/consensus/consensus_api.c') diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index 2479c019c..25c76b358 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c @@ -33,14 +33,43 @@ #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__) -struct ElementAck +/** + * Actions that can be queued. + */ +struct QueuedMessage { - struct ElementAck *next; - struct ElementAck *prev; - int keep; - struct GNUNET_CONSENSUS_Element *element; + /** + * Queued messages are stored in a doubly linked list. + */ + struct QueuedMessage *next; + + /** + * Queued messages are stored in a doubly linked list. + */ + struct QueuedMessage *prev; + + /** + * The actual queued message. + */ + struct GNUNET_MessageHeader *msg; + + /** + * Size of the message in msg. + */ + size_t size; + + /** + * Will be called after transmit, if not NULL + */ + GNUNET_CONSENSUS_InsertDoneCallback idc; + + /** + * Closure for idc + */ + void *idc_cls; }; + /** * Handle for the service. */ @@ -52,14 +81,14 @@ struct GNUNET_CONSENSUS_Handle const struct GNUNET_CONFIGURATION_Handle *cfg; /** - * Socket (if available). + * Client connected to the consensus service, may be NULL if not connected. */ struct GNUNET_CLIENT_Connection *client; /** * Callback for new elements. Not called for elements added locally. */ - GNUNET_CONSENSUS_NewElementCallback new_element_cb; + GNUNET_CONSENSUS_ElementCallback new_element_cb; /** * Closure for new_element_cb @@ -67,7 +96,7 @@ struct GNUNET_CONSENSUS_Handle void *new_element_cls; /** - * Session identifier for the consensus session. + * The (local) session identifier for the consensus session. */ struct GNUNET_HashCode session_id; @@ -77,9 +106,9 @@ struct GNUNET_CONSENSUS_Handle int num_peers; /** - * Peer identities of peers in the consensus. Optionally includes the local peer. + * Peer identities of peers participating in the consensus, includes the local peer. */ - struct GNUNET_PeerIdentity *peers; + struct GNUNET_PeerIdentity **peers; /** * Currently active transmit request. @@ -91,22 +120,11 @@ struct GNUNET_CONSENSUS_Handle */ int joined; - /** - * Called when the current insertion operation finishes. - * NULL if there is no insert operation active. - */ - GNUNET_CONSENSUS_InsertDoneCallback idc; - /** * Closure for the insert done callback. */ void *idc_cls; - /** - * An element that was requested to be inserted. - */ - struct GNUNET_CONSENSUS_Element *insert_element; - /** * Called when the conclude operation finishes or fails. */ @@ -122,103 +140,92 @@ struct GNUNET_CONSENSUS_Handle */ struct GNUNET_TIME_Absolute conclude_deadline; - struct ElementAck *ack_head; - struct ElementAck *ack_tail; - - /** - * Set to GNUNET_YES if the begin message has been transmitted to the service - */ - int begin_sent; + unsigned int conclude_min_size; - /** - * Set to GNUNET_YES it the begin message should be transmitted to the service - */ - int begin_requested; + struct QueuedMessage *messages_head; + struct QueuedMessage *messages_tail; }; -static size_t -transmit_ack (void *cls, size_t size, void *buf); - -static size_t -transmit_insert (void *cls, size_t size, void *buf); - -static size_t -transmit_conclude (void *cls, size_t size, void *buf); - -static size_t -transmit_begin (void *cls, size_t size, void *buf); - /** - * Call notify_transmit_ready for ack if necessary and possible. + * Schedule transmitting the next message. + * + * @param consensus consensus handle */ static void -ntr_ack (struct GNUNET_CONSENSUS_Handle *consensus) -{ - if ((NULL == consensus->th) && (NULL != consensus->ack_head)) - { - consensus->th = - GNUNET_CLIENT_notify_transmit_ready (consensus->client, - sizeof (struct GNUNET_CONSENSUS_AckMessage), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, &transmit_ack, consensus); - } -} +schedule_transmit (struct GNUNET_CONSENSUS_Handle *consensus); /** - * Call notify_transmit_ready for ack if necessary and possible. + * Function called to notify a client about the connection + * begin ready to queue more data. "buf" will be + * NULL and "size" zero if the connection was closed for + * writing in the meantime. + * + * @param cls closure + * @param size number of bytes available in buf + * @param buf where the callee should write the message + * @return number of bytes written to buf */ -static void -ntr_insert (struct GNUNET_CONSENSUS_Handle *consensus) +static size_t transmit_queued (void *cls, size_t size, + void *buf) { - if ((NULL == consensus->th) && (NULL != consensus->insert_element)) + struct GNUNET_CONSENSUS_Handle *consensus; + struct QueuedMessage *qmsg; + size_t ret_size; + + printf("transmitting queued\n"); + + consensus = (struct GNUNET_CONSENSUS_Handle *) cls; + qmsg = consensus->messages_head; + GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg); + GNUNET_assert (qmsg); + + if (NULL == buf) { - consensus->th = - GNUNET_CLIENT_notify_transmit_ready (consensus->client, - sizeof (struct GNUNET_CONSENSUS_ElementMessage) + - consensus->insert_element->size, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, &transmit_insert, consensus); + if (NULL != qmsg->idc) + { + qmsg->idc (qmsg->idc_cls, GNUNET_YES); + } } -} - -/** - * Call notify_transmit_ready for ack if necessary and possible. - */ -static void -ntr_conclude (struct GNUNET_CONSENSUS_Handle *consensus) -{ - if ((NULL == consensus->th) && (NULL != consensus->conclude_cb)) + memcpy (buf, qmsg->msg, qmsg->size); + ret_size = qmsg->size; + if (NULL != qmsg->idc) { - consensus->th = - GNUNET_CLIENT_notify_transmit_ready (consensus->client, - sizeof (struct GNUNET_CONSENSUS_ConcludeMessage), - GNUNET_TIME_absolute_get_remaining (consensus->conclude_deadline), - GNUNET_NO, &transmit_conclude, consensus); + qmsg->idc (qmsg->idc_cls, GNUNET_YES); } + GNUNET_free (qmsg->msg); + GNUNET_free (qmsg); + + schedule_transmit (consensus); + + return ret_size; } /** - * Call notify_transmit_ready for ack if necessary and possible. + * Schedule transmitting the next message. + * + * @param consensus consensus handle */ static void -ntr_begin (struct GNUNET_CONSENSUS_Handle *consensus) +schedule_transmit (struct GNUNET_CONSENSUS_Handle *consensus) { - if ((NULL == consensus->th) && (GNUNET_YES == consensus->begin_requested) && - (GNUNET_NO == consensus->begin_sent)) + if (NULL != consensus->th) + return; + + if (NULL != consensus->messages_head) { - consensus->th = - GNUNET_CLIENT_notify_transmit_ready (consensus->client, - sizeof (struct GNUNET_MessageHeader), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, &transmit_begin, consensus); + LOG (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n"); + GNUNET_CLIENT_notify_transmit_ready (consensus->client, consensus->messages_head->size, + GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_NO, &transmit_queued, consensus); } } + /** * Called when the server has sent is a new element * @@ -226,11 +233,12 @@ ntr_begin (struct GNUNET_CONSENSUS_Handle *consensus) * @param msg element message */ static void -handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus, +handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_CONSENSUS_ElementMessage *msg) { struct GNUNET_CONSENSUS_Element element; - struct ElementAck *ack; + struct GNUNET_CONSENSUS_AckMessage *ack_msg; + struct QueuedMessage *queued_msg; int ret; element.type = msg->element_type; @@ -238,11 +246,15 @@ handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus, element.data = &msg[1]; ret = consensus->new_element_cb (consensus->new_element_cls, &element); - ack = GNUNET_malloc (sizeof (struct ElementAck)); - ack->keep = ret; - GNUNET_CONTAINER_DLL_insert_tail (consensus->ack_head, consensus->ack_tail,ack); - ntr_ack (consensus); + queued_msg = GNUNET_malloc (sizeof (struct QueuedMessage) + sizeof (struct GNUNET_CONSENSUS_AckMessage)); + queued_msg->msg = (struct GNUNET_MessageHeader *) &queued_msg[1]; + + ack_msg = (struct GNUNET_CONSENSUS_AckMessage *) queued_msg->msg; + ack_msg->keep = ret; + + GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, + queued_msg); } @@ -254,13 +266,12 @@ handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus, * @param msg conclude done message */ static void -handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus, +handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg) { GNUNET_assert (NULL != consensus->conclude_cb); - consensus->conclude_cb(consensus->conclude_cls, - msg->num_peers, - (struct GNUNET_PeerIdentity *) &msg[1]); + consensus->conclude_cb (consensus->conclude_cls, + 0, NULL); consensus->conclude_cb = NULL; } @@ -287,12 +298,6 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) GNUNET_CLIENT_disconnect (consensus->client); consensus->client = NULL; consensus->new_element_cb (NULL, NULL); - if (NULL != consensus->idc) - { - consensus->idc(consensus->idc_cls, GNUNET_NO); - consensus->idc = NULL; - consensus->idc_cls = NULL; - } return; } @@ -305,108 +310,12 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) handle_conclude_done (consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg); break; default: - LOG(GNUNET_ERROR_TYPE_WARNING, "did not understand message type sent by service, ignoring"); + GNUNET_break (0); } GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, GNUNET_TIME_UNIT_FOREVER_REL); } - - - -/** - * Function called to notify a client about the connection - * begin ready to queue more data. "buf" will be - * NULL and "size" zero if the connection was closed for - * writing in the meantime. - * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -transmit_ack (void *cls, size_t size, void *buf) -{ - struct GNUNET_CONSENSUS_AckMessage *msg; - struct GNUNET_CONSENSUS_Handle *consensus; - - consensus = (struct GNUNET_CONSENSUS_Handle *) cls; - - GNUNET_assert (NULL != consensus->ack_head); - - msg = (struct GNUNET_CONSENSUS_AckMessage *) buf; - msg->keep = consensus->ack_head->keep; - msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK); - msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_AckMessage)); - - consensus->ack_head = consensus->ack_head->next; - - consensus->th = NULL; - - ntr_insert (consensus); - ntr_ack (consensus); - ntr_conclude (consensus); - - return sizeof (struct GNUNET_CONSENSUS_AckMessage); -} - -/** - * Function called to notify a client about the connection - * begin ready to queue more data. "buf" will be - * NULL and "size" zero if the connection was closed for - * writing in the meantime. - * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -transmit_insert (void *cls, size_t size, void *buf) -{ - struct GNUNET_CONSENSUS_ElementMessage *msg; - struct GNUNET_CONSENSUS_Handle *consensus; - GNUNET_CONSENSUS_InsertDoneCallback idc; - int msize; - void *idc_cls; - - GNUNET_assert (NULL != buf); - - consensus = cls; - - GNUNET_assert (NULL != consensus->insert_element); - - consensus->th = NULL; - - msg = buf; - - msize = sizeof (struct GNUNET_CONSENSUS_ElementMessage) + - consensus->insert_element->size; - - msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT); - msg->header.size = htons (msize); - memcpy (&msg[1], - consensus->insert_element->data, - consensus->insert_element->size); - - consensus->insert_element = NULL; - - idc = consensus->idc; - consensus->idc = NULL; - idc_cls = consensus->idc_cls; - consensus->idc_cls = NULL; - idc (idc_cls, GNUNET_YES); - - - ntr_ack (consensus); - ntr_insert (consensus); - ntr_conclude (consensus); - - return msize; -} - - /** * Function called to notify a client about the connection * begin ready to queue more data. "buf" will be @@ -427,7 +336,7 @@ transmit_join (void *cls, size_t size, void *buf) GNUNET_assert (NULL != buf); - LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n"); + LOG (GNUNET_ERROR_TYPE_INFO, "transmitting join message\n"); consensus = cls; consensus->th = NULL; @@ -447,9 +356,7 @@ transmit_join (void *cls, size_t size, void *buf) consensus->peers, consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); - ntr_insert (consensus); - ntr_begin (consensus); - ntr_conclude (consensus); + schedule_transmit (consensus); GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, GNUNET_TIME_UNIT_FOREVER_REL); @@ -457,88 +364,11 @@ transmit_join (void *cls, size_t size, void *buf) return msize; } - -/** - * Function called to notify a client about the connection - * begin ready to queue more data. "buf" will be - * NULL and "size" zero if the connection was closed for - * writing in the meantime. - * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -transmit_conclude (void *cls, size_t size, void *buf) -{ - struct GNUNET_CONSENSUS_ConcludeMessage *msg; - struct GNUNET_CONSENSUS_Handle *consensus; - int msize; - - GNUNET_assert (NULL != buf); - - consensus = cls; - consensus->th = NULL; - - msg = buf; - - msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage); - - msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); - msg->header.size = htons (msize); - msg->timeout = - GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline)); - - ntr_ack (consensus); - - return msize; -} - - -/** - * Function called to notify a client about the connection - * begin ready to queue more data. "buf" will be - * NULL and "size" zero if the connection was closed for - * writing in the meantime. - * - * @param cls the consensus handle - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -transmit_begin (void *cls, size_t size, void *buf) -{ - struct GNUNET_MessageHeader *msg; - struct GNUNET_CONSENSUS_Handle *consensus; - int msize; - - GNUNET_assert (NULL != buf); - - consensus = cls; - consensus->th = NULL; - - msg = buf; - - msize = sizeof (struct GNUNET_MessageHeader); - - msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN); - msg->size = htons (msize); - - ntr_ack (consensus); - ntr_insert (consensus); - ntr_conclude (consensus); - - return msize; -} - - /** * Create a consensus session. * - * @param cfg - * @param num_peers + * @param cfg configuration to use for connecting to the consensus service + * @param num_peers number of peers in the peers array * @param peers array of peers participating in this consensus session * Inclusion of the local peer is optional. * @param session_id session identifier @@ -553,7 +383,7 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, unsigned int num_peers, const struct GNUNET_PeerIdentity *peers, const struct GNUNET_HashCode *session_id, - GNUNET_CONSENSUS_NewElementCallback new_element_cb, + GNUNET_CONSENSUS_ElementCallback new_element_cb, void *new_element_cls) { struct GNUNET_CONSENSUS_Handle *consensus; @@ -567,17 +397,10 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, consensus->session_id = *session_id; if (0 == num_peers) - { consensus->peers = NULL; - } else if (num_peers > 0) - { - consensus->peers = GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity)); - } - else - { - GNUNET_break (0); - } + consensus->peers = + GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity)); consensus->client = GNUNET_CLIENT_connect ("consensus", cfg); @@ -615,45 +438,37 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, GNUNET_CONSENSUS_InsertDoneCallback idc, void *idc_cls) { - GNUNET_assert (NULL == consensus->idc); - GNUNET_assert (NULL == consensus->insert_element); - GNUNET_assert (NULL == consensus->conclude_cb); + struct QueuedMessage *qmsg; + struct GNUNET_CONSENSUS_ElementMessage *element_msg; + size_t element_msg_size; - consensus->idc = idc; - consensus->idc_cls = idc_cls; - consensus->insert_element = GNUNET_memdup(element, sizeof (struct GNUNET_CONSENSUS_Element) + element->size); + LOG (GNUNET_ERROR_TYPE_INFO, "inserting, size=%llu\n", element->size); - if (consensus->joined == 0) - { - return; - } + element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) + + element->size); - ntr_insert (consensus); -} + element_msg = GNUNET_malloc (element_msg_size); + element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT); + element_msg->header.size = htons (element_msg_size); + memcpy (&element_msg[1], element->data, element->size); + qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); + qmsg->msg = (struct GNUNET_MessageHeader *) element_msg; + qmsg->size = element_msg_size; + qmsg->idc = idc; + qmsg->idc_cls = idc_cls; -/** - * Begin reconciling elements with other peers. - * - * @param consensus handle for the consensus session - */ -void -GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus) -{ - GNUNET_assert (NULL == consensus->idc); - GNUNET_assert (NULL == consensus->insert_element); - GNUNET_assert (GNUNET_NO == consensus->begin_requested); - GNUNET_assert (GNUNET_NO == consensus->begin_sent); - - consensus->begin_requested = GNUNET_YES; + GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg); - ntr_begin (consensus); + schedule_transmit (consensus); } /** - * We are finished inserting new elements into the consensus; + * We are done with inserting new elements into the consensus; * try to conclude the consensus within a given time window. + * After conclude has been called, no further elements may be + * inserted by the client. * * @param consensus consensus session * @param timeout timeout after which the conculde callback @@ -664,20 +479,32 @@ GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus) void GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_TIME_Relative timeout, + unsigned int min_group_size_in_consensus, GNUNET_CONSENSUS_ConcludeCallback conclude, void *conclude_cls) { + struct QueuedMessage *qmsg; + struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg; + GNUNET_assert (NULL != conclude); GNUNET_assert (NULL == consensus->conclude_cb); consensus->conclude_cls = conclude_cls; consensus->conclude_cb = conclude; - consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout); + conclude_msg = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)); + conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); + conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)); + conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout); + conclude_msg->min_group_size = min_group_size_in_consensus; + + qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); + qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg; + qmsg->size = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage); + + GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg); - /* if transmitting the conclude message is not possible right now, transmit_join - * or transmit_ack will handle it */ - ntr_conclude (consensus); + schedule_transmit (consensus); } -- cgit v1.2.3