From 210be82b7cdc6058401e7d5042aa50dd0b750c92 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Thu, 11 Apr 2013 10:08:52 +0000 Subject: added consensus log-round simulation, work on consensus service, still problems with dv test case --- src/consensus/Makefile.am | 6 +- src/consensus/consensus-simulation.py | 103 + src/consensus/consensus_api.c | 20 +- src/consensus/consensus_flout.h | 60 - src/consensus/gnunet-consensus-ibf.c | 4 +- src/consensus/gnunet-consensus.c | 3 +- src/consensus/gnunet-service-consensus.c | 3135 ++++++++++++++---------------- src/consensus/ibf.c | 87 +- src/consensus/ibf.h | 53 +- src/consensus/strata_estimator.c | 145 ++ src/consensus/strata_estimator.h | 84 + src/dv/gnunet-service-dv.c | 11 +- src/include/gnunet_consensus_service.h | 4 +- src/include/gnunet_protocols.h | 5 + 14 files changed, 1857 insertions(+), 1863 deletions(-) create mode 100644 src/consensus/consensus-simulation.py delete mode 100644 src/consensus/consensus_flout.h create mode 100644 src/consensus/strata_estimator.c create mode 100644 src/consensus/strata_estimator.h (limited to 'src') diff --git a/src/consensus/Makefile.am b/src/consensus/Makefile.am index e469de057..82af29c87 100644 --- a/src/consensus/Makefile.am +++ b/src/consensus/Makefile.am @@ -61,7 +61,8 @@ gnunet_consensus_ibf_LDADD = \ gnunet_service_consensus_SOURCES = \ gnunet-service-consensus.c \ - ibf.c + ibf.c \ + strata_estimator.c gnunet_service_consensus_LDADD = \ $(top_builddir)/src/util/libgnunetutil.la \ $(top_builddir)/src/core/libgnunetcore.la \ @@ -71,7 +72,8 @@ gnunet_service_consensus_LDADD = \ gnunet_service_evil_consensus_SOURCES = \ gnunet-service-consensus.c \ - ibf.c + ibf.c \ + strata_estimator.c gnunet_service_evil_consensus_LDADD = \ $(top_builddir)/src/util/libgnunetutil.la \ $(top_builddir)/src/core/libgnunetcore.la \ diff --git a/src/consensus/consensus-simulation.py b/src/consensus/consensus-simulation.py new file mode 100644 index 000000000..930dfee62 --- /dev/null +++ b/src/consensus/consensus-simulation.py @@ -0,0 +1,103 @@ +#!/usr/bin/python +# This file is part of GNUnet +# (C) 2013 Christian Grothoff (and other contributing authors) +# +# GNUnet is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published +# by the Free Software Foundation; either version 2, or (at your +# option) any later version. +# +# GNUnet is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNUnet; see the file COPYING. If not, write to the +# Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. + +import argparse +import random +from math import ceil,log,floor + +def bsc(n): + """ count the bits set in n""" + l = n.bit_length() + c = 0 + x = 1 + for _ in range(0, l): + if n & x: + c = c + 1 + x = x << 1 + return c + +def simulate(k, n, verbose): + assert k < n + largest_arc = int(2**ceil(log(n, 2))) / 2 + num_ghosts = (2 * largest_arc) - n + if verbose: + print "we have", num_ghosts, "ghost peers" + # n.b. all peers with idx 1: + print "type of", str(peer_physical) + ":", peer_type + info = new_info + arc = arc << 1; + rounds = rounds + 1 + random.shuffle(peers) + return rounds + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("k", metavar="k", type=int, help="#(bad peers)") + parser.add_argument("n", metavar="n", type=int, help="#(all peers)") + parser.add_argument("r", metavar="r", type=int, help="#(rounds)") + parser.add_argument('--verbose', '-v', action='count') + + args = parser.parse_args() + sum = 0.0; + for n in xrange (0, args.r): + sum += simulate(args.k, args.n, args.verbose) + print sum / args.r; + + diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index 25ace3a4d..fd61d3712 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c @@ -231,15 +231,6 @@ send_next (struct GNUNET_CONSENSUS_Handle *consensus) } } -static void -queue_message (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_MessageHeader *msg) -{ - struct QueuedMessage *qm; - qm = GNUNET_malloc (sizeof *qm); - qm->msg = msg; - GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qm); -} - /** * Called when the server has sent is a new element @@ -252,8 +243,6 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_CONSENSUS_ElementMessage *msg) { struct GNUNET_CONSENSUS_Element element; - struct GNUNET_CONSENSUS_AckMessage *ack_msg; - int ret; LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n"); @@ -261,14 +250,7 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage); element.data = &msg[1]; - ret = consensus->new_element_cb (consensus->new_element_cls, &element); - - ack_msg = GNUNET_new (struct GNUNET_CONSENSUS_AckMessage); - ack_msg->header.size = htons (sizeof *ack_msg); - ack_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK); - ack_msg->keep = ret; - - queue_message (consensus, &ack_msg->header); + consensus->new_element_cb (consensus->new_element_cls, &element); send_next (consensus); } diff --git a/src/consensus/consensus_flout.h b/src/consensus/consensus_flout.h deleted file mode 100644 index 6c97813a5..000000000 --- a/src/consensus/consensus_flout.h +++ /dev/null @@ -1,60 +0,0 @@ -/* - This file is part of GNUnet - (C) 2012 Christian Grothoff (and other contributing authors) - - GNUnet is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 2, or (at your - option) any later version. - - GNUnet is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. - */ - -/** - * @file consensus/consensus_flout.h - * @brief intentionally misbehave in certain ways for testing - * @author Florian Dold - */ - -#ifndef GNUNET_CONSENSUS_FLOUT_H -#define GNUNET_CONSENSUS_FLOUT_H - -#ifdef __cplusplus -extern "C" -{ -#if 0 /* keep Emacsens' auto-indent happy */ -} -#endif -#endif - -#include "platform.h" -#include "gnunet_common.h" -#include "gnunet_consensus_service.h" - -void -GNUNET_CONSENSUS_flout_disable_peer (struct GNUNET_CONSENSUS_Handle *consensus); - -void -GNUNET_CONSENSUS_flout_ignore_element_hash (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_HashCode *element_hash); - -void -GNUNET_CONSENSUS_flout_send_bogos_ibf (struct GNUNET_CONSENSUS_Handle *consensus, ...); - - - -#if 0 /* keep Emacsens' auto-indent happy */ -{ -#endif -#ifdef __cplusplus -} -#endif - -#endif diff --git a/src/consensus/gnunet-consensus-ibf.c b/src/consensus/gnunet-consensus-ibf.c index 73dc31b56..d431795f1 100644 --- a/src/consensus/gnunet-consensus-ibf.c +++ b/src/consensus/gnunet-consensus-ibf.c @@ -160,8 +160,8 @@ run (void *cls, char *const *args, const char *cfgfile, i++; } - ibf_a = ibf_create (ibf_size, hash_num, 0); - ibf_b = ibf_create (ibf_size, hash_num, 0); + ibf_a = ibf_create (ibf_size, hash_num); + ibf_b = ibf_create (ibf_size, hash_num); printf ("generated sets\n"); diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c index 9e9b89446..d8c1b14ee 100644 --- a/src/consensus/gnunet-consensus.c +++ b/src/consensus/gnunet-consensus.c @@ -192,12 +192,11 @@ connect_complete (void *cls, } -static int +static void new_element_cb (void *cls, const struct GNUNET_CONSENSUS_Element *element) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n"); - return GNUNET_YES; } diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index ebd2d238b..179df0fb0 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c @@ -32,23 +32,32 @@ #include "gnunet_consensus_service.h" #include "gnunet_core_service.h" #include "gnunet_stream_lib.h" + #include "consensus_protocol.h" -#include "ibf.h" #include "consensus.h" +#include "ibf.h" +#include "strata_estimator.h" + + +/* + * Log macro that prefixes the local peer and the peer we are in contact with. + */ +#define LOG_PP(kind, cpi, m,...) GNUNET_log (kind, "P%d for P%d: " m, \ + cpi->session->local_peer_idx, (int) (cpi - cpi->session->info),##__VA_ARGS__) /** * Number of IBFs in a strata estimator. */ -#define STRATA_COUNT 32 +#define SE_STRATA_COUNT 32 /** - * Number of buckets per IBF. + * Size of the IBFs in the strata estimator. */ -#define STRATA_IBF_BUCKETS 80 +#define SE_IBF_SIZE 80 /** * hash num parameter for the difference digests and strata estimators */ -#define STRATA_HASH_NUM 3 +#define SE_IBF_HASH_NUM 3 /** * Number of buckets that can be transmitted in one message. @@ -63,72 +72,55 @@ #define MAX_IBF_ORDER (16) /** - * Number exp-rounds. + * Number of exponential rounds, used in the inventory and completion round. */ #define NUM_EXP_ROUNDS (4) /* forward declarations */ -struct ConsensusSession; -struct IncomingSocket; +/* mutual recursion with struct ConsensusSession */ struct ConsensusPeerInformation; -static void -client_send_next (struct ConsensusSession *session); - -static int -get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session); - -static void -round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); +struct MessageQueue; +/* mutual recursion with round_over */ static void -send_ibf (struct ConsensusPeerInformation *cpi); +subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); +/* mutial recursion with transmit_queued */ static void -send_strata_estimator (struct ConsensusPeerInformation *cpi); +client_send_next (struct MessageQueue *mq); +/* mutual recursion with mst_session_callback */ static void -decode (struct ConsensusPeerInformation *cpi); - -static void -write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size); +open_cb (void *cls, struct GNUNET_STREAM_Socket *socket); -static void -subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); +static int +mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message); /** - * An element that is waiting to be transmitted to the client. + * Additional information about a consensus element. */ -struct PendingElement +struct ElementInfo { /** - * Pending elements are kept in a DLL. + * The element itself. */ - struct PendingElement *next; - + struct GNUNET_CONSENSUS_Element *element; /** - * Pending elements are kept in a DLL. + * Hash of the element */ - struct PendingElement *prev; - + struct GNUNET_HashCode *element_hash; /** - * The actual element + * Number of other peers that have the element in the inventory. */ - struct GNUNET_CONSENSUS_Element *element; - - /* peer this element is coming from */ - struct ConsensusPeerInformation *cpi; -}; - - -struct ElementList -{ - struct ElementList *next; - struct GNUNET_CONSENSUS_Element *element; - struct GNUNET_HashCode *element_hash; + unsigned int inventory_count; + /** + * Bitmap of peers that have this element in their inventory + */ + uint8_t *inventory_bitmap; }; @@ -147,178 +139,93 @@ enum ConsensusRound CONSENSUS_ROUND_EXCHANGE, /** * Exchange which elements each peer has, but not the elements. + * This round uses the all-to-all scheme. */ CONSENSUS_ROUND_INVENTORY, /** - * Collect and distribute missing values. + * Collect and distribute missing values with the exponential scheme. */ - CONSENSUS_ROUND_STOCK, + CONSENSUS_ROUND_COMPLETION, /** - * Consensus concluded. + * Consensus concluded. After timeout and finished communication with client, + * consensus session will be destroyed. */ CONSENSUS_ROUND_FINISH }; +/* FIXME: review states, ANTICIPATE_DIFF and DECODING in particular */ /** - * Information about a peer that is in a consensus session. + * State of another peer with respect to the + * current ibf. */ -struct ConsensusPeerInformation -{ - struct GNUNET_PeerIdentity peer_id; - - /** - * Socket for communicating with the peer, either created by the local peer, - * or the remote peer. - */ - struct GNUNET_STREAM_Socket *socket; - - /** - * Message tokenizer, for the data received from this peer via the stream socket. - */ - struct GNUNET_SERVER_MessageStreamTokenizer *mst; - - /** - * Do we connect to the peer, or does the peer connect to us? - * Only valid for all-to-all phases - */ - int is_outgoing; - - /** - * Did we receive/send a consensus hello? - */ - int hello; - - /** - * Handle for currently active read - */ - struct GNUNET_STREAM_ReadHandle *rh; - - /** - * Handle for currently active read - */ - struct GNUNET_STREAM_WriteHandle *wh; - - enum { - /* beginning of round */ - IBF_STATE_NONE=0, - /* we currently receive an ibf */ - IBF_STATE_RECEIVING, - /* we currently transmit an ibf */ - IBF_STATE_TRANSMITTING, - /* we decode a received ibf */ - IBF_STATE_DECODING, - /* wait for elements and element requests */ - IBF_STATE_ANTICIPATE_DIFF - } ibf_state ; - - /** - * What is the order (=log2 size) of the ibf - * we're currently dealing with? - * Interpretation depends on ibf_state. - */ - int ibf_order; - - /** - * The current IBF for this peer, - * purpose dependent on ibf_state - */ - struct InvertibleBloomFilter *ibf; - - /** - * How many buckets have we transmitted/received? - * Interpretatin depends on ibf_state - */ - int ibf_bucket_counter; - - /** - * Strata estimator of the peer, NULL if our peer - * initiated the reconciliation. - */ - struct StrataEstimator *se; - - /** - * Element keys that this peer misses, but we have them. - */ - struct GNUNET_CONTAINER_MultiHashMap *requested_keys; - - /** - * Element keys that this peer has, but we miss. - */ - struct GNUNET_CONTAINER_MultiHashMap *reported_keys; - +enum ConsensusIBFState { /** - * Back-reference to the consensus session, - * to that ConsensusPeerInformation can be used as a closure + * There is nothing going on with the IBF. */ - struct ConsensusSession *session; - + IBF_STATE_NONE=0, /** - * Messages queued for the current round. + * We currently receive an ibf. */ - struct QueuedMessage *messages_head; - + IBF_STATE_RECEIVING, + /* + * we decode a received ibf + */ + IBF_STATE_DECODING, /** - * Messages queued for the current round. + * wait for elements and element requests */ - struct QueuedMessage *messages_tail; + IBF_STATE_ANTICIPATE_DIFF +}; - /** - * True if we are actually replaying the strata message, - * e.g. currently handling the premature_strata_message. - */ - int replaying_strata_message; - /** - * A strata message that is not actually for the current round, - * used in the exp-scheme. - */ - struct StrataMessage *premature_strata_message; +typedef void (*AddCallback) (struct MessageQueue *mq); +typedef void (*MessageSentCallback) (void *cls); - /** - * We have finishes the exp-subround with the peer. - */ - int exp_subround_finished; - int inventory_synced; +/** + * Collection of the state necessary to read and write gnunet messages + * to a stream socket. Should be used as closure for stream_data_processor. + */ +struct MessageStreamState +{ + struct GNUNET_SERVER_MessageStreamTokenizer *mst; + struct MessageQueue *mq; + void *mst_cls; + struct GNUNET_STREAM_Socket *socket; + struct GNUNET_STREAM_ReadHandle *rh; + struct GNUNET_STREAM_WriteHandle *wh; +}; - /** - * Round this peer seems to be in, according to the last SE we got. - * Necessary to store this, as we sometimes need to respond to a request from an - * older round, while we are already in the next round. - */ - enum ConsensusRound apparent_round; +struct ServerClientSocketState +{ + struct GNUNET_SERVER_Client *client; + struct GNUNET_SERVER_TransmitHandle* th; }; -typedef void (*QueuedMessageCallback) (void *msg); /** - * A doubly linked list of messages. + * Generic message queue, for queueing outgoing messages. */ -struct QueuedMessage +struct MessageQueue { - struct GNUNET_MessageHeader *msg; - - /** - * Queued messages are stored in a doubly linked list. - */ - struct QueuedMessage *next; - - /** - * Queued messages are stored in a doubly linked list. - */ - struct QueuedMessage *prev; - - QueuedMessageCallback cb; - - void *cls; + void *state; + AddCallback add_cb; + struct PendingMessage *pending_head; + struct PendingMessage *pending_tail; + struct PendingMessage *current_pm; }; -struct StrataEstimator +struct PendingMessage { - struct InvertibleBloomFilter **strata; + struct GNUNET_MessageHeader *msg; + struct MessageQueue *parent_queue; + struct PendingMessage *next; + struct PendingMessage *prev; + MessageSentCallback sent_cb; + void *sent_cb_cls; }; @@ -351,38 +258,26 @@ struct ConsensusSession struct GNUNET_HashCode global_id; /** - * Local client in this consensus session. - * There is only one client per consensus session. - */ - struct GNUNET_SERVER_Client *client; - - /** - * Elements in the consensus set of this session, - * all of them either have been sent by or approved by the client. - * Contains ElementList. - * Used as a unique-key hashmap. - */ - struct GNUNET_CONTAINER_MultiHashMap *values; - - /** - * Elements that have not been approved (or rejected) by the client yet. + * The server's client and associated local state */ - struct PendingElement *client_approval_head; + struct ServerClientSocketState scss; /** - * Elements that have not been approved (or rejected) by the client yet. + * Queued messages to the client. */ - struct PendingElement *client_approval_tail; + struct MessageQueue *client_mq; /** - * Messages to be sent to the local client that owns this session + * IBF_Key -> 2^(HashCode*) + * FIXME: + * should be array of hash maps, mapping replicated struct IBF_Keys to struct HashCode *. */ - struct QueuedMessage *client_messages_head; + struct GNUNET_CONTAINER_MultiHashMap *ibf_key_map; /** - * Messages to be sent to the local client that owns this session + * Maps HashCodes to ElementInfos */ - struct QueuedMessage *client_messages_tail; + struct GNUNET_CONTAINER_MultiHashMap *values; /** * Currently active transmit handle for sending to the client @@ -411,10 +306,15 @@ struct ConsensusSession */ struct ConsensusPeerInformation *info; + /** + * GNUNET_YES if the client has called conclude. + * */ + int conclude; + /** * Index of the local peer in the peers array */ - int local_peer_idx; + unsigned int local_peer_idx; /** * Strata estimator, computed online @@ -431,16 +331,16 @@ struct ConsensusSession */ enum ConsensusRound current_round; - int exp_round; - - int exp_subround; - /** * Permutation of peers for the current round, * maps logical index (for current round) to physical index (location in info array) */ int *shuffle; + int exp_round; + + int exp_subround; + /** * The partner for the current exp-round */ @@ -454,85 +354,181 @@ struct ConsensusSession /** - * Sockets from other peers who want to communicate with us. - * It may not be known yet which consensus session they belong to. - * Also, the session might not exist yet locally. + * Information about a peer that is in a consensus session. */ -struct IncomingSocket +struct ConsensusPeerInformation { /** - * Incoming sockets are kept in a double linked list. + * Peer identitty of the peer in the consensus session */ - struct IncomingSocket *next; + struct GNUNET_PeerIdentity peer_id; /** - * Incoming sockets are kept in a double linked list. + * Do we connect to the peer, or does the peer connect to us? + * Only valid for all-to-all phases */ - struct IncomingSocket *prev; + int is_outgoing; /** - * The actual socket. + * Did we receive/send a consensus hello? */ - struct GNUNET_STREAM_Socket *socket; + int hello; + + /* + * FIXME + */ + struct MessageStreamState mss; /** - * Handle for currently active read + * Current state */ - struct GNUNET_STREAM_ReadHandle *rh; + enum ConsensusIBFState ibf_state; /** - * Peer that connected to us with the socket. + * What is the order (=log2 size) of the ibf + * we're currently dealing with? + * Interpretation depends on ibf_state. */ - struct GNUNET_PeerIdentity peer_id; + int ibf_order; /** - * Message stream tokenizer for this socket. + * The current IBF for this peer, + * purpose dependent on ibf_state */ - struct GNUNET_SERVER_MessageStreamTokenizer *mst; + struct InvertibleBloomFilter *ibf; /** - * Peer-in-session this socket belongs to, once known, otherwise NULL. + * How many buckets have we transmitted/received? + * Interpretatin depends on ibf_state */ - struct ConsensusPeerInformation *cpi; + int ibf_bucket_counter; /** - * Set to the global session id, if the peer sent us a hello-message, - * but the session does not exist yet. + * Strata estimator of the peer, NULL if our peer + * initiated the reconciliation. */ - struct GNUNET_HashCode *requested_gid; -}; + struct StrataEstimator *se; + /** + * Back-reference to the consensus session, + * to that ConsensusPeerInformation can be used as a closure + */ + struct ConsensusSession *session; -static struct IncomingSocket *incoming_sockets_head; -static struct IncomingSocket *incoming_sockets_tail; + /** + * True if we are actually replaying the strata message, + * e.g. currently handling the premature_strata_message. + */ + int replaying_strata_message; -/** - * Linked list of sesstions this peer participates in. - */ -static struct ConsensusSession *sessions_head; + /** + * A strata message that is not actually for the current round, + * used in the exp-scheme. + */ + struct StrataMessage *premature_strata_message; -/** - * Linked list of sesstions this peer participates in. - */ -static struct ConsensusSession *sessions_tail; + /** + * We have finishes the exp-subround with the peer. + */ + int exp_subround_finished; -/** - * Configuration of the consensus service. - */ -static const struct GNUNET_CONFIGURATION_Handle *cfg; + /** + * GNUNET_YES if we synced inventory with this peer; + * GNUNET_NO otherwise. + */ + int inventory_synced; -/** - * Handle to the server for this service. - */ -static struct GNUNET_SERVER_Handle *srv; + /** + * Round this peer seems to be in, according to the last SE we got. + * Necessary to store this, as we sometimes need to respond to a request from an + * older round, while we are already in the next round. + */ + enum ConsensusRound apparent_round; +}; -/** - * Peer that runs this service. - */ -static struct GNUNET_PeerIdentity *my_peer; /** - * Handle to the core service. Only used during service startup, will be NULL after that. + * Sockets from other peers who want to communicate with us. + * It may not be known yet which consensus session they belong to, we have to wait for the + * peer's hello. + * Also, the session might not exist yet locally, we have to wait for a local client to connect. + */ +struct IncomingSocket +{ + /** + * Incoming sockets are kept in a double linked list. + */ + struct IncomingSocket *next; + + /** + * Incoming sockets are kept in a double linked list. + */ + struct IncomingSocket *prev; + + /** + * Peer that connected to us with the socket. + */ + struct GNUNET_PeerIdentity peer_id; + + /** + * Peer-in-session this socket belongs to, once known, otherwise NULL. + */ + struct ConsensusPeerInformation *cpi; + + /** + * Set to the global session id, if the peer sent us a hello-message, + * but the session does not exist yet. + */ + struct GNUNET_HashCode *requested_gid; + + /* + * Timeout, will disconnect the socket if not yet in a session. + * FIXME: implement + */ + GNUNET_SCHEDULER_TaskIdentifier timeout; + + /* FIXME */ + struct MessageStreamState mss; +}; + + +/** + * Linked list of incoming sockets. + */ +static struct IncomingSocket *incoming_sockets_head; + +/** + * Linked list of incoming sockets. + */ +static struct IncomingSocket *incoming_sockets_tail; + +/** + * Linked list of sessions this peer participates in. + */ +static struct ConsensusSession *sessions_head; + +/** + * Linked list of sessions this peer participates in. + */ +static struct ConsensusSession *sessions_tail; + +/** + * Configuration of the consensus service. + */ +static const struct GNUNET_CONFIGURATION_Handle *cfg; + +/** + * Handle to the server for this service. + */ +static struct GNUNET_SERVER_Handle *srv; + +/** + * Peer that runs this service. + */ +static struct GNUNET_PeerIdentity *my_peer; + +/** + * Handle to the core service. Only used during service startup, will be NULL after that. */ static struct GNUNET_CORE_Handle *core; @@ -543,151 +539,159 @@ static struct GNUNET_STREAM_ListenSocket *listener; /** - * Queue a message to be sent to the inhabiting client of a session. + * Transmit a queued message to the session's client. * - * @param session session - * @param msg message we want to queue + * @param cls consensus session + * @param size number of bytes available in buf + * @param buf where the callee should write the message + * @return number of bytes written to buf */ -static void -queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHeader *msg) +static size_t +transmit_queued (void *cls, size_t size, + void *buf) { - struct QueuedMessage *qm; - qm = GNUNET_malloc (sizeof *qm); - qm->msg = msg; - GNUNET_CONTAINER_DLL_insert_tail (session->client_messages_head, session->client_messages_tail, qm); + struct MessageQueue *mq = cls; + struct PendingMessage *pm = mq->pending_head; + struct ServerClientSocketState *state = mq->state; + size_t msg_size; + + GNUNET_assert (NULL != pm); + GNUNET_assert (NULL != buf); + msg_size = ntohs (pm->msg->size); + GNUNET_assert (size >= msg_size); + memcpy (buf, pm->msg, msg_size); + GNUNET_CONTAINER_DLL_remove (mq->pending_head, mq->pending_tail, pm); + state->th = NULL; + client_send_next (cls); + GNUNET_free (pm); + return msg_size; } -/** - * Queue a message to be sent to another peer - * - * @param cpi peer - * @param msg message we want to queue - * @param cb callback, called when the message is given to strem - * @param cls closure for cb - */ + static void -queue_peer_message_with_cls (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageHeader *msg, QueuedMessageCallback cb, void *cls) -{ - struct QueuedMessage *qm; - qm = GNUNET_malloc (sizeof *qm); - qm->msg = msg; - qm->cls = cls; - qm->cb = cb; - GNUNET_CONTAINER_DLL_insert_tail (cpi->messages_head, cpi->messages_tail, qm); - if (cpi->wh == NULL) - write_queued (cpi, GNUNET_STREAM_OK, 0); +client_send_next (struct MessageQueue *mq) +{ + struct ServerClientSocketState *state = mq->state; + int msize; + + GNUNET_assert (NULL != state); + + if ( (NULL != state->th) || + (NULL == mq->pending_head) ) + return; + msize = ntohs (mq->pending_head->msg->size); + state->th = + GNUNET_SERVER_notify_transmit_ready (state->client, msize, + GNUNET_TIME_UNIT_FOREVER_REL, + &transmit_queued, mq); +} + + +struct MessageQueue * +create_message_queue_for_server_client (struct ServerClientSocketState *scss) +{ + struct MessageQueue *mq; + mq = GNUNET_new (struct MessageQueue); + mq->add_cb = client_send_next; + mq->state = scss; + return mq; } /** - * Queue a message to be sent to another peer + * Functions of this signature are called whenever writing operations + * on a stream are executed * - * @param cpi peer - * @param msg message we want to queue + * @param cls the closure from GNUNET_STREAM_write + * @param status the status of the stream at the time this function is called; + * GNUNET_STREAM_OK if writing to stream was completed successfully; + * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully + * (this doesn't mean that the data is never sent, the receiver may + * have read the data but its ACKs may have been lost); + * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the + * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot + * be processed. + * @param size the number of bytes written */ -static void -queue_peer_message (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageHeader *msg) +static void +write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size) { - queue_peer_message_with_cls (cpi, msg, NULL, NULL); + struct MessageQueue *mq = cls; + struct MessageStreamState *mss = mq->state; + struct PendingMessage *pm; + + GNUNET_assert (GNUNET_STREAM_OK == status); + + /* call cb for message we finished sending */ + pm = mq->current_pm; + if (NULL != pm) + { + if (NULL != pm->sent_cb) + pm->sent_cb (pm->sent_cb_cls); + GNUNET_free (pm); + } + + mss->wh = NULL; + + pm = mq->pending_head; + mq->current_pm = pm; + if (NULL == pm) + return; + GNUNET_CONTAINER_DLL_remove (mq->pending_head, mq->pending_tail, pm); + mss->wh = GNUNET_STREAM_write (mss->socket, pm->msg, ntohs (pm->msg->size), + GNUNET_TIME_UNIT_FOREVER_REL, write_queued, cls); + GNUNET_assert (NULL != mss->wh); } -/* static void -clear_peer_messages (struct ConsensusPeerInformation *cpi) +stream_socket_add_cb (struct MessageQueue *mq) { - cpi->messages_head = NULL; - cpi->messages_tail = NULL; + if (NULL != mq->current_pm) + return; + write_queued (mq, GNUNET_STREAM_OK, 0); } -*/ -/** - * Estimate set difference with two strata estimators, - * i.e. arrays of IBFs. - * Does not not modify its arguments. - * - * @param se1 first strata estimator - * @param se2 second strata estimator - * @return the estimated difference - */ -static int -estimate_difference (const struct StrataEstimator *se1, - const struct StrataEstimator *se2) +struct MessageQueue * +create_message_queue_for_stream_socket (struct MessageStreamState *mss) { - int i; - int count; - count = 0; - for (i = STRATA_COUNT - 1; i >= 0; i--) - { - struct InvertibleBloomFilter *diff; - /* number of keys decoded from the ibf */ - int ibf_count; - int more; - ibf_count = 0; - /* FIXME: implement this without always allocating new IBFs */ - diff = ibf_dup (se1->strata[i]); - ibf_subtract (diff, se2->strata[i]); - for (;;) - { - more = ibf_decode (diff, NULL, NULL); - if (GNUNET_NO == more) - { - count += ibf_count; - break; - } - if (GNUNET_SYSERR == more) - { - ibf_destroy (diff); - return count * (1 << (i + 1)); - } - ibf_count++; - } - ibf_destroy (diff); - } - return count; + struct MessageQueue *mq; + mq = GNUNET_new (struct MessageQueue); + mq->state = mss; + mq->add_cb = stream_socket_add_cb; + return mq; +} + + +struct PendingMessage * +new_pending_message (uint16_t size, uint16_t type) +{ + struct PendingMessage *pm; + pm = GNUNET_malloc (sizeof *pm + size); + pm->msg = (void *) &pm[1]; + pm->msg->size = htons (size); + pm->msg->type = htons (type); + return pm; } /** - * Called when receiving data from a peer that is member of - * an inhabited consensus session. + * Queue a message in a message queue. * - * @param cls the closure from GNUNET_STREAM_read - * @param status the status of the stream at the time this function is called - * @param data traffic from the other side - * @param size the number of bytes available in data read; will be 0 on timeout - * @return number of bytes of processed from 'data' (any data remaining should be - * given to the next time the read processor is called). + * @param queue the message queue + * @param pending message, message with additional information */ -static size_t -session_stream_data_processor (void *cls, - enum GNUNET_STREAM_Status status, - const void *data, - size_t size) +void +message_queue_add (struct MessageQueue *queue, struct PendingMessage *msg) { - struct ConsensusPeerInformation *cpi; - int ret; - - GNUNET_assert (GNUNET_STREAM_OK == status); - cpi = cls; - GNUNET_assert (NULL != cpi->mst); - ret = GNUNET_SERVER_mst_receive (cpi->mst, cpi, data, size, GNUNET_NO, GNUNET_YES); - if (GNUNET_SYSERR == ret) - { - /* FIXME: handle this correctly */ - GNUNET_assert (0); - } - /* read again */ - cpi->rh = GNUNET_STREAM_read (cpi->socket, GNUNET_TIME_UNIT_FOREVER_REL, - &session_stream_data_processor, cpi); - /* we always read all data */ - return size; + GNUNET_CONTAINER_DLL_insert_tail (queue->pending_head, queue->pending_tail, msg); + queue->add_cb (queue); } + /** - * Called when we receive data from a peer that is not member of - * a session yet, or the session is not yet inhabited. + * Called when we receive data from a peer via stream. * * @param cls the closure from GNUNET_STREAM_read * @param status the status of the stream at the time this function is called @@ -697,62 +701,66 @@ session_stream_data_processor (void *cls, * given to the next time the read processor is called). */ static size_t -incoming_stream_data_processor (void *cls, - enum GNUNET_STREAM_Status status, - const void *data, - size_t size) +stream_data_processor (void *cls, enum GNUNET_STREAM_Status status, const void *data, size_t size) { - struct IncomingSocket *incoming; + struct MessageStreamState *mss = cls; int ret; - GNUNET_assert (GNUNET_STREAM_OK == status); - incoming = cls; - ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_YES); + mss->rh = NULL; + + if (GNUNET_STREAM_OK != status) + { + /* FIXME: handle this correctly */ + GNUNET_break (0); + return 0; + } + GNUNET_assert (NULL != mss->mst); + ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_YES); if (GNUNET_SYSERR == ret) { /* FIXME: handle this correctly */ - GNUNET_assert (0); + GNUNET_break (0); + return 0; } /* read again */ - incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL, - &incoming_stream_data_processor, incoming); + mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, &stream_data_processor, mss); /* we always read all data */ return size; } +/** + * Send element or element report to the peer specified in cpi. + * + * @param cpi peer to send the elements to + * @param head head of the element list + */ static void -send_elements (struct ConsensusPeerInformation *cpi, struct ElementList *head) +send_element_or_report (struct ConsensusPeerInformation *cpi, struct ElementInfo *e) { - struct GNUNET_CONSENSUS_Element *element; - struct GNUNET_MessageHeader *element_msg; - size_t msize; + struct PendingMessage *pm; - while (NULL != head) + switch (cpi->apparent_round) { - element = head->element; - msize = sizeof (struct GNUNET_MessageHeader) + element->size; - element_msg = GNUNET_malloc (msize); - element_msg->size = htons (msize); - switch (cpi->apparent_round) - { - case CONSENSUS_ROUND_STOCK: - case CONSENSUS_ROUND_EXCHANGE: - element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); - break; - case CONSENSUS_ROUND_INVENTORY: - element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT); - break; - default: - GNUNET_break (0); - } - GNUNET_assert (NULL != element->data); - memcpy (&element_msg[1], element->data, element->size); - queue_peer_message (cpi, element_msg); - head = head->next; + case CONSENSUS_ROUND_COMPLETION: + case CONSENSUS_ROUND_EXCHANGE: + pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + e->element->size, + GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); + memcpy (&pm->msg[1], e->element->data, e->element->size); + message_queue_add (cpi->mss.mq, pm); + break; + case CONSENSUS_ROUND_INVENTORY: + pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + sizeof (struct GNUNET_HashCode), + GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT); + memcpy (&pm->msg[1], e->element_hash, sizeof (struct GNUNET_HashCode)); + message_queue_add (cpi->mss.mq, pm); + break; + default: + GNUNET_break (0); } } + /** * Iterator to insert values into an ibf. * @@ -768,12 +776,10 @@ ibf_values_iterator (void *cls, const struct GNUNET_HashCode *key, void *value) { - struct ConsensusPeerInformation *cpi; - struct ElementList *head; - struct IBF_Key ibf_key; - cpi = cls; - head = value; - ibf_key = ibf_key_from_hashcode (head->element_hash); + struct ConsensusPeerInformation *cpi = cls; + struct ElementInfo *e = value; + struct IBF_Key ibf_key = ibf_key_from_hashcode (e->element_hash); + GNUNET_assert (ibf_key.key_val == ibf_key_from_hashcode (key).key_val); ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key); return GNUNET_YES; @@ -788,11 +794,10 @@ ibf_values_iterator (void *cls, static void prepare_ibf (struct ConsensusPeerInformation *cpi) { - if (NULL == cpi->session->ibfs[cpi->ibf_order]) - { - cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); - GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi); - } + if (NULL != cpi->session->ibfs[cpi->ibf_order]) + return; + cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, SE_IBF_HASH_NUM); + GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi); } @@ -816,15 +821,18 @@ exp_subround_finished (const struct ConsensusSession *session) { int not_finished; not_finished = 0; - if ((session->partner_outgoing != NULL) && (session->partner_outgoing->exp_subround_finished == GNUNET_NO)) - not_finished++; - if ((session->partner_incoming != NULL) && (session->partner_incoming->exp_subround_finished == GNUNET_NO)) - not_finished++; + if ( (NULL != session->partner_outgoing) && + (GNUNET_NO == session->partner_outgoing->exp_subround_finished) ) + not_finished++; + if ( (NULL != session->partner_incoming) && + (GNUNET_NO == session->partner_incoming->exp_subround_finished) ) + not_finished++; if (0 == not_finished) return GNUNET_YES; return GNUNET_NO; } + static int inventory_round_finished (struct ConsensusSession *session) { @@ -840,153 +848,170 @@ inventory_round_finished (struct ConsensusSession *session) } - static void -fin_sent_cb (void *cls) +clear_message_stream_state (struct MessageStreamState *mss) { - struct ConsensusPeerInformation *cpi; - cpi = cls; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n", cpi->session->local_peer_idx); - switch (cpi->session->current_round) + if (NULL != mss->mst) { - case CONSENSUS_ROUND_EXCHANGE: - case CONSENSUS_ROUND_STOCK: - if (cpi->session->current_round != cpi->apparent_round) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: FIN to SYNC from the past\n", cpi->session->local_peer_idx); - break; - } - cpi->exp_subround_finished = GNUNET_YES; - /* the subround is only really over if *both* partners are done */ - if (GNUNET_YES == exp_subround_finished (cpi->session)) - subround_over (cpi->session, NULL); - else - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after FIN sent\n", cpi->session->local_peer_idx); - break; - case CONSENSUS_ROUND_INVENTORY: - cpi->inventory_synced = GNUNET_YES; - if (inventory_round_finished (cpi->session) && cpi->session->current_round == cpi->apparent_round) - round_over (cpi->session, NULL); - /* FIXME: maybe go to next round */ - break; - default: - GNUNET_break (0); + GNUNET_SERVER_mst_destroy (mss->mst); + mss->mst = NULL; + } + if (NULL != mss->rh) + { + GNUNET_STREAM_read_cancel (mss->rh); + mss->rh = NULL; + } + if (NULL != mss->wh) + { + GNUNET_STREAM_write_cancel (mss->wh); + mss->wh = NULL; + } + if (NULL != mss->socket) + { + GNUNET_STREAM_close (mss->socket); + mss->socket = NULL; + } + if (NULL != mss->mq) + { + GNUNET_free (mss->mq); + mss->mq = NULL; } } /** - * Gets called when the other peer wants us to inform that - * it has decoded our ibf and sent us all elements / requests + * Iterator over hash map entries. + * + * @param cls closure + * @param key current key code + * @param value value in the hash map + * @return GNUNET_YES if we should continue to + * iterate, + * GNUNET_NO if not. */ static int -handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) +destroy_element_info_iter (void *cls, + const struct GNUNET_HashCode * key, + void *value) { - struct ConsensusRoundMessage *fin_msg; - - switch (cpi->session->current_round) - { - case CONSENSUS_ROUND_INVENTORY: - cpi->inventory_synced = GNUNET_YES; - case CONSENSUS_ROUND_STOCK: - case CONSENSUS_ROUND_EXCHANGE: - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SYNC from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); - fin_msg = GNUNET_malloc (sizeof *fin_msg); - fin_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN); - fin_msg->header.size = htons (sizeof *fin_msg); - fin_msg->round = cpi->apparent_round; - /* the subround os over once we kicked off sending the fin msg */ - /* FIXME: assert we are talking to the right peer! */ - queue_peer_message_with_cls (cpi, (struct GNUNET_MessageHeader *) fin_msg, fin_sent_cb, cpi); - /* FIXME: mark peer as synced */ - break; - default: - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n"); - break; - } + struct ElementInfo *ei = value; + GNUNET_free (ei->element); + GNUNET_free (ei->element_hash); + GNUNET_free (ei); return GNUNET_YES; } /** - * The other peer wants us to inform that he sent us all the elements we requested. + * Destroy a session, free all resources associated with it. + * + * @param session the session to destroy */ -static int -handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) +static void +destroy_session (struct ConsensusSession *session) { - struct ConsensusRoundMessage *round_msg; - round_msg = (struct ConsensusRoundMessage *) msg; - /* FIXME: only call subround_over if round is the current one! */ - switch (cpi->session->current_round) + int i; + + GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session); + GNUNET_SERVER_client_drop (session->scss.client); + session->scss.client = NULL; + if (NULL != session->client_mq) { - case CONSENSUS_ROUND_EXCHANGE: - case CONSENSUS_ROUND_STOCK: - if (cpi->session->current_round != round_msg->round) + GNUNET_free (session->client_mq); + session->client_mq = NULL; + } + if (NULL != session->shuffle) + { + GNUNET_free (session->shuffle); + session->shuffle = NULL; + } + if (NULL != session->se) + { + strata_estimator_destroy (session->se); + session->se = NULL; + } + if (NULL != session->info) + { + for (i = 0; i < session->num_peers; i++) + { + struct ConsensusPeerInformation *cpi; + cpi = &session->info[i]; + clear_message_stream_state (&cpi->mss); + if (NULL != cpi->se) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (past round)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); - cpi->ibf_state = IBF_STATE_NONE; - cpi->ibf_bucket_counter = 0; - break; + strata_estimator_destroy (cpi->se); + cpi->se = NULL; } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (exp)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); - cpi->exp_subround_finished = GNUNET_YES; - if (GNUNET_YES == exp_subround_finished (cpi->session)) - subround_over (cpi->session, NULL); - else - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after got FIN\n", cpi->session->local_peer_idx); - break; - case CONSENSUS_ROUND_INVENTORY: - cpi->inventory_synced = GNUNET_YES; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (a2a)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); - if (inventory_round_finished (cpi->session)) - round_over (cpi->session, NULL); - break; - default: - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n"); - break; + if (NULL != cpi->ibf) + { + ibf_destroy (cpi->ibf); + cpi->ibf = NULL; + } + } + GNUNET_free (session->info); + session->info = NULL; + } + if (NULL != session->ibfs) + { + for (i = 0; i <= MAX_IBF_ORDER; i++) + { + if (NULL != session->ibfs[i]) + { + ibf_destroy (session->ibfs[i]); + session->ibfs[i] = NULL; + } + } + GNUNET_free (session->ibfs); + session->ibfs = NULL; + } + if (NULL != session->values) + { + GNUNET_CONTAINER_multihashmap_iterate (session->values, destroy_element_info_iter, NULL); + GNUNET_CONTAINER_multihashmap_destroy (session->values); + session->values = NULL; } - return GNUNET_YES; -} - - -static struct StrataEstimator * -strata_estimator_create () -{ - struct StrataEstimator *se; - int i; - - /* fixme: allocate everything in one chunk */ - - se = GNUNET_malloc (sizeof (struct StrataEstimator)); - se->strata = GNUNET_malloc (sizeof (struct InvertibleBloomFilter) * STRATA_COUNT); - for (i = 0; i < STRATA_COUNT; i++) - se->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); - return se; + if (NULL != session->ibf_key_map) + { + GNUNET_CONTAINER_multihashmap_destroy (session->ibf_key_map); + session->ibf_key_map = NULL; + } + GNUNET_free (session); } + static void -strata_estimator_destroy (struct StrataEstimator *se) +send_client_conclude_done (struct ConsensusSession *session) { - int i; - for (i = 0; i < STRATA_COUNT; i++) - ibf_destroy (se->strata[i]); - GNUNET_free (se->strata); - GNUNET_free (se); + struct PendingMessage *pm; + + /* check if client is even there anymore */ + if (NULL == session->scss.client) + return; + pm = new_pending_message (sizeof (struct GNUNET_MessageHeader), + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE); + message_queue_add (session->client_mq, pm); } +/** + * Check if a strata message is for the current round or not + * + * @param session session we are in + * @param strata_msg the strata message to check + * @return GNUNET_YES if the strata_msg is premature, GNUNET_NO otherwise + */ static int is_premature_strata_message (const struct ConsensusSession *session, const struct StrataMessage *strata_msg) { switch (strata_msg->round) { - case CONSENSUS_ROUND_STOCK: + case CONSENSUS_ROUND_COMPLETION: case CONSENSUS_ROUND_EXCHANGE: /* here, we also have to compare subrounds */ if ( (strata_msg->round != session->current_round) || (strata_msg->exp_round != session->exp_round) || - (strata_msg->exp_subround != session->exp_subround)) + (strata_msg->exp_subround != session->exp_subround) ) return GNUNET_YES; break; default: @@ -998,6 +1023,72 @@ is_premature_strata_message (const struct ConsensusSession *session, const struc } +/** + * Send a strata estimator. + * + * @param cpi the peer + */ +static void +send_strata_estimator (struct ConsensusPeerInformation *cpi) +{ + struct PendingMessage *pm; + struct StrataMessage *strata_msg; + + /* FIXME: why is this correct? */ + cpi->apparent_round = cpi->session->current_round; + cpi->ibf_state = IBF_STATE_NONE; + cpi->ibf_bucket_counter = 0; + + LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "sending SE (in round: %d)\n", cpi->session->current_round); + + pm = new_pending_message ((sizeof *strata_msg) + (SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE), + GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE); + strata_msg = (struct StrataMessage *) pm->msg; + strata_msg->round = cpi->session->current_round; + strata_msg->exp_round = cpi->session->exp_round; + strata_msg->exp_subround = cpi->session->exp_subround; + strata_estimator_write (cpi->session->se, &strata_msg[1]); + message_queue_add (cpi->mss.mq, pm); +} + + +/** + * Send an IBF of the order specified in cpi. + * + * @param cpi the peer + */ +static void +send_ibf (struct ConsensusPeerInformation *cpi) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending IBF to P%d\n", + cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); + + cpi->ibf_bucket_counter = 0; + while (cpi->ibf_bucket_counter < (1 << cpi->ibf_order)) + { + unsigned int num_buckets; + struct PendingMessage *pm; + struct DifferenceDigest *digest; + + num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter; + /* limit to maximum */ + if (num_buckets > BUCKETS_PER_MESSAGE) + num_buckets = BUCKETS_PER_MESSAGE; + + pm = new_pending_message ((sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE), + GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST); + digest = (struct DifferenceDigest *) pm->msg; + digest->order = cpi->ibf_order; + digest->round = cpi->apparent_round; + ibf_write_slice (cpi->ibf, cpi->ibf_bucket_counter, num_buckets, &digest[1]); + cpi->ibf_bucket_counter += num_buckets; + message_queue_add (cpi->mss.mq, pm); + } + cpi->ibf_bucket_counter = 0; + cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF; +} + + /** * Called when a peer sends us its strata estimator. * In response, we sent out IBF of appropriate size back. @@ -1008,12 +1099,10 @@ is_premature_strata_message (const struct ConsensusSession *session, const struc static int handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg) { - int i; // unsigned? unsigned int diff; - void *buf; - size_t size; - if ((cpi->session->current_round == CONSENSUS_ROUND_STOCK) && (strata_msg->round == CONSENSUS_ROUND_INVENTORY)) + if ( (cpi->session->current_round == CONSENSUS_ROUND_COMPLETION) && + (strata_msg->round == CONSENSUS_ROUND_INVENTORY) ) { /* we still have to handle this request appropriately */ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got inventory SE from P%d, we are already further alog\n", @@ -1023,28 +1112,26 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess { if (GNUNET_NO == cpi->replaying_strata_message) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got probably premature SE from P%d, (%d,%d)\n", - cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), strata_msg->exp_round, strata_msg->exp_subround); - cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message ((struct GNUNET_MessageHeader *) strata_msg); + LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got probably premature SE (%d,%d)\n", + strata_msg->exp_round, strata_msg->exp_subround); + cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message (&strata_msg->header); } return GNUNET_YES; } if (NULL == cpi->se) - cpi->se = strata_estimator_create (); + cpi->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM); cpi->apparent_round = strata_msg->round; - size = ntohs (strata_msg->header.size); - buf = (void *) &strata_msg[1]; // FIXME: do NOT cast away 'const'! - for (i = 0; i < STRATA_COUNT; i++) + if (htons (strata_msg->header.size) != ((sizeof *strata_msg) + SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE)) { - int res; - res = ibf_read (&buf, &size, cpi->se->strata[i]); - GNUNET_assert (GNUNET_OK == res); + LOG_PP (GNUNET_ERROR_TYPE_WARNING, cpi, "got SE of wrong size\n"); + return GNUNET_NO; } - - diff = estimate_difference (cpi->session->se, cpi->se); + strata_estimator_read (&strata_msg[1], cpi->se); + GNUNET_assert (NULL != cpi->session->se); + diff = strata_estimator_difference (cpi->session->se, cpi->se); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SE from P%d, diff=%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), diff); @@ -1053,10 +1140,10 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess { case CONSENSUS_ROUND_EXCHANGE: case CONSENSUS_ROUND_INVENTORY: - case CONSENSUS_ROUND_STOCK: + case CONSENSUS_ROUND_COMPLETION: /* send IBF of the right size */ cpi->ibf_order = 0; - while (((1 << cpi->ibf_order) < diff) || STRATA_HASH_NUM > (1 << cpi->ibf_order) ) + while (((1 << cpi->ibf_order) < diff) || (SE_IBF_HASH_NUM > (1 << cpi->ibf_order)) ) cpi->ibf_order++; if (cpi->ibf_order > MAX_IBF_ORDER) cpi->ibf_order = MAX_IBF_ORDER; @@ -1066,7 +1153,6 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess if (NULL != cpi->ibf) ibf_destroy (cpi->ibf); cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); - cpi->ibf_state = IBF_STATE_TRANSMITTING; cpi->ibf_bucket_counter = 0; send_ibf (cpi); break; @@ -1079,11 +1165,104 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess } + +static int +send_elements_iterator (void *cls, + const struct GNUNET_HashCode * key, + void *value) +{ + struct ConsensusPeerInformation *cpi = cls; + struct ElementInfo *ei; + ei = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, value); + if (NULL == ei) + { + LOG_PP (GNUNET_ERROR_TYPE_WARNING, cpi, "peer's ibf contained non-existing element %s\n", + GNUNET_h2s((struct GNUNET_HashCode *) value)); + return GNUNET_YES; + } + LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "sending element\n"); + send_element_or_report (cpi, ei); + return GNUNET_YES; +} + + +/** + * Decode the current diff ibf, and send elements/requests/reports/ + * + * @param cpi partner peer + */ +static void +decode (struct ConsensusPeerInformation *cpi) +{ + struct IBF_Key key; + int side; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: decoding ibf from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); + + while (1) + { + int res; + + res = ibf_decode (cpi->ibf, &side, &key); + if (GNUNET_SYSERR == res) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding failed, transmitting larger IBF\n"); + /* decoding failed, we tell the other peer by sending our ibf with a larger order */ + cpi->ibf_order++; + prepare_ibf (cpi); + cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); + cpi->ibf_bucket_counter = 0; + send_ibf (cpi); + return; + } + if (GNUNET_NO == res) + { + struct PendingMessage *pm; + struct ConsensusRoundMessage *rmsg; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: transmitted all values, sending SYNC\n", cpi->session->local_peer_idx); + + pm = new_pending_message (sizeof *rmsg, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED); + rmsg = (struct ConsensusRoundMessage *) pm->msg; + rmsg->round = cpi->apparent_round; + message_queue_add (cpi->mss.mq, pm); + return; + } + if (-1 == side) + { + struct GNUNET_HashCode hashcode; + /* we have the element(s), send it to the other peer */ + ibf_hashcode_from_key (key, &hashcode); + GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->ibf_key_map, &hashcode, send_elements_iterator, cpi); + } + else + { + struct PendingMessage *pm; + uint16_t type; + + switch (cpi->apparent_round) + { + case CONSENSUS_ROUND_COMPLETION: + /* FIXME: check if we really want to request the element */ + case CONSENSUS_ROUND_EXCHANGE: + case CONSENSUS_ROUND_INVENTORY: + type = GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST; + break; + default: + GNUNET_assert (0); + } + pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + sizeof (struct IBF_Key), + type); + *(struct IBF_Key *) &pm->msg[1] = key; + message_queue_add (cpi->mss.mq, pm); + } + } +} + + static int handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest) { int num_buckets; - void *buf; /* FIXME: find out if we're still expecting the same ibf! */ @@ -1128,13 +1307,10 @@ handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDig return GNUNET_YES; } - if (NULL == cpi->ibf) - cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); - - buf = (void *) &digest[1]; // FIXME: digest is supposed to be READ ONLY! - ibf_read_slice (&buf, NULL, cpi->ibf_bucket_counter, num_buckets, cpi->ibf); + cpi->ibf = ibf_create (1 << cpi->ibf_order, SE_IBF_HASH_NUM); + ibf_read_slice (&digest[1], cpi->ibf_bucket_counter, num_buckets, cpi->ibf); cpi->ibf_bucket_counter += num_buckets; if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) @@ -1149,20 +1325,54 @@ handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDig } +/** + * Insert an element into the consensus set of the specified session. + * The element will not be copied, and freed when destroying the session. + * + * @param session session for new element + * @param element element to insert + */ +static void +insert_element (struct ConsensusSession *session, struct GNUNET_CONSENSUS_Element *element) +{ + struct GNUNET_HashCode hash; + struct ElementInfo *e; + struct IBF_Key ibf_key; + int i; + + e = GNUNET_new (struct ElementInfo); + e->element = element; + e->element_hash = GNUNET_new (struct GNUNET_HashCode); + GNUNET_CRYPTO_hash (e->element->data, e->element->size, e->element_hash); + ibf_key = ibf_key_from_hashcode (e->element_hash); + ibf_hashcode_from_key (ibf_key, &hash); + strata_estimator_insert (session->se, &hash); + GNUNET_CONTAINER_multihashmap_put (session->values, e->element_hash, e, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); + GNUNET_CONTAINER_multihashmap_put (session->ibf_key_map, &hash, e->element_hash, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + + for (i = 0; i <= MAX_IBF_ORDER; i++) + { + if (NULL == session->ibfs[i]) + continue; + ibf_insert (session->ibfs[i], ibf_key); + } +} + + /** * Handle an element that another peer sent us */ static int handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg) { - struct PendingElement *pending_element; struct GNUNET_CONSENSUS_Element *element; - struct GNUNET_CONSENSUS_ElementMessage *client_element_msg; size_t size; switch (cpi->session->current_round) { - case CONSENSUS_ROUND_STOCK: + case CONSENSUS_ROUND_COMPLETION: /* FIXME: check if we really expect the element */ case CONSENSUS_ROUND_EXCHANGE: break; @@ -1178,21 +1388,10 @@ handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_Me memcpy (&element[1], &element_msg[1], size); element->data = &element[1]; - pending_element = GNUNET_malloc (sizeof *pending_element); - pending_element->element = element; - GNUNET_CONTAINER_DLL_insert_tail (cpi->session->client_approval_head, cpi->session->client_approval_tail, pending_element); + LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got element\n"); - client_element_msg = GNUNET_malloc (size + sizeof *client_element_msg); - client_element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT); - client_element_msg->header.size = htons (size + sizeof *client_element_msg); - memcpy (&client_element_msg[1], &element[1], size); + insert_element (cpi->session, element); - queue_client_message (cpi->session, (struct GNUNET_MessageHeader *) client_element_msg); - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received element, size=%d\n", size); - - client_send_next (cpi->session); - return GNUNET_YES; } @@ -1213,20 +1412,36 @@ handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct E /* element requests are allowed in every round */ num = ntohs (msg->header.size) / sizeof (struct IBF_Key); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handling element request for %u elements\n", num); ibf_key = (struct IBF_Key *) &msg[1]; while (num--) { - struct ElementList *head; ibf_hashcode_from_key (*ibf_key, &hashcode); - head = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode); - send_elements (cpi, head); + GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->ibf_key_map, &hashcode, send_elements_iterator, cpi); ibf_key++; } return GNUNET_YES; } +static int +is_peer_connected (struct ConsensusPeerInformation *cpi) +{ + if (NULL == cpi->mss.socket) + return GNUNET_NO; + return GNUNET_YES; +} + + +static void +ensure_peer_connected (struct ConsensusPeerInformation *cpi) +{ + if (NULL != cpi->mss.socket) + return; + cpi->mss.socket = GNUNET_STREAM_open (cfg, &cpi->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS, + open_cb, cpi, GNUNET_STREAM_OPTION_END); +} + + /** * If necessary, send a message to the peer, depending on the current * round. @@ -1234,17 +1449,22 @@ handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct E static void embrace_peer (struct ConsensusPeerInformation *cpi) { - GNUNET_assert (GNUNET_YES == cpi->hello); - switch (cpi->session->current_round) + if (GNUNET_NO == is_peer_connected (cpi)) + { + ensure_peer_connected (cpi); + return; + } + if (GNUNET_NO == cpi->hello) + return; + /* FIXME: correctness of switch */ + switch (cpi->session->current_round) { case CONSENSUS_ROUND_EXCHANGE: + case CONSENSUS_ROUND_INVENTORY: if (cpi->session->partner_outgoing != cpi) break; /* fallthrough */ - case CONSENSUS_ROUND_INVENTORY: - /* fallthrough */ - case CONSENSUS_ROUND_STOCK: - if (cpi == cpi->session->partner_outgoing) + case CONSENSUS_ROUND_COMPLETION: send_strata_estimator (cpi); default: break; @@ -1253,302 +1473,109 @@ embrace_peer (struct ConsensusPeerInformation *cpi) /** - * Handle a HELLO-message, send when another peer wants to join a session where - * our peer is a member. The session may or may not be inhabited yet. + * Called when stream has finishes writing the hello message */ -static int -handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello) +static void +hello_cont (void *cls) { - /* FIXME: session might not exist yet. create an uninhabited session and wait for a client */ - struct ConsensusSession *session; - int idx; + struct ConsensusPeerInformation *cpi = cls; - for (session = sessions_head; NULL != session; session = session->next) - { - if (0 != GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id)) - continue; - idx = get_peer_idx (&inc->peer_id, session); - GNUNET_assert (-1 != idx); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d hello'ed session %d\n", idx); - inc->cpi = &session->info[idx]; - inc->cpi->mst = inc->mst; - inc->cpi->hello = GNUNET_YES; - inc->cpi->socket = inc->socket; - embrace_peer (inc->cpi); - return GNUNET_YES; - } - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session\n"); - return GNUNET_NO; + cpi->hello = GNUNET_YES; + embrace_peer (cpi); } /** - * Send a strata estimator. + * Called when we established a stream connection to another peer * - * @param cpi the peer + * @param cls cpi of the peer we just connected to + * @param socket socket to use to communicate with the other side (read/write) */ static void -send_strata_estimator (struct ConsensusPeerInformation *cpi) +open_cb (void *cls, struct GNUNET_STREAM_Socket *socket) { - struct StrataMessage *strata_msg; - void *buf; - size_t msize; - int i; - - cpi->apparent_round = cpi->session->current_round; - cpi->ibf_state = IBF_STATE_NONE; - cpi->ibf_bucket_counter = 0; - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending SE(%d) to P%d\n", - cpi->session->local_peer_idx, cpi->session->current_round, (int) (cpi - cpi->session->info)); + struct ConsensusPeerInformation *cpi = cls; + struct PendingMessage *pm; + struct ConsensusHello *hello; - msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS); + GNUNET_assert (NULL == cpi->mss.mst); + GNUNET_assert (NULL == cpi->mss.mq); - strata_msg = GNUNET_malloc (msize); - strata_msg->header.size = htons (msize); - strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE); - strata_msg->round = cpi->session->current_round; - strata_msg->exp_round = cpi->session->exp_round; - strata_msg->exp_subround = cpi->session->exp_subround; - - buf = &strata_msg[1]; - for (i = 0; i < STRATA_COUNT; i++) - { - ibf_write (cpi->session->se->strata[i], &buf, NULL); - } + cpi->mss.mq = create_message_queue_for_stream_socket (&cpi->mss); + cpi->mss.mst = GNUNET_SERVER_mst_create (mst_session_callback, cpi); + cpi->mss.mst_cls = cpi; - queue_peer_message (cpi, (struct GNUNET_MessageHeader *) strata_msg); + pm = new_pending_message (sizeof *hello, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO); + hello = (struct ConsensusHello *) pm->msg; + memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode)); + pm->sent_cb = hello_cont; + pm->sent_cb_cls = cpi; + message_queue_add (cpi->mss.mq, pm); + cpi->mss.rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, + &stream_data_processor, &cpi->mss); } -/** - * Send an IBF of the order specified in cpi. - * - * @param cpi the peer - */ static void -send_ibf (struct ConsensusPeerInformation *cpi) +replay_premature_message (struct ConsensusPeerInformation *cpi) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending IBF to P%d\n", - cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); - - cpi->ibf_bucket_counter = 0; - while (cpi->ibf_bucket_counter < (1 << cpi->ibf_order)) + if (NULL != cpi->premature_strata_message) { - int num_buckets; - void *buf; - struct DifferenceDigest *digest; - int msize; - - num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter; - /* limit to maximum */ - if (num_buckets > BUCKETS_PER_MESSAGE) - num_buckets = BUCKETS_PER_MESSAGE; - - msize = (sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE); - - digest = GNUNET_malloc (msize); - digest->header.size = htons (msize); - digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST); - digest->order = cpi->ibf_order; - digest->round = cpi->apparent_round; + struct StrataMessage *sm; - buf = &digest[1]; - ibf_write_slice (cpi->ibf, cpi->ibf_bucket_counter, num_buckets, &buf, NULL); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n"); + sm = cpi->premature_strata_message; + cpi->premature_strata_message = NULL; - queue_peer_message (cpi, (struct GNUNET_MessageHeader *) digest); + cpi->replaying_strata_message = GNUNET_YES; + handle_p2p_strata (cpi, sm); + cpi->replaying_strata_message = GNUNET_NO; - cpi->ibf_bucket_counter += num_buckets; + GNUNET_free (sm); } - cpi->ibf_bucket_counter = 0; - cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF; } /** - * Decode the current diff ibf, and send elements/requests/reports/ + * Start the inventory round, contact all peers we are supposed to contact. * - * @param cpi partner peer + * @param session the current session */ static void -decode (struct ConsensusPeerInformation *cpi) +start_inventory (struct ConsensusSession *session) { - struct IBF_Key key; - struct GNUNET_HashCode hashcode; - int side; - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: decoding ibf from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); + int i; + int last; - while (1) + for (i = 0; i < session->num_peers; i++) { - int res; - - res = ibf_decode (cpi->ibf, &side, &key); - if (GNUNET_SYSERR == res) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding failed, transmitting larger IBF\n"); - /* decoding failed, we tell the other peer by sending our ibf with a larger order */ - cpi->ibf_order++; - prepare_ibf (cpi); - cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); - cpi->ibf_state = IBF_STATE_TRANSMITTING; - cpi->ibf_bucket_counter = 0; - send_ibf (cpi); - return; - } - if (GNUNET_NO == res) - { - struct ConsensusRoundMessage *msg; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: transmitted all values, sending SYNC\n", cpi->session->local_peer_idx); - msg = GNUNET_malloc (sizeof *msg); - msg->header.size = htons (sizeof *msg); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED); - msg->round = cpi->apparent_round; - queue_peer_message (cpi, (struct GNUNET_MessageHeader *) msg); - return; - } - if (-1 == side) - { - struct ElementList *head; - /* we have the element(s), send it to the other peer */ - ibf_hashcode_from_key (key, &hashcode); - head = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode); - send_elements (cpi, head); - } - else - { - struct ElementRequest *msg; - size_t msize; - struct IBF_Key *p; - - msize = (sizeof *msg) + sizeof (struct IBF_Key); - msg = GNUNET_malloc (msize); - switch (cpi->apparent_round) - { - case CONSENSUS_ROUND_STOCK: - /* FIXME: check if we really want to request the element */ - case CONSENSUS_ROUND_EXCHANGE: - msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST); - break; - case CONSENSUS_ROUND_INVENTORY: - msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT); - break; - default: - GNUNET_assert (0); - } - msg->header.size = htons (msize); - p = (struct IBF_Key *) &msg[1]; - *p = key; - queue_peer_message (cpi, (struct GNUNET_MessageHeader *) msg); - } + session->info[i].ibf_bucket_counter = 0; + session->info[i].ibf_state = IBF_STATE_NONE; + session->info[i].is_outgoing = GNUNET_NO; } -} - -/** - * Functions with this signature are called whenever a - * complete message is received by the tokenizer. - * - * Do not call GNUNET_SERVER_mst_destroy in callback - * - * @param cls closure - * @param client identification of the client - * @param message the actual message - * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing - */ -static int -mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) -{ - struct ConsensusPeerInformation *cpi; - cpi = cls; - switch (ntohs (message->type)) + last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; + i = (session->local_peer_idx + 1) % session->num_peers; + while (i != last) { - case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE: - return handle_p2p_strata (cpi, (struct StrataMessage *) message); - case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST: - return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message); - case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS: - return handle_p2p_element (cpi, message); - case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT: - return handle_p2p_element_report (cpi, message); - case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST: - return handle_p2p_element_request (cpi, (struct ElementRequest *) message); - case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED: - return handle_p2p_synced (cpi, message); - case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN: - return handle_p2p_fin (cpi, message); - default: - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s\n", - ntohs (message->type), GNUNET_h2s (&cpi->peer_id.hashPubKey)); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i); + session->info[i].is_outgoing = GNUNET_YES; + embrace_peer (&session->info[i]); + i = (i + 1) % session->num_peers; } - return GNUNET_OK; -} - - -/** - * Handle tokenized messages from stream sockets. - * Delegate them if the socket belongs to a session, - * handle hello messages otherwise. - * - * Do not call GNUNET_SERVER_mst_destroy in callback - * - * @param cls closure, unused - * @param client incoming socket this message comes from - * @param message the actual message - * - * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing - */ -static int -mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) -{ - struct IncomingSocket *inc; - inc = (struct IncomingSocket *) client; - switch (ntohs( message->type)) + // tie-breaker for even number of peers + if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last)) { - case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO: - return handle_p2p_hello (inc, (struct ConsensusHello *) message); - default: - if (NULL != inc->cpi) - return mst_session_callback (inc->cpi, client, message); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s (not in session)\n", - ntohs (message->type), GNUNET_h2s (&inc->peer_id.hashPubKey)); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i); + session->info[last].is_outgoing = GNUNET_YES; + embrace_peer (&session->info[last]); } - return GNUNET_OK; -} - - -/** - * Functions of this type are called upon new stream connection from other peers - * or upon binding error which happen when the app_port given in - * GNUNET_STREAM_listen() is already taken. - * - * @param cls the closure from GNUNET_STREAM_listen - * @param socket the socket representing the stream; NULL on binding error - * @param initiator the identity of the peer who wants to establish a stream - * with us; NULL on binding error - * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the - * stream (the socket will be invalid after the call) - */ -static int -listen_cb (void *cls, - struct GNUNET_STREAM_Socket *socket, - const struct GNUNET_PeerIdentity *initiator) -{ - struct IncomingSocket *incoming; - if (NULL == socket) + for (i = 0; i < session->num_peers; i++) { - GNUNET_break (0); - return GNUNET_SYSERR; + if (GNUNET_NO == session->info[i].is_outgoing) + replay_premature_message (&session->info[i]); } - incoming = GNUNET_malloc (sizeof *incoming); - incoming->socket = socket; - incoming->peer_id = *initiator; - incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, - &incoming_stream_data_processor, incoming); - incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming); - GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming); - return GNUNET_OK; } @@ -1563,652 +1590,262 @@ listen_cb (void *cls, * GNUNET_NO if not. */ static int -destroy_element_list_iter (void *cls, +send_client_elements_iter (void *cls, const struct GNUNET_HashCode * key, void *value) { - struct ElementList *el; - el = value; - while (NULL != el) - { - struct ElementList *el_old; - el_old = el; - el = el->next; - GNUNET_free (el_old->element_hash); - GNUNET_free (el_old->element); - GNUNET_free (el_old); - } + struct ConsensusSession *session = cls; + struct ElementInfo *ei = value; + struct PendingMessage *pm; + + /* is the client still there? */ + if (NULL == session->scss.client) + return GNUNET_NO; + + pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + ei->element->size, + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT); + message_queue_add (session->client_mq, pm); return GNUNET_YES; } + /** - * Destroy a session, free all resources associated with it. - * - * @param session the session to destroy - */ -static void -destroy_session (struct ConsensusSession *session) -{ - int i; - - GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session); - GNUNET_SERVER_client_drop (session->client); - session->client = NULL; - if (NULL != session->shuffle) - { - GNUNET_free (session->shuffle); - session->shuffle = NULL; - } - if (NULL != session->se) - { - strata_estimator_destroy (session->se); - session->se = NULL; - } - if (NULL != session->info) - { - for (i = 0; i < session->num_peers; i++) - { - struct ConsensusPeerInformation *cpi; - cpi = &session->info[i]; - if ((NULL != cpi) && (NULL != cpi->socket)) - { - if (NULL != cpi->rh) - { - GNUNET_STREAM_read_cancel (cpi->rh); - cpi->rh = NULL; - } - if (NULL != cpi->wh) - { - GNUNET_STREAM_write_cancel (cpi->wh); - cpi->wh = NULL; - } - GNUNET_STREAM_close (cpi->socket); - cpi->socket = NULL; - } - if (NULL != cpi->se) - { - strata_estimator_destroy (cpi->se); - cpi->se = NULL; - } - if (NULL != cpi->ibf) - { - ibf_destroy (cpi->ibf); - cpi->ibf = NULL; - } - if (NULL != cpi->mst) - { - GNUNET_SERVER_mst_destroy (cpi->mst); - cpi->mst = NULL; - } - } - GNUNET_free (session->info); - session->info = NULL; - } - if (NULL != session->ibfs) - { - for (i = 0; i <= MAX_IBF_ORDER; i++) - { - if (NULL != session->ibfs[i]) - { - ibf_destroy (session->ibfs[i]); - session->ibfs[i] = NULL; - } - } - GNUNET_free (session->ibfs); - session->ibfs = NULL; - } - if (NULL != session->values) - { - GNUNET_CONTAINER_multihashmap_iterate (session->values, destroy_element_list_iter, NULL); - GNUNET_CONTAINER_multihashmap_destroy (session->values); - session->values = NULL; - } - GNUNET_free (session); -} - - -/** - * Disconnect a client, and destroy all sessions associated with it. - * - * @param client the client to disconnect - */ -static void -disconnect_client (struct GNUNET_SERVER_Client *client) -{ - struct ConsensusSession *session; - GNUNET_SERVER_client_disconnect (client); - - /* if the client owns a session, remove it */ - session = sessions_head; - while (NULL != session) - { - if (client == session->client) - { - destroy_session (session); - break; - } - session = session->next; - } -} - - -/** - * Compute a global, (hopefully) unique consensus session id, - * from the local id of the consensus session, and the identities of all participants. - * Thus, if the local id of two consensus sessions coincide, but are not comprised of - * exactly the same peers, the global id will be different. - * - * @param session session to generate the global id for - * @param session_id local id of the consensus session - */ -static void -compute_global_id (struct ConsensusSession *session, const struct GNUNET_HashCode *session_id) -{ - int i; - struct GNUNET_HashCode tmp; - - session->global_id = *session_id; - for (i = 0; i < session->num_peers; ++i) - { - GNUNET_CRYPTO_hash_xor (&session->global_id, &session->info[i].peer_id.hashPubKey, &tmp); - session->global_id = tmp; - GNUNET_CRYPTO_hash (&session->global_id, sizeof (struct GNUNET_PeerIdentity), &tmp); - session->global_id = tmp; - } -} - - -/** - * Transmit a queued message to the session's client. - * - * @param cls consensus session - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -transmit_queued (void *cls, size_t size, - void *buf) -{ - struct ConsensusSession *session; - struct QueuedMessage *qmsg; - size_t msg_size; - - session = cls; - session->client_th = NULL; - - qmsg = session->client_messages_head; - GNUNET_CONTAINER_DLL_remove (session->client_messages_head, session->client_messages_tail, qmsg); - GNUNET_assert (qmsg); - - if (NULL == buf) - { - destroy_session (session); - return 0; - } - - msg_size = ntohs (qmsg->msg->size); - - GNUNET_assert (size >= msg_size); - - memcpy (buf, qmsg->msg, msg_size); - GNUNET_free (qmsg->msg); - GNUNET_free (qmsg); - - client_send_next (session); - - return msg_size; -} - - -/** - * Schedule transmitting the next queued message (if any) to the inhabiting client of a session. - * - * @param session the consensus session - */ -static void -client_send_next (struct ConsensusSession *session) -{ - - GNUNET_assert (NULL != session); - - if (NULL != session->client_th) - return; - - if (NULL != session->client_messages_head) - { - int msize; - msize = ntohs (session->client_messages_head->msg->size); - session->client_th = GNUNET_SERVER_notify_transmit_ready (session->client, msize, - GNUNET_TIME_UNIT_FOREVER_REL, - &transmit_queued, session); - } -} - - -/** - * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have - * the correct signature to be used with e.g. qsort. - * We use this function instead. - * - * @param h1 some hash code - * @param h2 some hash code - * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2. - */ -static int -hash_cmp (const void *h1, const void *h2) -{ - return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) h1, (struct GNUNET_HashCode *) h2); -} - - -/** - * Search peer in the list of peers in session. - * - * @param peer peer to find - * @param session session with peer - * @return index of peer, -1 if peer is not in session - */ -static int -get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session) -{ - int i; - for (i = 0; i < session->num_peers; i++) - if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer)) - return i; - return -1; -} - - -/** - * Called when stream has finishes writing the hello message - */ -static void -hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size) -{ - struct ConsensusPeerInformation *cpi; - - cpi = cls; - cpi->wh = NULL; - cpi->hello = GNUNET_YES; - GNUNET_assert (GNUNET_STREAM_OK == status); - embrace_peer (cpi); -} - - -/** - * Called when we established a stream connection to another peer - * - * @param cls cpi of the peer we just connected to - * @param socket socket to use to communicate with the other side (read/write) - */ -static void -open_cb (void *cls, struct GNUNET_STREAM_Socket *socket) -{ - struct ConsensusPeerInformation *cpi; - struct ConsensusHello *hello; - - cpi = cls; - hello = GNUNET_malloc (sizeof *hello); - hello->header.size = htons (sizeof *hello); - hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO); - memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode)); - GNUNET_assert (NULL == cpi->mst); - cpi->mst = GNUNET_SERVER_mst_create (mst_session_callback, cpi); - cpi->wh = - GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi); - GNUNET_free (hello); - cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, - &session_stream_data_processor, cpi); -} - - -/** - * Create the sorted list of peers for the session, - * add the local peer if not in the join message. - */ -static void -initialize_session_peer_list (struct ConsensusSession *session) -{ - unsigned int local_peer_in_list; - uint32_t listed_peers; - const struct GNUNET_PeerIdentity *msg_peers; - struct GNUNET_PeerIdentity *peers; - unsigned int i; - - GNUNET_assert (NULL != session->join_msg); - - /* peers in the join message, may or may not include the local peer */ - listed_peers = ntohl (session->join_msg->num_peers); - - session->num_peers = listed_peers; - - msg_peers = (struct GNUNET_PeerIdentity *) &session->join_msg[1]; - - local_peer_in_list = GNUNET_NO; - for (i = 0; i < listed_peers; i++) - { - if (0 == memcmp (&msg_peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity))) - { - local_peer_in_list = GNUNET_YES; - break; - } - } - - if (GNUNET_NO == local_peer_in_list) - session->num_peers++; - - peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity)); - - if (GNUNET_NO == local_peer_in_list) - peers[session->num_peers - 1] = *my_peer; - - memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity)); - qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp); - - session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation)); - - for (i = 0; i < session->num_peers; ++i) - { - /* initialize back-references, so consensus peer information can - * be used as closure */ - session->info[i].session = session; - session->info[i].peer_id = peers[i]; - } - - free (peers); -} - - -static void -strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode *key) -{ - uint32_t v; - int i; - v = key->bits[0]; - /* count trailing '1'-bits of v */ - for (i = 0; v & 1; v>>=1, i++) - /* empty */; - ibf_insert (se->strata[i], ibf_key_from_hashcode (key)); -} - - -/** - * Add incoming peer connections to the session, - * for peers who have connected to us before the local session has been established - * - * @param session ... - */ -static void -add_incoming_peers (struct ConsensusSession *session) -{ - struct IncomingSocket *inc; - inc = incoming_sockets_head; - - while (NULL != inc) - { - if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, inc->requested_gid)) - { - int i; - for (i = 0; i < session->num_peers; i++) - { - struct ConsensusPeerInformation *cpi; - cpi = &session->info[i]; - if (0 == memcmp (&inc->peer_id, &cpi->peer_id, sizeof (struct GNUNET_PeerIdentity))) - { - cpi->socket = inc->socket; - inc->cpi = cpi; - inc->cpi->mst = inc->mst; - inc->cpi->hello = GNUNET_YES; - break; - } - } - } - inc = inc->next; - } -} - - -/** - * Initialize the session, continue receiving messages from the owning client - * - * @param session the session to initialize - */ -static void -initialize_session (struct ConsensusSession *session) -{ - const struct ConsensusSession *other_session; - - GNUNET_assert (NULL != session->join_msg); - initialize_session_peer_list (session); - session->current_round = CONSENSUS_ROUND_BEGIN; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers); - compute_global_id (session, &session->join_msg->session_id); - - /* Check if some local client already owns the session. */ - other_session = sessions_head; - while (NULL != other_session) - { - if ((other_session != session) && - (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id))) - { - /* session already owned by another client */ - GNUNET_break (0); - disconnect_client (session->client); - return; - } - other_session = other_session->next; - } - - session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO); - session->local_peer_idx = get_peer_idx (my_peer, session); - GNUNET_assert (-1 != session->local_peer_idx); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx); - session->se = strata_estimator_create (); - session->ibfs = GNUNET_malloc ((MAX_IBF_ORDER+1) * sizeof (struct InvertibleBloomFilter *)); - GNUNET_free (session->join_msg); - session->join_msg = NULL; - add_incoming_peers (session); - GNUNET_SERVER_receive_done (session->client, GNUNET_OK); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id)); -} - - -/** - * Called when a client wants to join a consensus session. + * Start the next round. + * This function can be invoked as a timeout task, or called manually (tc will be NULL then). * - * @param cls unused - * @param client client that sent the message - * @param m message sent by the client + * @param cls the session + * @param tc task context, for when this task is invoked by the scheduler, + * NULL if invoked for another reason */ -static void -client_join (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *m) -{ - struct ConsensusSession *session; - - // make sure the client has not already joined a session - session = sessions_head; - while (NULL != session) - { - if (session->client == client) - { - GNUNET_break (0); - disconnect_client (client); - return; - } - session = session->next; - } +static void +round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct ConsensusSession *session; - session = GNUNET_malloc (sizeof (struct ConsensusSession)); - session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m); - session->client = client; - GNUNET_SERVER_client_keep (client); + /* don't kick off next round if we're shutting down */ + if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) + return; - GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); + session = cls; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: round over\n", session->local_peer_idx); - // Initialize session later if local peer identity is not known yet. - if (NULL == my_peer) + if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) { - GNUNET_SERVER_disable_receive_done_warning (client); - return; + GNUNET_SCHEDULER_cancel (session->round_timeout_tid); + session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; } - initialize_session (session); + switch (session->current_round) + { + case CONSENSUS_ROUND_BEGIN: + session->current_round = CONSENSUS_ROUND_EXCHANGE; + session->exp_round = 0; + subround_over (session, NULL); + break; + case CONSENSUS_ROUND_EXCHANGE: + /* handle two peers specially */ + if (session->num_peers <= 2) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: 2-peer consensus done\n", session->local_peer_idx); + GNUNET_CONTAINER_multihashmap_iterate (session->values, send_client_elements_iter, session); + send_client_conclude_done (session); + session->current_round = CONSENSUS_ROUND_FINISH; + return; + } + session->current_round = CONSENSUS_ROUND_INVENTORY; + start_inventory (session); + break; + case CONSENSUS_ROUND_INVENTORY: + session->current_round = CONSENSUS_ROUND_COMPLETION; + session->exp_round = 0; + subround_over (session, NULL); + break; + case CONSENSUS_ROUND_COMPLETION: + session->current_round = CONSENSUS_ROUND_FINISH; + send_client_conclude_done (session); + break; + default: + GNUNET_assert (0); + } } -/** - * Hash a block of data, producing a replicated ibf hash. - */ static void -hash_for_ibf (const void *block, size_t size, struct GNUNET_HashCode *ret) +fin_sent_cb (void *cls) { - struct IBF_Key ibf_key; - GNUNET_CRYPTO_hash (block, size, ret); - ibf_key = ibf_key_from_hashcode (ret); - ibf_hashcode_from_key (ibf_key, ret); + struct ConsensusPeerInformation *cpi; + cpi = cls; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n", cpi->session->local_peer_idx); + switch (cpi->session->current_round) + { + case CONSENSUS_ROUND_EXCHANGE: + case CONSENSUS_ROUND_COMPLETION: + if (cpi->session->current_round != cpi->apparent_round) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: FIN to SYNC from the past\n", cpi->session->local_peer_idx); + break; + } + cpi->exp_subround_finished = GNUNET_YES; + /* the subround is only really over if *both* partners are done */ + if (GNUNET_YES == exp_subround_finished (cpi->session)) + subround_over (cpi->session, NULL); + else + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after FIN sent\n", cpi->session->local_peer_idx); + break; + case CONSENSUS_ROUND_INVENTORY: + cpi->inventory_synced = GNUNET_YES; + if (inventory_round_finished (cpi->session) && cpi->session->current_round == cpi->apparent_round) + round_over (cpi->session, NULL); + /* FIXME: maybe go to next round */ + break; + default: + GNUNET_break (0); + } } -static void -insert_element (struct ConsensusSession *session, struct GNUNET_CONSENSUS_Element *element) +/** + * The other peer wants us to inform that he sent us all the elements we requested. + */ +static int +handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) { - struct GNUNET_HashCode hash; - struct ElementList *head; - - hash_for_ibf (element->data, element->size, &hash); - - head = GNUNET_CONTAINER_multihashmap_get (session->values, &hash); - - if (NULL == head) - { - int i; - - head = GNUNET_malloc (sizeof *head); - head->element = element; - head->next = NULL; - head->element_hash = GNUNET_memdup (&hash, sizeof hash); - GNUNET_CONTAINER_multihashmap_put (session->values, &hash, head, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); - strata_estimator_insert (session->se, &hash); - - for (i = 0; i <= MAX_IBF_ORDER; i++) - if (NULL != session->ibfs[i]) - ibf_insert (session->ibfs[i], ibf_key_from_hashcode (&hash)); - } - else + struct ConsensusRoundMessage *round_msg; + round_msg = (struct ConsensusRoundMessage *) msg; + /* FIXME: only call subround_over if round is the current one! */ + switch (cpi->session->current_round) { - struct ElementList *el; - el = GNUNET_malloc (sizeof *el); - head->element = element; - head->next = NULL; - head->element_hash = GNUNET_memdup (&hash, sizeof hash); - while (NULL != head->next) - head = head->next; - head->next = el; + case CONSENSUS_ROUND_EXCHANGE: + case CONSENSUS_ROUND_COMPLETION: + if (cpi->session->current_round != round_msg->round) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (past round)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); + cpi->ibf_state = IBF_STATE_NONE; + cpi->ibf_bucket_counter = 0; + break; + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (exp)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); + cpi->exp_subround_finished = GNUNET_YES; + if (GNUNET_YES == exp_subround_finished (cpi->session)) + subround_over (cpi->session, NULL); + else + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after got FIN\n", cpi->session->local_peer_idx); + break; + case CONSENSUS_ROUND_INVENTORY: + cpi->inventory_synced = GNUNET_YES; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (a2a)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); + if (inventory_round_finished (cpi->session)) + round_over (cpi->session, NULL); + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n"); + break; } + return GNUNET_YES; } /** - * Called when a client performs an insert operation. - * - * @param cls (unused) - * @param client client handle - * @param m message sent by the client + * Gets called when the other peer wants us to inform that + * it has decoded our ibf and sent us all elements / requests */ -void -client_insert (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *m) +static int +handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) { - struct ConsensusSession *session; - struct GNUNET_CONSENSUS_ElementMessage *msg; - struct GNUNET_CONSENSUS_Element *element; - int element_size; + struct PendingMessage *pm; + struct ConsensusRoundMessage *fin_msg; - session = sessions_head; - while (NULL != session) + /* FIXME: why handle current round?? */ + switch (cpi->session->current_round) { - if (session->client == client) + case CONSENSUS_ROUND_INVENTORY: + cpi->inventory_synced = GNUNET_YES; + case CONSENSUS_ROUND_COMPLETION: + case CONSENSUS_ROUND_EXCHANGE: + LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "received SYNC\n"); + pm = new_pending_message (sizeof *fin_msg, + GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN); + fin_msg = (struct ConsensusRoundMessage *) pm->msg; + fin_msg->round = cpi->apparent_round; + /* the subround is over once we kicked off sending the fin msg */ + /* FIXME: assert we are talking to the right peer! */ + /* FIXME: mark peer as synced */ + pm->sent_cb = fin_sent_cb; + pm->sent_cb_cls = cpi; + message_queue_add (cpi->mss.mq, pm); + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n"); break; } - - if (NULL == session) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n"); - GNUNET_SERVER_client_disconnect (client); - return; - } - - msg = (struct GNUNET_CONSENSUS_ElementMessage *) m; - element_size = ntohs (msg->header.size )- sizeof (struct GNUNET_CONSENSUS_ElementMessage); - - element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size); - - element->type = msg->element_type; - element->size = element_size; - memcpy (&element[1], &msg[1], element_size); - element->data = &element[1]; - - GNUNET_assert (NULL != element->data); - - insert_element (session, element); - - GNUNET_SERVER_receive_done (client, GNUNET_OK); - - client_send_next (session); + return GNUNET_YES; } - /** - * Functions of this signature are called whenever writing operations - * on a stream are executed + * Functions with this signature are called whenever a + * complete message is received by the tokenizer. * - * @param cls the closure from GNUNET_STREAM_write - * @param status the status of the stream at the time this function is called; - * GNUNET_STREAM_OK if writing to stream was completed successfully; - * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully - * (this doesn't mean that the data is never sent, the receiver may - * have read the data but its ACKs may have been lost); - * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the - * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot - * be processed. - * @param size the number of bytes written + * Do not call GNUNET_SERVER_mst_destroy in callback + * + * @param cls closure + * @param client identification of the client + * @param message the actual message + * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing */ -static void -write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size) +static int +mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) { - struct ConsensusPeerInformation *cpi; - - GNUNET_assert (GNUNET_STREAM_OK == status); - cpi = cls; - cpi->wh = NULL; - if (NULL != cpi->messages_head) - { - struct QueuedMessage *qm; - qm = cpi->messages_head; - GNUNET_CONTAINER_DLL_remove (cpi->messages_head, cpi->messages_tail, qm); - cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size), - GNUNET_TIME_UNIT_FOREVER_REL, - write_queued, cpi); - if (NULL != qm->cb) - qm->cb (qm->cls); - GNUNET_free (qm->msg); - GNUNET_free (qm); - GNUNET_assert (NULL != cpi->wh); + struct ConsensusPeerInformation *cpi = cls; + GNUNET_assert (NULL == client); + GNUNET_assert (NULL != cls); + switch (ntohs (message->type)) + { + case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE: + return handle_p2p_strata (cpi, (struct StrataMessage *) message); + case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST: + return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message); + case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS: + return handle_p2p_element (cpi, message); + case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT: + return handle_p2p_element_report (cpi, message); + case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST: + return handle_p2p_element_request (cpi, (struct ElementRequest *) message); + case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED: + return handle_p2p_synced (cpi, message); + case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN: + return handle_p2p_fin (cpi, message); + default: + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s\n", + ntohs (message->type), GNUNET_h2s (&cpi->peer_id.hashPubKey)); } + return GNUNET_OK; } static void shuffle (struct ConsensusSession *session) { - /* FIXME: implement */ + /* adapted from random_permute in util/crypto_random.c */ + /* FIXME + unsigned int *ret; + unsigned int i; + unsigned int tmp; + uint32_t x; + + GNUNET_assert (n > 0); + ret = GNUNET_malloc (n * sizeof (unsigned int)); + for (i = 0; i < n; i++) + ret[i] = i; + for (i = n - 1; i > 0; i--) + { + x = GNUNET_CRYPTO_random_u32 (mode, i + 1); + tmp = ret[x]; + ret[x] = ret[i]; + ret[i] = tmp; + } + */ } @@ -2241,30 +1878,10 @@ find_partners (struct ConsensusSession *session) } if (arc == session->local_peer_idx) { - GNUNET_assert (NULL == session->partner_incoming); - session->partner_incoming = &session->info[session->shuffle[i]]; - session->partner_incoming->exp_subround_finished = GNUNET_NO; - } - } -} - - -static void -replay_premature_message (struct ConsensusPeerInformation *cpi) -{ - if (NULL != cpi->premature_strata_message) - { - struct StrataMessage *sm; - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n"); - sm = cpi->premature_strata_message; - cpi->premature_strata_message = NULL; - - cpi->replaying_strata_message = GNUNET_YES; - handle_p2p_strata (cpi, sm); - cpi->replaying_strata_message = GNUNET_NO; - - GNUNET_free (sm); + GNUNET_assert (NULL == session->partner_incoming); + session->partner_incoming = &session->info[session->shuffle[i]]; + session->partner_incoming->exp_subround_finished = GNUNET_NO; + } } } @@ -2287,13 +1904,6 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; session = cls; - /* don't send any messages from the last round */ - /* - clear_peer_messages (session->partner_outgoing); - clear_peer_messages (session->partner_incoming); - for (i = 0; i < session->num_peers; i++) - clear_peer_messages (&session->info[i]); - */ /* cancel timeout */ if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) GNUNET_SCHEDULER_cancel (session->round_timeout_tid); @@ -2357,162 +1967,455 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) replay_premature_message (session->partner_incoming); } - if (NULL != session->partner_outgoing) + if (NULL != session->partner_outgoing) + { + session->partner_outgoing->ibf_state = IBF_STATE_NONE; + session->partner_outgoing->ibf_bucket_counter = 0; + session->partner_outgoing->exp_subround_finished = GNUNET_NO; + /* make sure peer is connected and send the SE */ + embrace_peer (session->partner_outgoing); + } + + /* + session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 3 * NUM_EXP_ROUNDS), + subround_over, session); + */ +} + + +/** + * Search peer in the list of peers in session. + * + * @param peer peer to find + * @param session session with peer + * @return index of peer, -1 if peer is not in session + */ +static int +get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session) +{ + int i; + for (i = 0; i < session->num_peers; i++) + if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer)) + return i; + return -1; +} + + +/** + * Handle a HELLO-message, send when another peer wants to join a session where + * our peer is a member. The session may or may not be inhabited yet. + */ +static int +handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello) +{ + struct ConsensusSession *session; + + if (NULL != inc->requested_gid) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session more than once, ignoring\n"); + return GNUNET_YES; + } + if (NULL != inc->cpi) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer with active session sent HELLO again, ignoring\n"); + return GNUNET_YES; + } + + for (session = sessions_head; NULL != session; session = session->next) + { + int idx; + struct ConsensusPeerInformation *cpi; + if (0 != GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id)) + continue; + idx = get_peer_idx (&inc->peer_id, session); + GNUNET_assert (-1 != idx); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d hello'ed session %d\n", idx); + cpi = &session->info[idx]; + inc->cpi = cpi; + cpi->mss = inc->mss; + cpi = &session->info[idx]; + cpi->hello = GNUNET_YES; + cpi->mss.mq = create_message_queue_for_stream_socket (&cpi->mss); + embrace_peer (cpi); + return GNUNET_YES; + } + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session\n"); + inc->requested_gid = GNUNET_memdup (&hello->global_id, sizeof (struct GNUNET_HashCode)); + return GNUNET_YES; +} + + + +/** + * Handle tokenized messages from stream sockets. + * Delegate them if the socket belongs to a session, + * handle hello messages otherwise. + * + * Do not call GNUNET_SERVER_mst_destroy in callback + * + * @param cls closure, unused + * @param client incoming socket this message comes from + * @param message the actual message + * + * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing + */ +static int +mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) +{ + struct IncomingSocket *inc; + GNUNET_assert (NULL == client); + GNUNET_assert (NULL != cls); + inc = (struct IncomingSocket *) cls; + switch (ntohs( message->type)) + { + case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO: + return handle_p2p_hello (inc, (struct ConsensusHello *) message); + default: + if (NULL != inc->cpi) + return mst_session_callback (inc->cpi, client, message); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s (not in session)\n", + ntohs (message->type), GNUNET_h2s (&inc->peer_id.hashPubKey)); + } + return GNUNET_OK; +} + + +/** + * Functions of this type are called upon new stream connection from other peers + * or upon binding error which happen when the app_port given in + * GNUNET_STREAM_listen() is already taken. + * + * @param cls the closure from GNUNET_STREAM_listen + * @param socket the socket representing the stream; NULL on binding error + * @param initiator the identity of the peer who wants to establish a stream + * with us; NULL on binding error + * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the + * stream (the socket will be invalid after the call) + */ +static int +listen_cb (void *cls, + struct GNUNET_STREAM_Socket *socket, + const struct GNUNET_PeerIdentity *initiator) +{ + struct IncomingSocket *incoming; + + if (NULL == socket) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + incoming = GNUNET_malloc (sizeof *incoming); + incoming->peer_id = *initiator; + incoming->mss.socket = socket; + incoming->mss.rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, + &stream_data_processor, &incoming->mss); + incoming->mss.mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming); + incoming->mss.mst_cls = incoming; + GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming); + return GNUNET_OK; +} + + +/** + * Disconnect a client, and destroy all sessions associated with it. + * + * @param client the client to disconnect + */ +static void +disconnect_client (struct GNUNET_SERVER_Client *client) +{ + struct ConsensusSession *session; + GNUNET_SERVER_client_disconnect (client); + + /* if the client owns a session, remove it */ + session = sessions_head; + while (NULL != session) + { + if (client == session->scss.client) + { + destroy_session (session); + break; + } + session = session->next; + } +} + + +/** + * Compute a global, (hopefully) unique consensus session id, + * from the local id of the consensus session, and the identities of all participants. + * Thus, if the local id of two consensus sessions coincide, but are not comprised of + * exactly the same peers, the global id will be different. + * + * @param session session to generate the global id for + * @param session_id local id of the consensus session + */ +static void +compute_global_id (struct ConsensusSession *session, const struct GNUNET_HashCode *session_id) +{ + int i; + struct GNUNET_HashCode tmp; + + session->global_id = *session_id; + for (i = 0; i < session->num_peers; ++i) + { + GNUNET_CRYPTO_hash_xor (&session->global_id, &session->info[i].peer_id.hashPubKey, &tmp); + session->global_id = tmp; + GNUNET_CRYPTO_hash (&session->global_id, sizeof (struct GNUNET_PeerIdentity), &tmp); + session->global_id = tmp; + } +} + + +/** + * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have + * the correct signature to be used with e.g. qsort. + * We use this function instead. + * + * @param h1 some hash code + * @param h2 some hash code + * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2. + */ +static int +hash_cmp (const void *h1, const void *h2) +{ + return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) h1, (struct GNUNET_HashCode *) h2); +} + + +/** + * Create the sorted list of peers for the session, + * add the local peer if not in the join message. + */ +static void +initialize_session_peer_list (struct ConsensusSession *session) +{ + unsigned int local_peer_in_list; + uint32_t listed_peers; + const struct GNUNET_PeerIdentity *msg_peers; + struct GNUNET_PeerIdentity *peers; + unsigned int i; + + GNUNET_assert (NULL != session->join_msg); + + /* peers in the join message, may or may not include the local peer */ + listed_peers = ntohl (session->join_msg->num_peers); + + session->num_peers = listed_peers; + + msg_peers = (struct GNUNET_PeerIdentity *) &session->join_msg[1]; + + local_peer_in_list = GNUNET_NO; + for (i = 0; i < listed_peers; i++) + { + if (0 == memcmp (&msg_peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity))) + { + local_peer_in_list = GNUNET_YES; + break; + } + } + + if (GNUNET_NO == local_peer_in_list) + session->num_peers++; + + peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity)); + + if (GNUNET_NO == local_peer_in_list) + peers[session->num_peers - 1] = *my_peer; + + memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity)); + qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp); + + session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation)); + + for (i = 0; i < session->num_peers; ++i) + { + /* initialize back-references, so consensus peer information can + * be used as closure */ + session->info[i].session = session; + session->info[i].peer_id = peers[i]; + } + + free (peers); +} + + +/** + * Add incoming peer connections to the session, + * for peers who have connected to us before the local session has been established + * + * @param session ... + */ +static void +add_incoming_peers (struct ConsensusSession *session) +{ + struct IncomingSocket *inc; + int i; + struct ConsensusPeerInformation *cpi; + + for (inc = incoming_sockets_head; NULL != inc; inc = inc->next) { - session->partner_outgoing->ibf_state = IBF_STATE_NONE; - session->partner_outgoing->ibf_bucket_counter = 0; - session->partner_outgoing->exp_subround_finished = GNUNET_NO; - - if (NULL == session->partner_outgoing->socket) - { - session->partner_outgoing->socket = - GNUNET_STREAM_open (cfg, &session->partner_outgoing->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS, - open_cb, session->partner_outgoing, - GNUNET_STREAM_OPTION_END); - } - else if (GNUNET_YES == session->partner_outgoing->hello) + if ( (NULL == inc->requested_gid) || + (0 != GNUNET_CRYPTO_hash_cmp (&session->global_id, inc->requested_gid)) ) + continue; + for (i = 0; i < session->num_peers; i++) { - send_strata_estimator (session->partner_outgoing); + cpi = &session->info[i]; + cpi->peer_id = inc->peer_id; + cpi->mss = inc->mss; + cpi->hello = GNUNET_YES; + inc->cpi = cpi; + break; } - /* else: do nothing, the send hello cb will handle this */ } - - /* - session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 3 * NUM_EXP_ROUNDS), - subround_over, session); - */ } + +/** + * Initialize the session, continue receiving messages from the owning client + * + * @param session the session to initialize + */ static void -contact_peer_a2a (struct ConsensusPeerInformation *cpi) +initialize_session (struct ConsensusSession *session) { - cpi->is_outgoing = GNUNET_YES; - if (NULL == cpi->socket) - { - cpi->socket = GNUNET_STREAM_open (cfg, &cpi->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS, - open_cb, cpi, GNUNET_STREAM_OPTION_END); - } - else if (GNUNET_YES == cpi->hello) + const struct ConsensusSession *other_session; + + GNUNET_assert (NULL != session->join_msg); + initialize_session_peer_list (session); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers); + compute_global_id (session, &session->join_msg->session_id); + + /* Check if some local client already owns the session. */ + other_session = sessions_head; + while (NULL != other_session) { - send_strata_estimator (cpi); + if ((other_session != session) && + (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id))) + { + if (GNUNET_NO == other_session->conclude) + { + /* session already owned by another client */ + GNUNET_break (0); + disconnect_client (session->scss.client); + return; + } + else + { + GNUNET_SERVER_client_drop (session->scss.client); + session->scss.client = NULL; + break; + } + } + other_session = other_session->next; } + + session->local_peer_idx = get_peer_idx (my_peer, session); + GNUNET_assert (-1 != session->local_peer_idx); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx); + GNUNET_free (session->join_msg); + session->join_msg = NULL; + add_incoming_peers (session); + GNUNET_SERVER_receive_done (session->scss.client, GNUNET_OK); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id)); } + /** - * Start the inventory round, contact all peers we are supposed to contact. + * Called when a client wants to join a consensus session. * - * @param session the current session + * @param cls unused + * @param client client that sent the message + * @param m message sent by the client */ static void -start_inventory (struct ConsensusSession *session) +client_join (void *cls, + struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *m) { - int i; - int last; + struct ConsensusSession *session; - for (i = 0; i < session->num_peers; i++) + // make sure the client has not already joined a session + session = sessions_head; + while (NULL != session) { - session->info[i].ibf_bucket_counter = 0; - session->info[i].ibf_state = IBF_STATE_NONE; - session->info[i].is_outgoing = GNUNET_NO; + if (session->scss.client == client) + { + GNUNET_break (0); + disconnect_client (client); + return; + } + session = session->next; } - last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; - i = (session->local_peer_idx + 1) % session->num_peers; - while (i != last) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i); - contact_peer_a2a (&session->info[i]); - session->info[i].is_outgoing = GNUNET_YES; - i = (i + 1) % session->num_peers; - } - // tie-breaker for even number of peers - if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last)) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i); - session->info[last].is_outgoing = GNUNET_YES; - contact_peer_a2a (&session->info[last]); - } + session = GNUNET_new (struct ConsensusSession); + session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m); + /* these have to be initialized here, as the client can already start to give us values */ + session->ibfs = GNUNET_malloc ((MAX_IBF_ORDER+1) * sizeof (struct InvertibleBloomFilter *)); + session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO); + session->ibf_key_map = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO); + session->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM); + session->scss.client = client; + session->client_mq = create_message_queue_for_server_client (&session->scss); + GNUNET_SERVER_client_keep (client); - for (i = 0; i < session->num_peers; i++) + GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); + + // Initialize session later if local peer identity is not known yet. + if (NULL == my_peer) { - if (GNUNET_NO == session->info[i].is_outgoing) - replay_premature_message (&session->info[i]); + GNUNET_SERVER_disable_receive_done_warning (client); + return; } -} -static void -send_client_conclude_done (struct ConsensusSession *session) -{ - struct GNUNET_MessageHeader *msg; - session->current_round = CONSENSUS_ROUND_FINISH; - msg = GNUNET_malloc (sizeof *msg); - msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE); - msg->size = htons (sizeof *msg); - queue_client_message (session, msg); - client_send_next (session); + initialize_session (session); } + + + /** - * Start the next round. - * This function can be invoked as a timeout task, or called manually (tc will be NULL then). + * Called when a client performs an insert operation. * - * @param cls the session - * @param tc task context, for when this task is invoked by the scheduler, - * NULL if invoked for another reason + * @param cls (unused) + * @param client client handle + * @param m message sent by the client */ -static void -round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +void +client_insert (void *cls, + struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *m) { struct ConsensusSession *session; + struct GNUNET_CONSENSUS_ElementMessage *msg; + struct GNUNET_CONSENSUS_Element *element; + int element_size; - /* don't kick off next round if we're shutting down */ - if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) - return; - - session = cls; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: round over\n", session->local_peer_idx); - - /* - for (i = 0; i < session->num_peers; i++) - clear_peer_messages (&session->info[i]); - */ - - if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) + session = sessions_head; + while (NULL != session) { - GNUNET_SCHEDULER_cancel (session->round_timeout_tid); - session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; + if (session->scss.client == client) + break; } - switch (session->current_round) + if (NULL == session) { - case CONSENSUS_ROUND_BEGIN: - session->current_round = CONSENSUS_ROUND_EXCHANGE; - session->exp_round = 0; - subround_over (session, NULL); - break; - case CONSENSUS_ROUND_EXCHANGE: - /* handle two peers specially */ - if (session->num_peers <= 2) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: done\n", session->local_peer_idx); - send_client_conclude_done (session); - return; - } - session->current_round = CONSENSUS_ROUND_INVENTORY; - start_inventory (session); - break; - case CONSENSUS_ROUND_INVENTORY: - session->current_round = CONSENSUS_ROUND_STOCK; - session->exp_round = 0; - subround_over (session, NULL); - break; - case CONSENSUS_ROUND_STOCK: - session->current_round = CONSENSUS_ROUND_FINISH; - send_client_conclude_done (session); - break; - default: - GNUNET_assert (0); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n"); + GNUNET_SERVER_client_disconnect (client); + return; } + + msg = (struct GNUNET_CONSENSUS_ElementMessage *) m; + element_size = ntohs (msg->header.size )- sizeof (struct GNUNET_CONSENSUS_ElementMessage); + element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size); + element->type = msg->element_type; + element->size = element_size; + memcpy (&element[1], &msg[1], element_size); + element->data = &element[1]; + GNUNET_assert (NULL != element->data); + insert_element (session, element); + + GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -2534,7 +2437,7 @@ client_conclude (void *cls, cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message; session = sessions_head; - while ((session != NULL) && (session->client != client)) + while ((session != NULL) && (session->scss.client != client)) session = session->next; if (NULL == session) { @@ -2553,6 +2456,8 @@ client_conclude (void *cls, return; } + session->conclude = GNUNET_YES; + if (session->num_peers <= 1) { send_client_conclude_done (session); @@ -2564,57 +2469,6 @@ client_conclude (void *cls, round_over (session, NULL); } - GNUNET_SERVER_receive_done (client, GNUNET_OK); - client_send_next (session); -} - - -/** - * Called when a client sends an ack - * - * @param cls (unused) - * @param client client handle - * @param message message sent by the client - */ -static void -client_ack (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) -{ - struct ConsensusSession *session; - struct GNUNET_CONSENSUS_AckMessage *msg; - struct PendingElement *pending; - struct GNUNET_CONSENSUS_Element *element; - - session = sessions_head; - while (NULL != session) - { - if (session->client == client) - break; - } - - if (NULL == session) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to ack, but client is not in any session\n"); - GNUNET_SERVER_client_disconnect (client); - return; - } - - pending = session->client_approval_head; - - GNUNET_CONTAINER_DLL_remove (session->client_approval_head, session->client_approval_tail, pending); - - msg = (struct GNUNET_CONSENSUS_AckMessage *) message; - - if (msg->keep) - { - element = pending->element; - insert_element (session, element); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got client ack\n"); - } - - GNUNET_free (pending); - GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -2649,14 +2503,10 @@ core_startup (void *cls, /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */ GNUNET_SCHEDULER_add_now (&disconnect_core, core); GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n"); - - session = sessions_head; - while (NULL != session) - { + /* initialize sessions that are waiting for the local peer identity */ + for (session = sessions_head; NULL != session; session = session->next) if (NULL != session->join_msg) initialize_session (session); - session = session->next; - } } @@ -2670,27 +2520,12 @@ static void shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - /* FIXME: complete; write separate destructors for different data types */ - while (NULL != incoming_sockets_head) { struct IncomingSocket *socket; socket = incoming_sockets_head; - if (NULL != socket->rh) - { - GNUNET_STREAM_read_cancel (socket->rh); - socket->rh = NULL; - } if (NULL == socket->cpi) - { - GNUNET_STREAM_close (socket->socket); - socket->socket = NULL; - if (NULL != socket->mst) - { - GNUNET_SERVER_mst_destroy (socket->mst); - socket->mst = NULL; - } - } + clear_message_stream_state (&socket->mss); incoming_sockets_head = incoming_sockets_head->next; GNUNET_free (socket); } @@ -2738,8 +2573,6 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)}, - {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK, - sizeof (struct GNUNET_CONSENSUS_AckMessage)}, {NULL, NULL, 0, 0} }; diff --git a/src/consensus/ibf.c b/src/consensus/ibf.c index 87dbdd696..739b97339 100644 --- a/src/consensus/ibf.c +++ b/src/consensus/ibf.c @@ -63,12 +63,10 @@ ibf_hashcode_from_key (struct IBF_Key key, struct GNUNET_HashCode *dst) * * @param size number of IBF buckets * @param hash_num number of buckets one element is hashed in - * @param salt salt for mingling hashes, different salt may - * result in less (or more) collisions * @return the newly created invertible bloom filter */ struct InvertibleBloomFilter * -ibf_create (uint32_t size, uint8_t hash_num, uint32_t salt) +ibf_create (uint32_t size, uint8_t hash_num) { struct InvertibleBloomFilter *ibf; @@ -235,32 +233,25 @@ ibf_decode (struct InvertibleBloomFilter *ibf, /** - * Write an ibf. + * Write buckets from an ibf to a buffer. + * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf. * * @param ibf the ibf to write * @param start with which bucket to start * @param count how many buckets to write - * @param buf buffer to write the data to, will be updated to point to the - * first byte after the written data - * @param size pointer to the size of the buffer, will be updated, can be NULL + * @param buf buffer to write the data to */ void -ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void **buf, size_t *size) +ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void *buf) { struct IBF_Key *key_dst; struct IBF_KeyHash *key_hash_dst; struct IBF_Count *count_dst; - /* update size and check for overflow */ - if (NULL != size) - { - size_t old_size; - old_size = *size; - *size = *size - count * IBF_BUCKET_SIZE; - GNUNET_assert (*size < old_size); - } + GNUNET_assert (start + count <= ibf->size); + /* copy keys */ - key_dst = (struct IBF_Key *) *buf; + key_dst = (struct IBF_Key *) buf; memcpy (key_dst, ibf->key_sum + start, count * sizeof *key_dst); key_dst += count; /* copy key hashes */ @@ -271,40 +262,28 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32 count_dst = (struct IBF_Count *) key_hash_dst; memcpy (count_dst, ibf->count + start, count * sizeof *count_dst); count_dst += count; - /* returned buffer is at the end of written data*/ - *buf = (void *) count_dst; } /** - * Read an ibf. + * Read buckets from a buffer into an ibf. * - * @param buf pointer to the buffer to write to, will point to first - * byte after the written data // FIXME: take 'const void *buf' for input, return number of bytes READ - * @param size size of the buffer, will be updated + * @param buf pointer to the buffer to read from * @param start which bucket to start at * @param count how many buckets to read * @param ibf the ibf to read from - * @return GNUNET_OK on success // FIXME: return 0 on error (or -1/ssize_t), number of bytes read otherwise */ -int -ibf_read_slice (void **buf, size_t *size, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf) +void +ibf_read_slice (const void *buf, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf) { struct IBF_Key *key_src; struct IBF_KeyHash *key_hash_src; struct IBF_Count *count_src; - /* update size and check for overflow */ - if (NULL != size) - { - size_t old_size; - old_size = *size; - *size = *size - count * IBF_BUCKET_SIZE; - if (*size > old_size) - return GNUNET_SYSERR; - } + GNUNET_assert (start + count <= ibf->size); + /* copy keys */ - key_src = (struct IBF_Key *) *buf; + key_src = (struct IBF_Key *) buf; memcpy (ibf->key_sum + start, key_src, count * sizeof *key_src); key_src += count; /* copy key hashes */ @@ -315,40 +294,6 @@ ibf_read_slice (void **buf, size_t *size, uint32_t start, uint32_t count, struct count_src = (struct IBF_Count *) key_hash_src; memcpy (ibf->count + start, count_src, count * sizeof *count_src); count_src += count; - /* returned buffer is at the end of written data*/ - *buf = (void *) count_src; - return GNUNET_OK; -} - - -/** - * Write an ibf. - * - * @param ibf the ibf to write - * @param buf buffer to write the data to, will be updated to point to the - * first byte after the written data - * @param size pointer to the size of the buffer, will be updated, can be NULL - */ -void -ibf_write (const struct InvertibleBloomFilter *ibf, void **buf, size_t *size) -{ - ibf_write_slice (ibf, 0, ibf->size, buf, size); -} - - -/** - * Read an ibf. - * - * @param buf pointer to the buffer to write to, will point to first - * byte after the written data - * @param size size of the buffer, will be updated - * @param dst ibf to write buckets to - * @return GNUNET_OK on success - */ -int -ibf_read (void **buf, size_t *size, struct InvertibleBloomFilter *dst) -{ - return ibf_read_slice (buf, size, 0, dst->size, dst); } @@ -366,7 +311,6 @@ ibf_subtract (struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFi GNUNET_assert (ibf1->size == ibf2->size); GNUNET_assert (ibf1->hash_num == ibf2->hash_num); - GNUNET_assert (ibf1->salt == ibf2->salt); for (i = 0; i < ibf1->size; i++) { @@ -388,7 +332,6 @@ ibf_dup (const struct InvertibleBloomFilter *ibf) struct InvertibleBloomFilter *copy; copy = GNUNET_malloc (sizeof *copy); copy->hash_num = ibf->hash_num; - copy->salt = ibf->salt; copy->size = ibf->size; copy->key_hash_sum = GNUNET_memdup (ibf->key_hash_sum, ibf->size * sizeof (struct IBF_KeyHash)); copy->key_sum = GNUNET_memdup (ibf->key_sum, ibf->size * sizeof (struct IBF_Key)); diff --git a/src/consensus/ibf.h b/src/consensus/ibf.h index 609653889..2bf3ef7c7 100644 --- a/src/consensus/ibf.h +++ b/src/consensus/ibf.h @@ -80,11 +80,6 @@ struct InvertibleBloomFilter */ uint8_t hash_num; - /** - * Salt for mingling hashes - */ - uint32_t salt; - /** * Xor sums of the elements' keys, used to identify the elements. * Array of 'size' elements. @@ -107,58 +102,28 @@ struct InvertibleBloomFilter /** - * Write an ibf. + * Write buckets from an ibf to a buffer. + * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf. * * @param ibf the ibf to write * @param start with which bucket to start * @param count how many buckets to write - * @param buf buffer to write the data to, will be updated to point to the - * first byte after the written data - * @param size pointer to the size of the buffer, will be updated, can be NULL + * @param buf buffer to write the data to */ void -ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void **buf, size_t *size); +ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void *buf); /** - * Read an ibf. + * Read buckets from a buffer into an ibf. * - * @param buf pointer to the buffer to write to, will point to first - * byte after the written data - * @param size size of the buffer, will be updated + * @param buf pointer to the buffer to read from * @param start which bucket to start at * @param count how many buckets to read * @param ibf the ibf to read from - * @return GNUNET_OK on success - */ -int -ibf_read_slice (void **buf, size_t *size, uint32_t start, uint32_t count, struct InvertibleBloomFilter *dst); - - -/** - * Write an ibf. - * - * @param ibf the ibf to write - * @param buf buffer to write the data to, will be updated to point to the - * first byte after the written data - * @param size pointer to the size of the buffer, will be updated, can be NULL */ void -ibf_write (const struct InvertibleBloomFilter *ibf, void **buf, size_t *size); - - - -/** - * Read an ibf. - * - * @param buf pointer to the buffer to write to, will point to first - * byte after the written data - * @param size size of the buffer, will be updated - * @param dst ibf to write buckets to - * @return GNUNET_OK on success - */ -int -ibf_read (void **buf, size_t *size, struct InvertibleBloomFilter *dst); +ibf_read_slice (const void *buf, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf); /** @@ -187,12 +152,10 @@ ibf_hashcode_from_key (struct IBF_Key key, struct GNUNET_HashCode *dst); * * @param size number of IBF buckets * @param hash_num number of buckets one element is hashed in, usually 3 or 4 - * @param salt salt for mingling hashes, different salt may - * result in less (or more) collisions * @return the newly created invertible bloom filter */ struct InvertibleBloomFilter * -ibf_create (uint32_t size, uint8_t hash_num, uint32_t salt); +ibf_create (uint32_t size, uint8_t hash_num); /** diff --git a/src/consensus/strata_estimator.c b/src/consensus/strata_estimator.c new file mode 100644 index 000000000..685c50f0f --- /dev/null +++ b/src/consensus/strata_estimator.c @@ -0,0 +1,145 @@ +/* + This file is part of GNUnet + (C) 2012 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 2, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file consensus/ibf.h + * @brief invertible bloom filter + * @author Florian Dold + */ + +#include "platform.h" +#include "gnunet_common.h" +#include "ibf.h" +#include "strata_estimator.h" + +void +strata_estimator_write (const struct StrataEstimator *se, void *buf) +{ + int i; + for (i = 0; i < se->strata_count; i++) + { + ibf_write_slice (se->strata[i], 0, se->ibf_size, buf); + buf += se->ibf_size * IBF_BUCKET_SIZE; + } +} + +void +strata_estimator_read (const void *buf, struct StrataEstimator *se) +{ + int i; + for (i = 0; i < se->strata_count; i++) + { + ibf_read_slice (buf, 0, se->ibf_size, se->strata[i]); + buf += se->ibf_size * IBF_BUCKET_SIZE; + } +} + + +void +strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode *key) +{ + uint32_t v; + int i; + v = key->bits[0]; + /* count trailing '1'-bits of v */ + for (i = 0; v & 1; v>>=1, i++) + /* empty */; + ibf_insert (se->strata[i], ibf_key_from_hashcode (key)); +} + + + +struct StrataEstimator * +strata_estimator_create (unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum) +{ + struct StrataEstimator *se; + int i; + + /* fixme: allocate everything in one chunk */ + + se = GNUNET_malloc (sizeof (struct StrataEstimator)); + se->strata_count = strata_count; + se->ibf_size = ibf_size; + se->strata = GNUNET_malloc (sizeof (struct InvertibleBloomFilter *) * strata_count); + for (i = 0; i < strata_count; i++) + se->strata[i] = ibf_create (ibf_size, ibf_hashnum); + return se; +} + + +/** + * Estimate set difference with two strata estimators, + * i.e. arrays of IBFs. + * Does not not modify its arguments. + * + * @param se1 first strata estimator + * @param se2 second strata estimator + * @return the estimated difference + */ +unsigned int +strata_estimator_difference (const struct StrataEstimator *se1, + const struct StrataEstimator *se2) +{ + int i; + int count; + + GNUNET_assert (se1->strata_count == se2->strata_count); + count = 0; + for (i = se1->strata_count - 1; i >= 0; i--) + { + struct InvertibleBloomFilter *diff; + /* number of keys decoded from the ibf */ + int ibf_count; + int more; + ibf_count = 0; + /* FIXME: implement this without always allocating new IBFs */ + diff = ibf_dup (se1->strata[i]); + ibf_subtract (diff, se2->strata[i]); + for (;;) + { + more = ibf_decode (diff, NULL, NULL); + if (GNUNET_NO == more) + { + count += ibf_count; + break; + } + if (GNUNET_SYSERR == more) + { + ibf_destroy (diff); + return count * (1 << (i + 1)); + } + ibf_count++; + } + ibf_destroy (diff); + } + return count; +} + + +void +strata_estimator_destroy (struct StrataEstimator *se) +{ + int i; + for (i = 0; i < se->strata_count; i++) + ibf_destroy (se->strata[i]); + GNUNET_free (se->strata); + GNUNET_free (se); +} + diff --git a/src/consensus/strata_estimator.h b/src/consensus/strata_estimator.h new file mode 100644 index 000000000..cb5bd3d0a --- /dev/null +++ b/src/consensus/strata_estimator.h @@ -0,0 +1,84 @@ +/* + This file is part of GNUnet + (C) 2012 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 2, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file consensus/strata_estimator.h + * @brief estimator of set difference + * @author Florian Dold + */ + +#ifndef GNUNET_CONSENSUS_STRATA_ESTIMATOR_H +#define GNUNET_CONSENSUS_STRATA_ESTIMATOR_H + +#include "platform.h" +#include "gnunet_common.h" +#include "gnunet_util_lib.h" + +#ifdef __cplusplus +extern "C" +{ +#if 0 /* keep Emacsens' auto-indent happy */ +} +#endif +#endif + + +struct StrataEstimator +{ + struct InvertibleBloomFilter **strata; + unsigned int strata_count; + unsigned int ibf_size; +}; + + +void +strata_estimator_write (const struct StrataEstimator *se, void *buf); + + +void +strata_estimator_read (const void *buf, struct StrataEstimator *se); + + +struct StrataEstimator * +strata_estimator_create (unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum); + + +unsigned int +strata_estimator_difference (const struct StrataEstimator *se1, + const struct StrataEstimator *se2); + + +void +strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode *key); + + +void +strata_estimator_destroy (struct StrataEstimator *se); + + +#if 0 /* keep Emacsens' auto-indent happy */ +{ +#endif +#ifdef __cplusplus +} +#endif + +#endif + diff --git a/src/dv/gnunet-service-dv.c b/src/dv/gnunet-service-dv.c index 89f109345..6dc37f7d9 100644 --- a/src/dv/gnunet-service-dv.c +++ b/src/dv/gnunet-service-dv.c @@ -1247,10 +1247,8 @@ insert_next_element (void *cls, * * @param cls the 'struct DirectNeighbor' we're building the consensus with * @param element the new element we have learned - * @return GNUNET_OK if the valid is well-formed and should be added to the consensus, - * GNUNET_SYSERR if the element should be ignored and not be propagated */ -static int +static void learn_route_cb (void *cls, const struct GNUNET_CONSENSUS_Element *element) { @@ -1274,12 +1272,12 @@ learn_route_cb (void *cls, neighbor->consensus_task = GNUNET_SCHEDULER_add_delayed (GNUNET_DV_CONSENSUS_FREQUENCY, &start_consensus, neighbor); - return GNUNET_SYSERR; + return; } if (sizeof (struct Target) != element->size) { GNUNET_break_op (0); - return GNUNET_SYSERR; + return; } target = GNUNET_malloc (sizeof (struct Target)); memcpy (target, element->data, sizeof (struct Target)); @@ -1291,9 +1289,8 @@ learn_route_cb (void *cls, { GNUNET_break_op (0); GNUNET_free (target); - return GNUNET_SYSERR; + return; } - return GNUNET_OK; } diff --git a/src/include/gnunet_consensus_service.h b/src/include/gnunet_consensus_service.h index 0c74a6a27..f7f784f6e 100644 --- a/src/include/gnunet_consensus_service.h +++ b/src/include/gnunet_consensus_service.h @@ -71,10 +71,8 @@ struct GNUNET_CONSENSUS_Element * * @param cls closure * @param element new element, NULL on error - * @return GNUNET_OK if the valid is well-formed and should be added to the consensus, - * GNUNET_SYSERR if the element should be ignored and not be propagated */ -typedef int (*GNUNET_CONSENSUS_ElementCallback) (void *cls, +typedef void (*GNUNET_CONSENSUS_ElementCallback) (void *cls, const struct GNUNET_CONSENSUS_Element *element); diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 7179914af..431542660 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -1754,6 +1754,11 @@ extern "C" */ #define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN 548 +/** + * Abort a round, don't send requested elements anymore + */ +#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ABORT 548 + /** * Next available: 570 -- cgit v1.2.3