diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-04-11 10:08:52 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-04-11 10:08:52 +0000 |
commit | 210be82b7cdc6058401e7d5042aa50dd0b750c92 (patch) | |
tree | e2bfa5a87038ef0a7f906d5ede8d6e7ea7f2638b /src | |
parent | 2b406c1533a919057cda8850315af1fca5b48a45 (diff) |
added consensus log-round simulation, work on consensus service, still problems with dv test case
Diffstat (limited to 'src')
-rw-r--r-- | src/consensus/Makefile.am | 6 | ||||
-rw-r--r-- | src/consensus/consensus-simulation.py | 103 | ||||
-rw-r--r-- | src/consensus/consensus_api.c | 20 | ||||
-rw-r--r-- | src/consensus/gnunet-consensus-ibf.c | 4 | ||||
-rw-r--r-- | src/consensus/gnunet-consensus.c | 3 | ||||
-rw-r--r-- | src/consensus/gnunet-service-consensus.c | 2779 | ||||
-rw-r--r-- | src/consensus/ibf.c | 87 | ||||
-rw-r--r-- | src/consensus/ibf.h | 53 | ||||
-rw-r--r-- | src/consensus/strata_estimator.c | 145 | ||||
-rw-r--r-- | src/consensus/strata_estimator.h (renamed from src/consensus/consensus_flout.h) | 46 | ||||
-rw-r--r-- | src/dv/gnunet-service-dv.c | 11 | ||||
-rw-r--r-- | src/include/gnunet_consensus_service.h | 4 | ||||
-rw-r--r-- | src/include/gnunet_protocols.h | 5 |
13 files changed, 1630 insertions, 1636 deletions
diff --git a/src/consensus/Makefile.am b/src/consensus/Makefile.am index e469de057..82af29c87 100644 --- a/src/consensus/Makefile.am +++ b/src/consensus/Makefile.am @@ -61,7 +61,8 @@ gnunet_consensus_ibf_LDADD = \ gnunet_service_consensus_SOURCES = \ gnunet-service-consensus.c \ - ibf.c + ibf.c \ + strata_estimator.c gnunet_service_consensus_LDADD = \ $(top_builddir)/src/util/libgnunetutil.la \ $(top_builddir)/src/core/libgnunetcore.la \ @@ -71,7 +72,8 @@ gnunet_service_consensus_LDADD = \ gnunet_service_evil_consensus_SOURCES = \ gnunet-service-consensus.c \ - ibf.c + ibf.c \ + strata_estimator.c gnunet_service_evil_consensus_LDADD = \ $(top_builddir)/src/util/libgnunetutil.la \ $(top_builddir)/src/core/libgnunetcore.la \ diff --git a/src/consensus/consensus-simulation.py b/src/consensus/consensus-simulation.py new file mode 100644 index 000000000..930dfee62 --- /dev/null +++ b/src/consensus/consensus-simulation.py @@ -0,0 +1,103 @@ +#!/usr/bin/python
+# This file is part of GNUnet
+# (C) 2013 Christian Grothoff (and other contributing authors)
+#
+# GNUnet is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published
+# by the Free Software Foundation; either version 2, or (at your
+# option) any later version.
+#
+# GNUnet is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNUnet; see the file COPYING. If not, write to the
+# Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+
+import argparse
+import random
+from math import ceil,log,floor
+
+def bsc(n):
+ """ count the bits set in n"""
+ l = n.bit_length()
+ c = 0
+ x = 1
+ for _ in range(0, l):
+ if n & x:
+ c = c + 1
+ x = x << 1
+ return c
+
+def simulate(k, n, verbose):
+ assert k < n
+ largest_arc = int(2**ceil(log(n, 2))) / 2
+ num_ghosts = (2 * largest_arc) - n
+ if verbose:
+ print "we have", num_ghosts, "ghost peers"
+ # n.b. all peers with idx<k are evil
+ peers = range(n)
+ info = [1 << x for x in xrange(n)]
+ def done_p():
+ for x in xrange(k, n):
+ if bsc(info[x]) < n-k:
+ return False
+ return True
+ rounds = 0
+ while not done_p():
+ if verbose:
+ print "-- round --"
+ arc = 1
+ while arc <= largest_arc:
+ if verbose:
+ print "-- subround --"
+ new_info = [x for x in info]
+ for peer_physical in xrange(n):
+ peer_logical = peers[peer_physical]
+ peer_type = None
+ partner_logical = (peer_logical + arc) % n
+ partner_physical = peers.index(partner_logical)
+ if peer_physical < k or partner_physical < k:
+ if verbose:
+ print "bad peer in connection", peer_physical, "--", partner_physical
+ continue
+ if peer_logical & arc == 0:
+ # we are outgoing
+ if verbose:
+ print peer_physical, "connects to", partner_physical
+ peer_type = "outgoing"
+ if peer_logical < num_ghosts:
+ # we have a ghost, check if the peer who connects
+ # to our ghost is actually outgoing
+ ghost_partner_logical = (peer_logical - arc) % n
+ if ghost_partner_logical & arc == 0:
+ peer_type = peer_type + ", ghost incoming"
+ new_info[peer_physical] = new_info[peer_physical] | info[peer_physical] | info[partner_physical]
+ new_info[partner_physical] = new_info[partner_physical] | info[peer_physical] | info[partner_physical]
+ else:
+ peer_type = "incoming"
+ if verbose > 1:
+ print "type of", str(peer_physical) + ":", peer_type
+ info = new_info
+ arc = arc << 1;
+ rounds = rounds + 1
+ random.shuffle(peers)
+ return rounds
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser()
+ parser.add_argument("k", metavar="k", type=int, help="#(bad peers)")
+ parser.add_argument("n", metavar="n", type=int, help="#(all peers)")
+ parser.add_argument("r", metavar="r", type=int, help="#(rounds)")
+ parser.add_argument('--verbose', '-v', action='count')
+
+ args = parser.parse_args()
+ sum = 0.0;
+ for n in xrange (0, args.r):
+ sum += simulate(args.k, args.n, args.verbose)
+ print sum / args.r;
+
+
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index 25ace3a4d..fd61d3712 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c @@ -231,15 +231,6 @@ send_next (struct GNUNET_CONSENSUS_Handle *consensus) } } -static void -queue_message (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_MessageHeader *msg) -{ - struct QueuedMessage *qm; - qm = GNUNET_malloc (sizeof *qm); - qm->msg = msg; - GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qm); -} - /** * Called when the server has sent is a new element @@ -252,8 +243,6 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_CONSENSUS_ElementMessage *msg) { struct GNUNET_CONSENSUS_Element element; - struct GNUNET_CONSENSUS_AckMessage *ack_msg; - int ret; LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n"); @@ -261,14 +250,7 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage); element.data = &msg[1]; - ret = consensus->new_element_cb (consensus->new_element_cls, &element); - - ack_msg = GNUNET_new (struct GNUNET_CONSENSUS_AckMessage); - ack_msg->header.size = htons (sizeof *ack_msg); - ack_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK); - ack_msg->keep = ret; - - queue_message (consensus, &ack_msg->header); + consensus->new_element_cb (consensus->new_element_cls, &element); send_next (consensus); } diff --git a/src/consensus/gnunet-consensus-ibf.c b/src/consensus/gnunet-consensus-ibf.c index 73dc31b56..d431795f1 100644 --- a/src/consensus/gnunet-consensus-ibf.c +++ b/src/consensus/gnunet-consensus-ibf.c @@ -160,8 +160,8 @@ run (void *cls, char *const *args, const char *cfgfile, i++; } - ibf_a = ibf_create (ibf_size, hash_num, 0); - ibf_b = ibf_create (ibf_size, hash_num, 0); + ibf_a = ibf_create (ibf_size, hash_num); + ibf_b = ibf_create (ibf_size, hash_num); printf ("generated sets\n"); diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c index 9e9b89446..d8c1b14ee 100644 --- a/src/consensus/gnunet-consensus.c +++ b/src/consensus/gnunet-consensus.c @@ -192,12 +192,11 @@ connect_complete (void *cls, } -static int +static void new_element_cb (void *cls, const struct GNUNET_CONSENSUS_Element *element) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n"); - return GNUNET_YES; } diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index ebd2d238b..179df0fb0 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c @@ -32,23 +32,32 @@ #include "gnunet_consensus_service.h" #include "gnunet_core_service.h" #include "gnunet_stream_lib.h" + #include "consensus_protocol.h" -#include "ibf.h" #include "consensus.h" +#include "ibf.h" +#include "strata_estimator.h" + + +/* + * Log macro that prefixes the local peer and the peer we are in contact with. + */ +#define LOG_PP(kind, cpi, m,...) GNUNET_log (kind, "P%d for P%d: " m, \ + cpi->session->local_peer_idx, (int) (cpi - cpi->session->info),##__VA_ARGS__) /** * Number of IBFs in a strata estimator. */ -#define STRATA_COUNT 32 +#define SE_STRATA_COUNT 32 /** - * Number of buckets per IBF. + * Size of the IBFs in the strata estimator. */ -#define STRATA_IBF_BUCKETS 80 +#define SE_IBF_SIZE 80 /** * hash num parameter for the difference digests and strata estimators */ -#define STRATA_HASH_NUM 3 +#define SE_IBF_HASH_NUM 3 /** * Number of buckets that can be transmitted in one message. @@ -63,72 +72,55 @@ #define MAX_IBF_ORDER (16) /** - * Number exp-rounds. + * Number of exponential rounds, used in the inventory and completion round. */ #define NUM_EXP_ROUNDS (4) /* forward declarations */ -struct ConsensusSession; -struct IncomingSocket; +/* mutual recursion with struct ConsensusSession */ struct ConsensusPeerInformation; -static void -client_send_next (struct ConsensusSession *session); - -static int -get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session); - -static void -round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); +struct MessageQueue; +/* mutual recursion with round_over */ static void -send_ibf (struct ConsensusPeerInformation *cpi); +subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); +/* mutial recursion with transmit_queued */ static void -send_strata_estimator (struct ConsensusPeerInformation *cpi); +client_send_next (struct MessageQueue *mq); +/* mutual recursion with mst_session_callback */ static void -decode (struct ConsensusPeerInformation *cpi); - -static void -write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size); +open_cb (void *cls, struct GNUNET_STREAM_Socket *socket); -static void -subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); +static int +mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message); /** - * An element that is waiting to be transmitted to the client. + * Additional information about a consensus element. */ -struct PendingElement +struct ElementInfo { /** - * Pending elements are kept in a DLL. + * The element itself. */ - struct PendingElement *next; - + struct GNUNET_CONSENSUS_Element *element; /** - * Pending elements are kept in a DLL. + * Hash of the element */ - struct PendingElement *prev; - + struct GNUNET_HashCode *element_hash; /** - * The actual element + * Number of other peers that have the element in the inventory. */ - struct GNUNET_CONSENSUS_Element *element; - - /* peer this element is coming from */ - struct ConsensusPeerInformation *cpi; -}; - - -struct ElementList -{ - struct ElementList *next; - struct GNUNET_CONSENSUS_Element *element; - struct GNUNET_HashCode *element_hash; + unsigned int inventory_count; + /** + * Bitmap of peers that have this element in their inventory + */ + uint8_t *inventory_bitmap; }; @@ -147,178 +139,93 @@ enum ConsensusRound CONSENSUS_ROUND_EXCHANGE, /** * Exchange which elements each peer has, but not the elements. + * This round uses the all-to-all scheme. */ CONSENSUS_ROUND_INVENTORY, /** - * Collect and distribute missing values. + * Collect and distribute missing values with the exponential scheme. */ - CONSENSUS_ROUND_STOCK, + CONSENSUS_ROUND_COMPLETION, /** - * Consensus concluded. + * Consensus concluded. After timeout and finished communication with client, + * consensus session will be destroyed. */ CONSENSUS_ROUND_FINISH }; +/* FIXME: review states, ANTICIPATE_DIFF and DECODING in particular */ /** - * Information about a peer that is in a consensus session. + * State of another peer with respect to the + * current ibf. */ -struct ConsensusPeerInformation -{ - struct GNUNET_PeerIdentity peer_id; - - /** - * Socket for communicating with the peer, either created by the local peer, - * or the remote peer. - */ - struct GNUNET_STREAM_Socket *socket; - - /** - * Message tokenizer, for the data received from this peer via the stream socket. - */ - struct GNUNET_SERVER_MessageStreamTokenizer *mst; - - /** - * Do we connect to the peer, or does the peer connect to us? - * Only valid for all-to-all phases - */ - int is_outgoing; - - /** - * Did we receive/send a consensus hello? - */ - int hello; - +enum ConsensusIBFState { /** - * Handle for currently active read + * There is nothing going on with the IBF. */ - struct GNUNET_STREAM_ReadHandle *rh; - + IBF_STATE_NONE=0, /** - * Handle for currently active read + * We currently receive an ibf. */ - struct GNUNET_STREAM_WriteHandle *wh; - - enum { - /* beginning of round */ - IBF_STATE_NONE=0, - /* we currently receive an ibf */ - IBF_STATE_RECEIVING, - /* we currently transmit an ibf */ - IBF_STATE_TRANSMITTING, - /* we decode a received ibf */ - IBF_STATE_DECODING, - /* wait for elements and element requests */ - IBF_STATE_ANTICIPATE_DIFF - } ibf_state ; - - /** - * What is the order (=log2 size) of the ibf - * we're currently dealing with? - * Interpretation depends on ibf_state. - */ - int ibf_order; - - /** - * The current IBF for this peer, - * purpose dependent on ibf_state - */ - struct InvertibleBloomFilter *ibf; - - /** - * How many buckets have we transmitted/received? - * Interpretatin depends on ibf_state - */ - int ibf_bucket_counter; - - /** - * Strata estimator of the peer, NULL if our peer - * initiated the reconciliation. - */ - struct StrataEstimator *se; - - /** - * Element keys that this peer misses, but we have them. - */ - struct GNUNET_CONTAINER_MultiHashMap *requested_keys; - - /** - * Element keys that this peer has, but we miss. - */ - struct GNUNET_CONTAINER_MultiHashMap *reported_keys; - - /** - * Back-reference to the consensus session, - * to that ConsensusPeerInformation can be used as a closure - */ - struct ConsensusSession *session; - + IBF_STATE_RECEIVING, + /* + * we decode a received ibf + */ + IBF_STATE_DECODING, /** - * Messages queued for the current round. + * wait for elements and element requests */ - struct QueuedMessage *messages_head; + IBF_STATE_ANTICIPATE_DIFF +}; - /** - * Messages queued for the current round. - */ - struct QueuedMessage *messages_tail; - /** - * True if we are actually replaying the strata message, - * e.g. currently handling the premature_strata_message. - */ - int replaying_strata_message; +typedef void (*AddCallback) (struct MessageQueue *mq); +typedef void (*MessageSentCallback) (void *cls); - /** - * A strata message that is not actually for the current round, - * used in the exp-scheme. - */ - struct StrataMessage *premature_strata_message; - /** - * We have finishes the exp-subround with the peer. - */ - int exp_subround_finished; +/** + * Collection of the state necessary to read and write gnunet messages + * to a stream socket. Should be used as closure for stream_data_processor. + */ +struct MessageStreamState +{ + struct GNUNET_SERVER_MessageStreamTokenizer *mst; + struct MessageQueue *mq; + void *mst_cls; + struct GNUNET_STREAM_Socket *socket; + struct GNUNET_STREAM_ReadHandle *rh; + struct GNUNET_STREAM_WriteHandle *wh; +}; - int inventory_synced; - - /** - * Round this peer seems to be in, according to the last SE we got. - * Necessary to store this, as we sometimes need to respond to a request from an - * older round, while we are already in the next round. - */ - enum ConsensusRound apparent_round; +struct ServerClientSocketState +{ + struct GNUNET_SERVER_Client *client; + struct GNUNET_SERVER_TransmitHandle* th; }; -typedef void (*QueuedMessageCallback) (void *msg); /** - * A doubly linked list of messages. + * Generic message queue, for queueing outgoing messages. */ -struct QueuedMessage +struct MessageQueue { - struct GNUNET_MessageHeader *msg; - - /** - * Queued messages are stored in a doubly linked list. - */ - struct QueuedMessage *next; - - /** - * Queued messages are stored in a doubly linked list. - */ - struct QueuedMessage *prev; - - QueuedMessageCallback cb; - - void *cls; + void *state; + AddCallback add_cb; + struct PendingMessage *pending_head; + struct PendingMessage *pending_tail; + struct PendingMessage *current_pm; }; -struct StrataEstimator +struct PendingMessage { - struct InvertibleBloomFilter **strata; + struct GNUNET_MessageHeader *msg; + struct MessageQueue *parent_queue; + struct PendingMessage *next; + struct PendingMessage *prev; + MessageSentCallback sent_cb; + void *sent_cb_cls; }; @@ -351,38 +258,26 @@ struct ConsensusSession struct GNUNET_HashCode global_id; /** - * Local client in this consensus session. - * There is only one client per consensus session. - */ - struct GNUNET_SERVER_Client *client; - - /** - * Elements in the consensus set of this session, - * all of them either have been sent by or approved by the client. - * Contains ElementList. - * Used as a unique-key hashmap. + * The server's client and associated local state */ - struct GNUNET_CONTAINER_MultiHashMap *values; - - /** - * Elements that have not been approved (or rejected) by the client yet. - */ - struct PendingElement *client_approval_head; + struct ServerClientSocketState scss; /** - * Elements that have not been approved (or rejected) by the client yet. + * Queued messages to the client. */ - struct PendingElement *client_approval_tail; + struct MessageQueue *client_mq; /** - * Messages to be sent to the local client that owns this session + * IBF_Key -> 2^(HashCode*) + * FIXME: + * should be array of hash maps, mapping replicated struct IBF_Keys to struct HashCode *. */ - struct QueuedMessage *client_messages_head; + struct GNUNET_CONTAINER_MultiHashMap *ibf_key_map; /** - * Messages to be sent to the local client that owns this session + * Maps HashCodes to ElementInfos */ - struct QueuedMessage *client_messages_tail; + struct GNUNET_CONTAINER_MultiHashMap *values; /** * Currently active transmit handle for sending to the client @@ -412,9 +307,14 @@ struct ConsensusSession struct ConsensusPeerInformation *info; /** + * GNUNET_YES if the client has called conclude. + * */ + int conclude; + + /** * Index of the local peer in the peers array */ - int local_peer_idx; + unsigned int local_peer_idx; /** * Strata estimator, computed online @@ -431,16 +331,16 @@ struct ConsensusSession */ enum ConsensusRound current_round; - int exp_round; - - int exp_subround; - /** * Permutation of peers for the current round, * maps logical index (for current round) to physical index (location in info array) */ int *shuffle; + int exp_round; + + int exp_subround; + /** * The partner for the current exp-round */ @@ -454,41 +354,121 @@ struct ConsensusSession /** - * Sockets from other peers who want to communicate with us. - * It may not be known yet which consensus session they belong to. - * Also, the session might not exist yet locally. + * Information about a peer that is in a consensus session. */ -struct IncomingSocket +struct ConsensusPeerInformation { /** - * Incoming sockets are kept in a double linked list. + * Peer identitty of the peer in the consensus session */ - struct IncomingSocket *next; + struct GNUNET_PeerIdentity peer_id; /** - * Incoming sockets are kept in a double linked list. + * Do we connect to the peer, or does the peer connect to us? + * Only valid for all-to-all phases */ - struct IncomingSocket *prev; + int is_outgoing; /** - * The actual socket. + * Did we receive/send a consensus hello? */ - struct GNUNET_STREAM_Socket *socket; + int hello; + + /* + * FIXME + */ + struct MessageStreamState mss; /** - * Handle for currently active read + * Current state */ - struct GNUNET_STREAM_ReadHandle *rh; + enum ConsensusIBFState ibf_state; /** - * Peer that connected to us with the socket. + * What is the order (=log2 size) of the ibf + * we're currently dealing with? + * Interpretation depends on ibf_state. */ - struct GNUNET_PeerIdentity peer_id; + int ibf_order; + + /** + * The current IBF for this peer, + * purpose dependent on ibf_state + */ + struct InvertibleBloomFilter *ibf; /** - * Message stream tokenizer for this socket. + * How many buckets have we transmitted/received? + * Interpretatin depends on ibf_state */ - struct GNUNET_SERVER_MessageStreamTokenizer *mst; + int ibf_bucket_counter; + + /** + * Strata estimator of the peer, NULL if our peer + * initiated the reconciliation. + */ + struct StrataEstimator *se; + + /** + * Back-reference to the consensus session, + * to that ConsensusPeerInformation can be used as a closure + */ + struct ConsensusSession *session; + + /** + * True if we are actually replaying the strata message, + * e.g. currently handling the premature_strata_message. + */ + int replaying_strata_message; + + /** + * A strata message that is not actually for the current round, + * used in the exp-scheme. + */ + struct StrataMessage *premature_strata_message; + + /** + * We have finishes the exp-subround with the peer. + */ + int exp_subround_finished; + + /** + * GNUNET_YES if we synced inventory with this peer; + * GNUNET_NO otherwise. + */ + int inventory_synced; + + /** + * Round this peer seems to be in, according to the last SE we got. + * Necessary to store this, as we sometimes need to respond to a request from an + * older round, while we are already in the next round. + */ + enum ConsensusRound apparent_round; +}; + + +/** + * Sockets from other peers who want to communicate with us. + * It may not be known yet which consensus session they belong to, we have to wait for the + * peer's hello. + * Also, the session might not exist yet locally, we have to wait for a local client to connect. + */ +struct IncomingSocket +{ + /** + * Incoming sockets are kept in a double linked list. + */ + struct IncomingSocket *next; + + /** + * Incoming sockets are kept in a double linked list. + */ + struct IncomingSocket *prev; + + /** + * Peer that connected to us with the socket. + */ + struct GNUNET_PeerIdentity peer_id; /** * Peer-in-session this socket belongs to, once known, otherwise NULL. @@ -500,19 +480,35 @@ struct IncomingSocket * but the session does not exist yet. */ struct GNUNET_HashCode *requested_gid; + + /* + * Timeout, will disconnect the socket if not yet in a session. + * FIXME: implement + */ + GNUNET_SCHEDULER_TaskIdentifier timeout; + + /* FIXME */ + struct MessageStreamState mss; }; +/** + * Linked list of incoming sockets. + */ static struct IncomingSocket *incoming_sockets_head; + +/** + * Linked list of incoming sockets. + */ static struct IncomingSocket *incoming_sockets_tail; /** - * Linked list of sesstions this peer participates in. + * Linked list of sessions this peer participates in. */ static struct ConsensusSession *sessions_head; /** - * Linked list of sesstions this peer participates in. + * Linked list of sessions this peer participates in. */ static struct ConsensusSession *sessions_tail; @@ -543,151 +539,159 @@ static struct GNUNET_STREAM_ListenSocket *listener; /** - * Queue a message to be sent to the inhabiting client of a session. + * Transmit a queued message to the session's client. * - * @param session session - * @param msg message we want to queue + * @param cls consensus session + * @param size number of bytes available in buf + * @param buf where the callee should write the message + * @return number of bytes written to buf */ -static void -queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHeader *msg) +static size_t +transmit_queued (void *cls, size_t size, + void *buf) { - struct QueuedMessage *qm; - qm = GNUNET_malloc (sizeof *qm); - qm->msg = msg; - GNUNET_CONTAINER_DLL_insert_tail (session->client_messages_head, session->client_messages_tail, qm); + struct MessageQueue *mq = cls; + struct PendingMessage *pm = mq->pending_head; + struct ServerClientSocketState *state = mq->state; + size_t msg_size; + + GNUNET_assert (NULL != pm); + GNUNET_assert (NULL != buf); + msg_size = ntohs (pm->msg->size); + GNUNET_assert (size >= msg_size); + memcpy (buf, pm->msg, msg_size); + GNUNET_CONTAINER_DLL_remove (mq->pending_head, mq->pending_tail, pm); + state->th = NULL; + client_send_next (cls); + GNUNET_free (pm); + return msg_size; } -/** - * Queue a message to be sent to another peer - * - * @param cpi peer - * @param msg message we want to queue - * @param cb callback, called when the message is given to strem - * @param cls closure for cb - */ + static void -queue_peer_message_with_cls (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageHeader *msg, QueuedMessageCallback cb, void *cls) -{ - struct QueuedMessage *qm; - qm = GNUNET_malloc (sizeof *qm); - qm->msg = msg; - qm->cls = cls; - qm->cb = cb; - GNUNET_CONTAINER_DLL_insert_tail (cpi->messages_head, cpi->messages_tail, qm); - if (cpi->wh == NULL) - write_queued (cpi, GNUNET_STREAM_OK, 0); +client_send_next (struct MessageQueue *mq) +{ + struct ServerClientSocketState *state = mq->state; + int msize; + + GNUNET_assert (NULL != state); + + if ( (NULL != state->th) || + (NULL == mq->pending_head) ) + return; + msize = ntohs (mq->pending_head->msg->size); + state->th = + GNUNET_SERVER_notify_transmit_ready (state->client, msize, + GNUNET_TIME_UNIT_FOREVER_REL, + &transmit_queued, mq); +} + + +struct MessageQueue * +create_message_queue_for_server_client (struct ServerClientSocketState *scss) +{ + struct MessageQueue *mq; + mq = GNUNET_new (struct MessageQueue); + mq->add_cb = client_send_next; + mq->state = scss; + return mq; } /** - * Queue a message to be sent to another peer + * Functions of this signature are called whenever writing operations + * on a stream are executed * - * @param cpi peer - * @param msg message we want to queue + * @param cls the closure from GNUNET_STREAM_write + * @param status the status of the stream at the time this function is called; + * GNUNET_STREAM_OK if writing to stream was completed successfully; + * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully + * (this doesn't mean that the data is never sent, the receiver may + * have read the data but its ACKs may have been lost); + * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the + * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot + * be processed. + * @param size the number of bytes written */ -static void -queue_peer_message (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageHeader *msg) +static void +write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size) { - queue_peer_message_with_cls (cpi, msg, NULL, NULL); + struct MessageQueue *mq = cls; + struct MessageStreamState *mss = mq->state; + struct PendingMessage *pm; + + GNUNET_assert (GNUNET_STREAM_OK == status); + + /* call cb for message we finished sending */ + pm = mq->current_pm; + if (NULL != pm) + { + if (NULL != pm->sent_cb) + pm->sent_cb (pm->sent_cb_cls); + GNUNET_free (pm); + } + + mss->wh = NULL; + + pm = mq->pending_head; + mq->current_pm = pm; + if (NULL == pm) + return; + GNUNET_CONTAINER_DLL_remove (mq->pending_head, mq->pending_tail, pm); + mss->wh = GNUNET_STREAM_write (mss->socket, pm->msg, ntohs (pm->msg->size), + GNUNET_TIME_UNIT_FOREVER_REL, write_queued, cls); + GNUNET_assert (NULL != mss->wh); } -/* static void -clear_peer_messages (struct ConsensusPeerInformation *cpi) +stream_socket_add_cb (struct MessageQueue *mq) { - cpi->messages_head = NULL; - cpi->messages_tail = NULL; + if (NULL != mq->current_pm) + return; + write_queued (mq, GNUNET_STREAM_OK, 0); } -*/ -/** - * Estimate set difference with two strata estimators, - * i.e. arrays of IBFs. - * Does not not modify its arguments. - * - * @param se1 first strata estimator - * @param se2 second strata estimator - * @return the estimated difference - */ -static int -estimate_difference (const struct StrataEstimator *se1, - const struct StrataEstimator *se2) +struct MessageQueue * +create_message_queue_for_stream_socket (struct MessageStreamState *mss) { - int i; - int count; - count = 0; - for (i = STRATA_COUNT - 1; i >= 0; i--) - { - struct InvertibleBloomFilter *diff; - /* number of keys decoded from the ibf */ - int ibf_count; - int more; - ibf_count = 0; - /* FIXME: implement this without always allocating new IBFs */ - diff = ibf_dup (se1->strata[i]); - ibf_subtract (diff, se2->strata[i]); - for (;;) - { - more = ibf_decode (diff, NULL, NULL); - if (GNUNET_NO == more) - { - count += ibf_count; - break; - } - if (GNUNET_SYSERR == more) - { - ibf_destroy (diff); - return count * (1 << (i + 1)); - } - ibf_count++; - } - ibf_destroy (diff); - } - return count; + struct MessageQueue *mq; + mq = GNUNET_new (struct MessageQueue); + mq->state = mss; + mq->add_cb = stream_socket_add_cb; + return mq; +} + + +struct PendingMessage * +new_pending_message (uint16_t size, uint16_t type) +{ + struct PendingMessage *pm; + pm = GNUNET_malloc (sizeof *pm + size); + pm->msg = (void *) &pm[1]; + pm->msg->size = htons (size); + pm->msg->type = htons (type); + return pm; } /** - * Called when receiving data from a peer that is member of - * an inhabited consensus session. + * Queue a message in a message queue. * - * @param cls the closure from GNUNET_STREAM_read - * @param status the status of the stream at the time this function is called - * @param data traffic from the other side - * @param size the number of bytes available in data read; will be 0 on timeout - * @return number of bytes of processed from 'data' (any data remaining should be - * given to the next time the read processor is called). + * @param queue the message queue + * @param pending message, message with additional information */ -static size_t -session_stream_data_processor (void *cls, - enum GNUNET_STREAM_Status status, - const void *data, - size_t size) +void +message_queue_add (struct MessageQueue *queue, struct PendingMessage *msg) { - struct ConsensusPeerInformation *cpi; - int ret; - - GNUNET_assert (GNUNET_STREAM_OK == status); - cpi = cls; - GNUNET_assert (NULL != cpi->mst); - ret = GNUNET_SERVER_mst_receive (cpi->mst, cpi, data, size, GNUNET_NO, GNUNET_YES); - if (GNUNET_SYSERR == ret) - { - /* FIXME: handle this correctly */ - GNUNET_assert (0); - } - /* read again */ - cpi->rh = GNUNET_STREAM_read (cpi->socket, GNUNET_TIME_UNIT_FOREVER_REL, - &session_stream_data_processor, cpi); - /* we always read all data */ - return size; + GNUNET_CONTAINER_DLL_insert_tail (queue->pending_head, queue->pending_tail, msg); + queue->add_cb (queue); } + /** - * Called when we receive data from a peer that is not member of - * a session yet, or the session is not yet inhabited. + * Called when we receive data from a peer via stream. * * @param cls the closure from GNUNET_STREAM_read * @param status the status of the stream at the time this function is called @@ -697,62 +701,66 @@ session_stream_data_processor (void *cls, * given to the next time the read processor is called). */ static size_t -incoming_stream_data_processor (void *cls, - enum GNUNET_STREAM_Status status, - const void *data, - size_t size) +stream_data_processor (void *cls, enum GNUNET_STREAM_Status status, const void *data, size_t size) { - struct IncomingSocket *incoming; + struct MessageStreamState *mss = cls; int ret; - GNUNET_assert (GNUNET_STREAM_OK == status); - incoming = cls; - ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_YES); + mss->rh = NULL; + + if (GNUNET_STREAM_OK != status) + { + /* FIXME: handle this correctly */ + GNUNET_break (0); + return 0; + } + GNUNET_assert (NULL != mss->mst); + ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_YES); if (GNUNET_SYSERR == ret) { /* FIXME: handle this correctly */ - GNUNET_assert (0); + GNUNET_break (0); + return 0; } /* read again */ - incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL, - &incoming_stream_data_processor, incoming); + mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, &stream_data_processor, mss); /* we always read all data */ return size; } +/** + * Send element or element report to the peer specified in cpi. + * + * @param cpi peer to send the elements to + * @param head head of the element list + */ static void -send_elements (struct ConsensusPeerInformation *cpi, struct ElementList *head) +send_element_or_report (struct ConsensusPeerInformation *cpi, struct ElementInfo *e) { - struct GNUNET_CONSENSUS_Element *element; - struct GNUNET_MessageHeader *element_msg; - size_t msize; + struct PendingMessage *pm; - while (NULL != head) + switch (cpi->apparent_round) { - element = head->element; - msize = sizeof (struct GNUNET_MessageHeader) + element->size; - element_msg = GNUNET_malloc (msize); - element_msg->size = htons (msize); - switch (cpi->apparent_round) - { - case CONSENSUS_ROUND_STOCK: - case CONSENSUS_ROUND_EXCHANGE: - element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); - break; - case CONSENSUS_ROUND_INVENTORY: - element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT); - break; - default: - GNUNET_break (0); - } - GNUNET_assert (NULL != element->data); - memcpy (&element_msg[1], element->data, element->size); - queue_peer_message (cpi, element_msg); - head = head->next; + case CONSENSUS_ROUND_COMPLETION: + case CONSENSUS_ROUND_EXCHANGE: + pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + e->element->size, + GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); + memcpy (&pm->msg[1], e->element->data, e->element->size); + message_queue_add (cpi->mss.mq, pm); + break; + case CONSENSUS_ROUND_INVENTORY: + pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + sizeof (struct GNUNET_HashCode), + GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT); + memcpy (&pm->msg[1], e->element_hash, sizeof (struct GNUNET_HashCode)); + message_queue_add (cpi->mss.mq, pm); + break; + default: + GNUNET_break (0); } } + /** * Iterator to insert values into an ibf. * @@ -768,12 +776,10 @@ ibf_values_iterator (void *cls, const struct GNUNET_HashCode *key, void *value) { - struct ConsensusPeerInformation *cpi; - struct ElementList *head; - struct IBF_Key ibf_key; - cpi = cls; - head = value; - ibf_key = ibf_key_from_hashcode (head->element_hash); + struct ConsensusPeerInformation *cpi = cls; + struct ElementInfo *e = value; + struct IBF_Key ibf_key = ibf_key_from_hashcode (e->element_hash); + GNUNET_assert (ibf_key.key_val == ibf_key_from_hashcode (key).key_val); ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key); return GNUNET_YES; @@ -788,11 +794,10 @@ ibf_values_iterator (void *cls, static void prepare_ibf (struct ConsensusPeerInformation *cpi) { - if (NULL == cpi->session->ibfs[cpi->ibf_order]) - { - cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); - GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi); - } + if (NULL != cpi->session->ibfs[cpi->ibf_order]) + return; + cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, SE_IBF_HASH_NUM); + GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi); } @@ -816,15 +821,18 @@ exp_subround_finished (const struct ConsensusSession *session) { int not_finished; not_finished = 0; - if ((session->partner_outgoing != NULL) && (session->partner_outgoing->exp_subround_finished == GNUNET_NO)) - not_finished++; - if ((session->partner_incoming != NULL) && (session->partner_incoming->exp_subround_finished == GNUNET_NO)) - not_finished++; + if ( (NULL != session->partner_outgoing) && + (GNUNET_NO == session->partner_outgoing->exp_subround_finished) ) + not_finished++; + if ( (NULL != session->partner_incoming) && + (GNUNET_NO == session->partner_incoming->exp_subround_finished) ) + not_finished++; if (0 == not_finished) return GNUNET_YES; return GNUNET_NO; } + static int inventory_round_finished (struct ConsensusSession *session) { @@ -840,153 +848,170 @@ inventory_round_finished (struct ConsensusSession *session) } - static void -fin_sent_cb (void *cls) +clear_message_stream_state (struct MessageStreamState *mss) { - struct ConsensusPeerInformation *cpi; - cpi = cls; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n", cpi->session->local_peer_idx); - switch (cpi->session->current_round) + if (NULL != mss->mst) { - case CONSENSUS_ROUND_EXCHANGE: - case CONSENSUS_ROUND_STOCK: - if (cpi->session->current_round != cpi->apparent_round) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: FIN to SYNC from the past\n", cpi->session->local_peer_idx); - break; - } - cpi->exp_subround_finished = GNUNET_YES; - /* the subround is only really over if *both* partners are done */ - if (GNUNET_YES == exp_subround_finished (cpi->session)) - subround_over (cpi->session, NULL); - else - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after FIN sent\n", cpi->session->local_peer_idx); - break; - case CONSENSUS_ROUND_INVENTORY: - cpi->inventory_synced = GNUNET_YES; - if (inventory_round_finished (cpi->session) && cpi->session->current_round == cpi->apparent_round) - round_over (cpi->session, NULL); - /* FIXME: maybe go to next round */ - break; - default: - GNUNET_break (0); + GNUNET_SERVER_mst_destroy (mss->mst); + mss->mst = NULL; + } + if (NULL != mss->rh) + { + GNUNET_STREAM_read_cancel (mss->rh); + mss->rh = NULL; + } + if (NULL != mss->wh) + { + GNUNET_STREAM_write_cancel (mss->wh); + mss->wh = NULL; + } + if (NULL != mss->socket) + { + GNUNET_STREAM_close (mss->socket); + mss->socket = NULL; + } + if (NULL != mss->mq) + { + GNUNET_free (mss->mq); + mss->mq = NULL; } } /** - * Gets called when the other peer wants us to inform that - * it has decoded our ibf and sent us all elements / requests + * Iterator over hash map entries. + * + * @param cls closure + * @param key current key code + * @param value value in the hash map + * @return GNUNET_YES if we should continue to + * iterate, + * GNUNET_NO if not. */ static int -handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) +destroy_element_info_iter (void *cls, + const struct GNUNET_HashCode * key, + void *value) { - struct ConsensusRoundMessage *fin_msg; - - switch (cpi->session->current_round) - { - case CONSENSUS_ROUND_INVENTORY: - cpi->inventory_synced = GNUNET_YES; - case CONSENSUS_ROUND_STOCK: - case CONSENSUS_ROUND_EXCHANGE: - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SYNC from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); - fin_msg = GNUNET_malloc (sizeof *fin_msg); - fin_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN); - fin_msg->header.size = htons (sizeof *fin_msg); - fin_msg->round = cpi->apparent_round; - /* the subround os over once we kicked off sending the fin msg */ - /* FIXME: assert we are talking to the right peer! */ - queue_peer_message_with_cls (cpi, (struct GNUNET_MessageHeader *) fin_msg, fin_sent_cb, cpi); - /* FIXME: mark peer as synced */ - break; - default: - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n"); - break; - } + struct ElementInfo *ei = value; + GNUNET_free (ei->element); + GNUNET_free (ei->element_hash); + GNUNET_free (ei); return GNUNET_YES; } /** - * The other peer wants us to inform that he sent us all the elements we requested. + * Destroy a session, free all resources associated with it. + * + * @param session the session to destroy */ -static int -handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) +static void +destroy_session (struct ConsensusSession *session) { - struct ConsensusRoundMessage *round_msg; - round_msg = (struct ConsensusRoundMessage *) msg; - /* FIXME: only call subround_over if round is the current one! */ - switch (cpi->session->current_round) + int i; + + GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session); + GNUNET_SERVER_client_drop (session->scss.client); + session->scss.client = NULL; + if (NULL != session->client_mq) { - case CONSENSUS_ROUND_EXCHANGE: - case CONSENSUS_ROUND_STOCK: - if (cpi->session->current_round != round_msg->round) + GNUNET_free (session->client_mq); + session->client_mq = NULL; + } + if (NULL != session->shuffle) + { + GNUNET_free (session->shuffle); + session->shuffle = NULL; + } + if (NULL != session->se) + { + strata_estimator_destroy (session->se); + session->se = NULL; + } + if (NULL != session->info) + { + for (i = 0; i < session->num_peers; i++) + { + struct ConsensusPeerInformation *cpi; + cpi = &session->info[i]; + clear_message_stream_state (&cpi->mss); + if (NULL != cpi->se) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (past round)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); - cpi->ibf_state = IBF_STATE_NONE; - cpi->ibf_bucket_counter = 0; - break; + strata_estimator_destroy (cpi->se); + cpi->se = NULL; } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (exp)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); - cpi->exp_subround_finished = GNUNET_YES; - if (GNUNET_YES == exp_subround_finished (cpi->session)) - subround_over (cpi->session, NULL); - else - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after got FIN\n", cpi->session->local_peer_idx); - break; - case CONSENSUS_ROUND_INVENTORY: - cpi->inventory_synced = GNUNET_YES; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (a2a)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); - if (inventory_round_finished (cpi->session)) - round_over (cpi->session, NULL); - break; - default: - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n"); - break; + if (NULL != cpi->ibf) + { + ibf_destroy (cpi->ibf); + cpi->ibf = NULL; + } + } + GNUNET_free (session->info); + session->info = NULL; + } + if (NULL != session->ibfs) + { + for (i = 0; i <= MAX_IBF_ORDER; i++) + { + if (NULL != session->ibfs[i]) + { + ibf_destroy (session->ibfs[i]); + session->ibfs[i] = NULL; + } + } + GNUNET_free (session->ibfs); + session->ibfs = NULL; + } + if (NULL != session->values) + { + GNUNET_CONTAINER_multihashmap_iterate (session->values, destroy_element_info_iter, NULL); + GNUNET_CONTAINER_multihashmap_destroy (session->values); + session->values = NULL; } - return GNUNET_YES; -} - - -static struct StrataEstimator * -strata_estimator_create () -{ - struct StrataEstimator *se; - int i; - - /* fixme: allocate everything in one chunk */ - - se = GNUNET_malloc (sizeof (struct StrataEstimator)); - se->strata = GNUNET_malloc (sizeof (struct InvertibleBloomFilter) * STRATA_COUNT); - for (i = 0; i < STRATA_COUNT; i++) - se->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); - return se; + if (NULL != session->ibf_key_map) + { + GNUNET_CONTAINER_multihashmap_destroy (session->ibf_key_map); + session->ibf_key_map = NULL; + } + GNUNET_free (session); } + static void -strata_estimator_destroy (struct StrataEstimator *se) +send_client_conclude_done (struct ConsensusSession *session) { - int i; - for (i = 0; i < STRATA_COUNT; i++) - ibf_destroy (se->strata[i]); - GNUNET_free (se->strata); - GNUNET_free (se); + struct PendingMessage *pm; + + /* check if client is even there anymore */ + if (NULL == session->scss.client) + return; + pm = new_pending_message (sizeof (struct GNUNET_MessageHeader), + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE); + message_queue_add (session->client_mq, pm); } +/** + * Check if a strata message is for the current round or not + * + * @param session session we are in + * @param strata_msg the strata message to check + * @return GNUNET_YES if the strata_msg is premature, GNUNET_NO otherwise + */ static int is_premature_strata_message (const struct ConsensusSession *session, const struct StrataMessage *strata_msg) { switch (strata_msg->round) { - case CONSENSUS_ROUND_STOCK: + case CONSENSUS_ROUND_COMPLETION: case CONSENSUS_ROUND_EXCHANGE: /* here, we also have to compare subrounds */ if ( (strata_msg->round != session->current_round) || (strata_msg->exp_round != session->exp_round) || - (strata_msg->exp_subround != session->exp_subround)) + (strata_msg->exp_subround != session->exp_subround) ) return GNUNET_YES; break; default: @@ -999,6 +1024,72 @@ is_premature_strata_message (const struct ConsensusSession *session, const struc /** + * Send a strata estimator. + * + * @param cpi the peer + */ +static void +send_strata_estimator (struct ConsensusPeerInformation *cpi) +{ + struct PendingMessage *pm; + struct StrataMessage *strata_msg; + + /* FIXME: why is this correct? */ + cpi->apparent_round = cpi->session->current_round; + cpi->ibf_state = IBF_STATE_NONE; + cpi->ibf_bucket_counter = 0; + + LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "sending SE (in round: %d)\n", cpi->session->current_round); + + pm = new_pending_message ((sizeof *strata_msg) + (SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE), + GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE); + strata_msg = (struct StrataMessage *) pm->msg; + strata_msg->round = cpi->session->current_round; + strata_msg->exp_round = cpi->session->exp_round; + strata_msg->exp_subround = cpi->session->exp_subround; + strata_estimator_write (cpi->session->se, &strata_msg[1]); + message_queue_add (cpi->mss.mq, pm); +} + + +/** + * Send an IBF of the order specified in cpi. + * + * @param cpi the peer + */ +static void +send_ibf (struct ConsensusPeerInformation *cpi) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending IBF to P%d\n", + cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); + + cpi->ibf_bucket_counter = 0; + while (cpi->ibf_bucket_counter < (1 << cpi->ibf_order)) + { + unsigned int num_buckets; + struct PendingMessage *pm; + struct DifferenceDigest *digest; + + num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter; + /* limit to maximum */ + if (num_buckets > BUCKETS_PER_MESSAGE) + num_buckets = BUCKETS_PER_MESSAGE; + + pm = new_pending_message ((sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE), + GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST); + digest = (struct DifferenceDigest *) pm->msg; + digest->order = cpi->ibf_order; + digest->round = cpi->apparent_round; + ibf_write_slice (cpi->ibf, cpi->ibf_bucket_counter, num_buckets, &digest[1]); + cpi->ibf_bucket_counter += num_buckets; + message_queue_add (cpi->mss.mq, pm); + } + cpi->ibf_bucket_counter = 0; + cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF; +} + + +/** * Called when a peer sends us its strata estimator. * In response, we sent out IBF of appropriate size back. * @@ -1008,12 +1099,10 @@ is_premature_strata_message (const struct ConsensusSession *session, const struc static int handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg) { - int i; // unsigned? unsigned int diff; - void *buf; - size_t size; - if ((cpi->session->current_round == CONSENSUS_ROUND_STOCK) && (strata_msg->round == CONSENSUS_ROUND_INVENTORY)) + if ( (cpi->session->current_round == CONSENSUS_ROUND_COMPLETION) && + (strata_msg->round == CONSENSUS_ROUND_INVENTORY) ) { /* we still have to handle this request appropriately */ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got inventory SE from P%d, we are already further alog\n", @@ -1023,28 +1112,26 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess { if (GNUNET_NO == cpi->replaying_strata_message) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got probably premature SE from P%d, (%d,%d)\n", - cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), strata_msg->exp_round, strata_msg->exp_subround); - cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message ((struct GNUNET_MessageHeader *) strata_msg); + LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got probably premature SE (%d,%d)\n", + strata_msg->exp_round, strata_msg->exp_subround); + cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message (&strata_msg->header); } return GNUNET_YES; } if (NULL == cpi->se) - cpi->se = strata_estimator_create (); + cpi->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM); cpi->apparent_round = strata_msg->round; - size = ntohs (strata_msg->header.size); - buf = (void *) &strata_msg[1]; // FIXME: do NOT cast away 'const'! - for (i = 0; i < STRATA_COUNT; i++) + if (htons (strata_msg->header.size) != ((sizeof *strata_msg) + SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE)) { - int res; - res = ibf_read (&buf, &size, cpi->se->strata[i]); - GNUNET_assert (GNUNET_OK == res); + LOG_PP (GNUNET_ERROR_TYPE_WARNING, cpi, "got SE of wrong size\n"); + return GNUNET_NO; } - - diff = estimate_difference (cpi->session->se, cpi->se); + strata_estimator_read (&strata_msg[1], cpi->se); + GNUNET_assert (NULL != cpi->session->se); + diff = strata_estimator_difference (cpi->session->se, cpi->se); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SE from P%d, diff=%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), diff); @@ -1053,10 +1140,10 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess { case CONSENSUS_ROUND_EXCHANGE: case CONSENSUS_ROUND_INVENTORY: - case CONSENSUS_ROUND_STOCK: + case CONSENSUS_ROUND_COMPLETION: /* send IBF of the right size */ cpi->ibf_order = 0; - while (((1 << cpi->ibf_order) < diff) || STRATA_HASH_NUM > (1 << cpi->ibf_order) ) + while (((1 << cpi->ibf_order) < diff) || (SE_IBF_HASH_NUM > (1 << cpi->ibf_order)) ) cpi->ibf_order++; if (cpi->ibf_order > MAX_IBF_ORDER) cpi->ibf_order = MAX_IBF_ORDER; @@ -1066,7 +1153,6 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess if (NULL != cpi->ibf) ibf_destroy (cpi->ibf); cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); - cpi->ibf_state = IBF_STATE_TRANSMITTING; cpi->ibf_bucket_counter = 0; send_ibf (cpi); break; @@ -1079,11 +1165,104 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess } + +static int +send_elements_iterator (void *cls, + const struct GNUNET_HashCode * key, + void *value) +{ + struct ConsensusPeerInformation *cpi = cls; + struct ElementInfo *ei; + ei = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, value); + if (NULL == ei) + { + LOG_PP (GNUNET_ERROR_TYPE_WARNING, cpi, "peer's ibf contained non-existing element %s\n", + GNUNET_h2s((struct GNUNET_HashCode *) value)); + return GNUNET_YES; + } + LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "sending element\n"); + send_element_or_report (cpi, ei); + return GNUNET_YES; +} + + +/** + * Decode the current diff ibf, and send elements/requests/reports/ + * + * @param cpi partner peer + */ +static void +decode (struct ConsensusPeerInformation *cpi) +{ + struct IBF_Key key; + int side; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: decoding ibf from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); + + while (1) + { + int res; + + res = ibf_decode (cpi->ibf, &side, &key); + if (GNUNET_SYSERR == res) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding failed, transmitting larger IBF\n"); + /* decoding failed, we tell the other peer by sending our ibf with a larger order */ + cpi->ibf_order++; + prepare_ibf (cpi); + cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); + cpi->ibf_bucket_counter = 0; + send_ibf (cpi); + return; + } + if (GNUNET_NO == res) + { + struct PendingMessage *pm; + struct ConsensusRoundMessage *rmsg; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: transmitted all values, sending SYNC\n", cpi->session->local_peer_idx); + + pm = new_pending_message (sizeof *rmsg, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED); + rmsg = (struct ConsensusRoundMessage *) pm->msg; + rmsg->round = cpi->apparent_round; + message_queue_add (cpi->mss.mq, pm); + return; + } + if (-1 == side) + { + struct GNUNET_HashCode hashcode; + /* we have the element(s), send it to the other peer */ + ibf_hashcode_from_key (key, &hashcode); + GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->ibf_key_map, &hashcode, send_elements_iterator, cpi); + } + else + { + struct PendingMessage *pm; + uint16_t type; + + switch (cpi->apparent_round) + { + case CONSENSUS_ROUND_COMPLETION: + /* FIXME: check if we really want to request the element */ + case CONSENSUS_ROUND_EXCHANGE: + case CONSENSUS_ROUND_INVENTORY: + type = GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST; + break; + default: + GNUNET_assert (0); + } + pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + sizeof (struct IBF_Key), + type); + *(struct IBF_Key *) &pm->msg[1] = key; + message_queue_add (cpi->mss.mq, pm); + } + } +} + + static int handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest) { int num_buckets; - void *buf; /* FIXME: find out if we're still expecting the same ibf! */ @@ -1128,13 +1307,10 @@ handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDig return GNUNET_YES; } - if (NULL == cpi->ibf) - cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); - - buf = (void *) &digest[1]; // FIXME: digest is supposed to be READ ONLY! - ibf_read_slice (&buf, NULL, cpi->ibf_bucket_counter, num_buckets, cpi->ibf); + cpi->ibf = ibf_create (1 << cpi->ibf_order, SE_IBF_HASH_NUM); + ibf_read_slice (&digest[1], cpi->ibf_bucket_counter, num_buckets, cpi->ibf); cpi->ibf_bucket_counter += num_buckets; if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) @@ -1150,19 +1326,53 @@ handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDig /** + * Insert an element into the consensus set of the specified session. + * The element will not be copied, and freed when destroying the session. + * + * @param session session for new element + * @param element element to insert + */ +static void +insert_element (struct ConsensusSession *session, struct GNUNET_CONSENSUS_Element *element) +{ + struct GNUNET_HashCode hash; + struct ElementInfo *e; + struct IBF_Key ibf_key; + int i; + + e = GNUNET_new (struct ElementInfo); + e->element = element; + e->element_hash = GNUNET_new (struct GNUNET_HashCode); + GNUNET_CRYPTO_hash (e->element->data, e->element->size, e->element_hash); + ibf_key = ibf_key_from_hashcode (e->element_hash); + ibf_hashcode_from_key (ibf_key, &hash); + strata_estimator_insert (session->se, &hash); + GNUNET_CONTAINER_multihashmap_put (session->values, e->element_hash, e, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); + GNUNET_CONTAINER_multihashmap_put (session->ibf_key_map, &hash, e->element_hash, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + + for (i = 0; i <= MAX_IBF_ORDER; i++) + { + if (NULL == session->ibfs[i]) + continue; + ibf_insert (session->ibfs[i], ibf_key); + } +} + + +/** * Handle an element that another peer sent us */ static int handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg) { - struct PendingElement *pending_element; struct GNUNET_CONSENSUS_Element *element; - struct GNUNET_CONSENSUS_ElementMessage *client_element_msg; size_t size; switch (cpi->session->current_round) { - case CONSENSUS_ROUND_STOCK: + case CONSENSUS_ROUND_COMPLETION: /* FIXME: check if we really expect the element */ case CONSENSUS_ROUND_EXCHANGE: break; @@ -1178,21 +1388,10 @@ handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_Me memcpy (&element[1], &element_msg[1], size); element->data = &element[1]; - pending_element = GNUNET_malloc (sizeof *pending_element); - pending_element->element = element; - GNUNET_CONTAINER_DLL_insert_tail (cpi->session->client_approval_head, cpi->session->client_approval_tail, pending_element); - - client_element_msg = GNUNET_malloc (size + sizeof *client_element_msg); - client_element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT); - client_element_msg->header.size = htons (size + sizeof *client_element_msg); - memcpy (&client_element_msg[1], &element[1], size); + LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got element\n"); - queue_client_message (cpi->session, (struct GNUNET_MessageHeader *) client_element_msg); + insert_element (cpi->session, element); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received element, size=%d\n", size); - - client_send_next (cpi->session); - return GNUNET_YES; } @@ -1213,20 +1412,36 @@ handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct E /* element requests are allowed in every round */ num = ntohs (msg->header.size) / sizeof (struct IBF_Key); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handling element request for %u elements\n", num); ibf_key = (struct IBF_Key *) &msg[1]; while (num--) { - struct ElementList *head; ibf_hashcode_from_key (*ibf_key, &hashcode); - head = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode); - send_elements (cpi, head); + GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->ibf_key_map, &hashcode, send_elements_iterator, cpi); ibf_key++; } return GNUNET_YES; } +static int +is_peer_connected (struct ConsensusPeerInformation *cpi) +{ + if (NULL == cpi->mss.socket) + return GNUNET_NO; + return GNUNET_YES; +} + + +static void +ensure_peer_connected (struct ConsensusPeerInformation *cpi) +{ + if (NULL != cpi->mss.socket) + return; + cpi->mss.socket = GNUNET_STREAM_open (cfg, &cpi->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS, + open_cb, cpi, GNUNET_STREAM_OPTION_END); +} + + /** * If necessary, send a message to the peer, depending on the current * round. @@ -1234,17 +1449,22 @@ handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct E static void embrace_peer (struct ConsensusPeerInformation *cpi) { - GNUNET_assert (GNUNET_YES == cpi->hello); + if (GNUNET_NO == is_peer_connected (cpi)) + { + ensure_peer_connected (cpi); + return; + } + if (GNUNET_NO == cpi->hello) + return; + /* FIXME: correctness of switch */ switch (cpi->session->current_round) { case CONSENSUS_ROUND_EXCHANGE: + case CONSENSUS_ROUND_INVENTORY: if (cpi->session->partner_outgoing != cpi) break; /* fallthrough */ - case CONSENSUS_ROUND_INVENTORY: - /* fallthrough */ - case CONSENSUS_ROUND_STOCK: - if (cpi == cpi->session->partner_outgoing) + case CONSENSUS_ROUND_COMPLETION: send_strata_estimator (cpi); default: break; @@ -1253,195 +1473,313 @@ embrace_peer (struct ConsensusPeerInformation *cpi) /** - * Handle a HELLO-message, send when another peer wants to join a session where - * our peer is a member. The session may or may not be inhabited yet. + * Called when stream has finishes writing the hello message */ -static int -handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello) +static void +hello_cont (void *cls) { - /* FIXME: session might not exist yet. create an uninhabited session and wait for a client */ - struct ConsensusSession *session; - int idx; + struct ConsensusPeerInformation *cpi = cls; - for (session = sessions_head; NULL != session; session = session->next) - { - if (0 != GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id)) - continue; - idx = get_peer_idx (&inc->peer_id, session); - GNUNET_assert (-1 != idx); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d hello'ed session %d\n", idx); - inc->cpi = &session->info[idx]; - inc->cpi->mst = inc->mst; - inc->cpi->hello = GNUNET_YES; - inc->cpi->socket = inc->socket; - embrace_peer (inc->cpi); - return GNUNET_YES; - } - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session\n"); - return GNUNET_NO; + cpi->hello = GNUNET_YES; + embrace_peer (cpi); } /** - * Send a strata estimator. + * Called when we established a stream connection to another peer * - * @param cpi the peer + * @param cls cpi of the peer we just connected to + * @param socket socket to use to communicate with the other side (read/write) */ static void -send_strata_estimator (struct ConsensusPeerInformation *cpi) +open_cb (void *cls, struct GNUNET_STREAM_Socket *socket) { - struct StrataMessage *strata_msg; - void *buf; - size_t msize; - int i; + struct ConsensusPeerInformation *cpi = cls; + struct PendingMessage *pm; + struct ConsensusHello *hello; - cpi->apparent_round = cpi->session->current_round; - cpi->ibf_state = IBF_STATE_NONE; - cpi->ibf_bucket_counter = 0; + GNUNET_assert (NULL == cpi->mss.mst); + GNUNET_assert (NULL == cpi->mss.mq); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending SE(%d) to P%d\n", - cpi->session->local_peer_idx, cpi->session->current_round, (int) (cpi - cpi->session->info)); + cpi->mss.mq = create_message_queue_for_stream_socket (&cpi->mss); + cpi->mss.mst = GNUNET_SERVER_mst_create (mst_session_callback, cpi); + cpi->mss.mst_cls = cpi; - msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS); + pm = new_pending_message (sizeof *hello, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO); + hello = (struct ConsensusHello *) pm->msg; + memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode)); + pm->sent_cb = hello_cont; + pm->sent_cb_cls = cpi; + message_queue_add (cpi->mss.mq, pm); + cpi->mss.rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, + &stream_data_processor, &cpi->mss); +} - strata_msg = GNUNET_malloc (msize); - strata_msg->header.size = htons (msize); - strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE); - strata_msg->round = cpi->session->current_round; - strata_msg->exp_round = cpi->session->exp_round; - strata_msg->exp_subround = cpi->session->exp_subround; - - buf = &strata_msg[1]; - for (i = 0; i < STRATA_COUNT; i++) + +static void +replay_premature_message (struct ConsensusPeerInformation *cpi) +{ + if (NULL != cpi->premature_strata_message) { - ibf_write (cpi->session->se->strata[i], &buf, NULL); - } + struct StrataMessage *sm; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n"); + sm = cpi->premature_strata_message; + cpi->premature_strata_message = NULL; + + cpi->replaying_strata_message = GNUNET_YES; + handle_p2p_strata (cpi, sm); + cpi->replaying_strata_message = GNUNET_NO; - queue_peer_message (cpi, (struct GNUNET_MessageHeader *) strata_msg); + GNUNET_free (sm); + } } /** - * Send an IBF of the order specified in cpi. + * Start the inventory round, contact all peers we are supposed to contact. * - * @param cpi the peer + * @param session the current session */ static void -send_ibf (struct ConsensusPeerInformation *cpi) +start_inventory (struct ConsensusSession *session) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending IBF to P%d\n", - cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); + int i; + int last; - cpi->ibf_bucket_counter = 0; - while (cpi->ibf_bucket_counter < (1 << cpi->ibf_order)) + for (i = 0; i < session->num_peers; i++) { - int num_buckets; - void *buf; - struct DifferenceDigest *digest; - int msize; + session->info[i].ibf_bucket_counter = 0; + session->info[i].ibf_state = IBF_STATE_NONE; + session->info[i].is_outgoing = GNUNET_NO; + } - num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter; - /* limit to maximum */ - if (num_buckets > BUCKETS_PER_MESSAGE) - num_buckets = BUCKETS_PER_MESSAGE; + last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; + i = (session->local_peer_idx + 1) % session->num_peers; + while (i != last) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i); + session->info[i].is_outgoing = GNUNET_YES; + embrace_peer (&session->info[i]); + i = (i + 1) % session->num_peers; + } + // tie-breaker for even number of peers + if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last)) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i); + session->info[last].is_outgoing = GNUNET_YES; + embrace_peer (&session->info[last]); + } - msize = (sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE); + for (i = 0; i < session->num_peers; i++) + { + if (GNUNET_NO == session->info[i].is_outgoing) + replay_premature_message (&session->info[i]); + } +} - digest = GNUNET_malloc (msize); - digest->header.size = htons (msize); - digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST); - digest->order = cpi->ibf_order; - digest->round = cpi->apparent_round; - buf = &digest[1]; - ibf_write_slice (cpi->ibf, cpi->ibf_bucket_counter, num_buckets, &buf, NULL); +/** + * Iterator over hash map entries. + * + * @param cls closure + * @param key current key code + * @param value value in the hash map + * @return GNUNET_YES if we should continue to + * iterate, + * GNUNET_NO if not. + */ +static int +send_client_elements_iter (void *cls, + const struct GNUNET_HashCode * key, + void *value) +{ + struct ConsensusSession *session = cls; + struct ElementInfo *ei = value; + struct PendingMessage *pm; - queue_peer_message (cpi, (struct GNUNET_MessageHeader *) digest); + /* is the client still there? */ + if (NULL == session->scss.client) + return GNUNET_NO; - cpi->ibf_bucket_counter += num_buckets; - } - cpi->ibf_bucket_counter = 0; - cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF; + pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + ei->element->size, + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT); + message_queue_add (session->client_mq, pm); + return GNUNET_YES; } + /** - * Decode the current diff ibf, and send elements/requests/reports/ + * Start the next round. + * This function can be invoked as a timeout task, or called manually (tc will be NULL then). * - * @param cpi partner peer + * @param cls the session + * @param tc task context, for when this task is invoked by the scheduler, + * NULL if invoked for another reason */ -static void -decode (struct ConsensusPeerInformation *cpi) +static void +round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - struct IBF_Key key; - struct GNUNET_HashCode hashcode; - int side; + struct ConsensusSession *session; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: decoding ibf from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); + /* don't kick off next round if we're shutting down */ + if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) + return; - while (1) + session = cls; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: round over\n", session->local_peer_idx); + + if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) { - int res; + GNUNET_SCHEDULER_cancel (session->round_timeout_tid); + session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; + } - res = ibf_decode (cpi->ibf, &side, &key); - if (GNUNET_SYSERR == res) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding failed, transmitting larger IBF\n"); - /* decoding failed, we tell the other peer by sending our ibf with a larger order */ - cpi->ibf_order++; - prepare_ibf (cpi); - cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); - cpi->ibf_state = IBF_STATE_TRANSMITTING; - cpi->ibf_bucket_counter = 0; - send_ibf (cpi); - return; - } - if (GNUNET_NO == res) - { - struct ConsensusRoundMessage *msg; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: transmitted all values, sending SYNC\n", cpi->session->local_peer_idx); - msg = GNUNET_malloc (sizeof *msg); - msg->header.size = htons (sizeof *msg); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED); - msg->round = cpi->apparent_round; - queue_peer_message (cpi, (struct GNUNET_MessageHeader *) msg); - return; - } - if (-1 == side) - { - struct ElementList *head; - /* we have the element(s), send it to the other peer */ - ibf_hashcode_from_key (key, &hashcode); - head = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode); - send_elements (cpi, head); - } - else - { - struct ElementRequest *msg; - size_t msize; - struct IBF_Key *p; + switch (session->current_round) + { + case CONSENSUS_ROUND_BEGIN: + session->current_round = CONSENSUS_ROUND_EXCHANGE; + session->exp_round = 0; + subround_over (session, NULL); + break; + case CONSENSUS_ROUND_EXCHANGE: + /* handle two peers specially */ + if (session->num_peers <= 2) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: 2-peer consensus done\n", session->local_peer_idx); + GNUNET_CONTAINER_multihashmap_iterate (session->values, send_client_elements_iter, session); + send_client_conclude_done (session); + session->current_round = CONSENSUS_ROUND_FINISH; + return; + } + session->current_round = CONSENSUS_ROUND_INVENTORY; + start_inventory (session); + break; + case CONSENSUS_ROUND_INVENTORY: + session->current_round = CONSENSUS_ROUND_COMPLETION; + session->exp_round = 0; + subround_over (session, NULL); + break; + case CONSENSUS_ROUND_COMPLETION: + session->current_round = CONSENSUS_ROUND_FINISH; + send_client_conclude_done (session); + break; + default: + GNUNET_assert (0); + } +} - msize = (sizeof *msg) + sizeof (struct IBF_Key); - msg = GNUNET_malloc (msize); - switch (cpi->apparent_round) + +static void +fin_sent_cb (void *cls) +{ + struct ConsensusPeerInformation *cpi; + cpi = cls; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n", cpi->session->local_peer_idx); + switch (cpi->session->current_round) + { + case CONSENSUS_ROUND_EXCHANGE: + case CONSENSUS_ROUND_COMPLETION: + if (cpi->session->current_round != cpi->apparent_round) { - case CONSENSUS_ROUND_STOCK: - /* FIXME: check if we really want to request the element */ - case CONSENSUS_ROUND_EXCHANGE: - msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST); - break; - case CONSENSUS_ROUND_INVENTORY: - msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT); - break; - default: - GNUNET_assert (0); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: FIN to SYNC from the past\n", cpi->session->local_peer_idx); + break; } - msg->header.size = htons (msize); - p = (struct IBF_Key *) &msg[1]; - *p = key; - queue_peer_message (cpi, (struct GNUNET_MessageHeader *) msg); - } + cpi->exp_subround_finished = GNUNET_YES; + /* the subround is only really over if *both* partners are done */ + if (GNUNET_YES == exp_subround_finished (cpi->session)) + subround_over (cpi->session, NULL); + else + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after FIN sent\n", cpi->session->local_peer_idx); + break; + case CONSENSUS_ROUND_INVENTORY: + cpi->inventory_synced = GNUNET_YES; + if (inventory_round_finished (cpi->session) && cpi->session->current_round == cpi->apparent_round) + round_over (cpi->session, NULL); + /* FIXME: maybe go to next round */ + break; + default: + GNUNET_break (0); + } +} + + +/** + * The other peer wants us to inform that he sent us all the elements we requested. + */ +static int +handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) +{ + struct ConsensusRoundMessage *round_msg; + round_msg = (struct ConsensusRoundMessage *) msg; + /* FIXME: only call subround_over if round is the current one! */ + switch (cpi->session->current_round) + { + case CONSENSUS_ROUND_EXCHANGE: + case CONSENSUS_ROUND_COMPLETION: + if (cpi->session->current_round != round_msg->round) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (past round)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); + cpi->ibf_state = IBF_STATE_NONE; + cpi->ibf_bucket_counter = 0; + break; + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (exp)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); + cpi->exp_subround_finished = GNUNET_YES; + if (GNUNET_YES == exp_subround_finished (cpi->session)) + subround_over (cpi->session, NULL); + else + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after got FIN\n", cpi->session->local_peer_idx); + break; + case CONSENSUS_ROUND_INVENTORY: + cpi->inventory_synced = GNUNET_YES; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (a2a)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); + if (inventory_round_finished (cpi->session)) + round_over (cpi->session, NULL); + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n"); + break; } + return GNUNET_YES; +} + + +/** + * Gets called when the other peer wants us to inform that + * it has decoded our ibf and sent us all elements / requests + */ +static int +handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) +{ + struct PendingMessage *pm; + struct ConsensusRoundMessage *fin_msg; + + /* FIXME: why handle current round?? */ + switch (cpi->session->current_round) + { + case CONSENSUS_ROUND_INVENTORY: + cpi->inventory_synced = GNUNET_YES; + case CONSENSUS_ROUND_COMPLETION: + case CONSENSUS_ROUND_EXCHANGE: + LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "received SYNC\n"); + pm = new_pending_message (sizeof *fin_msg, + GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN); + fin_msg = (struct ConsensusRoundMessage *) pm->msg; + fin_msg->round = cpi->apparent_round; + /* the subround is over once we kicked off sending the fin msg */ + /* FIXME: assert we are talking to the right peer! */ + /* FIXME: mark peer as synced */ + pm->sent_cb = fin_sent_cb; + pm->sent_cb_cls = cpi; + message_queue_add (cpi->mss.mq, pm); + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n"); + break; + } + return GNUNET_YES; } @@ -1459,8 +1797,9 @@ decode (struct ConsensusPeerInformation *cpi) static int mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) { - struct ConsensusPeerInformation *cpi; - cpi = cls; + struct ConsensusPeerInformation *cpi = cls; + GNUNET_assert (NULL == client); + GNUNET_assert (NULL != cls); switch (ntohs (message->type)) { case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE: @@ -1485,6 +1824,228 @@ mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader } +static void +shuffle (struct ConsensusSession *session) +{ + /* adapted from random_permute in util/crypto_random.c */ + /* FIXME + unsigned int *ret; + unsigned int i; + unsigned int tmp; + uint32_t x; + + GNUNET_assert (n > 0); + ret = GNUNET_malloc (n * sizeof (unsigned int)); + for (i = 0; i < n; i++) + ret[i] = i; + for (i = n - 1; i > 0; i--) + { + x = GNUNET_CRYPTO_random_u32 (mode, i + 1); + tmp = ret[x]; + ret[x] = ret[i]; + ret[i] = tmp; + } + */ +} + + +/** + * Find and set the partner_incoming and partner_outgoing of our peer, + * one of them may not exist in most cases. + * + * @param session the consensus session + */ +static void +find_partners (struct ConsensusSession *session) +{ + int mark[session->num_peers]; + int i; + memset (mark, 0, session->num_peers * sizeof (int)); + session->partner_incoming = session->partner_outgoing = NULL; + for (i = 0; i < session->num_peers; i++) + { + int arc; + if (0 != mark[i]) + continue; + arc = (i + (1 << session->exp_subround)) % session->num_peers; + mark[i] = mark[arc] = 1; + GNUNET_assert (i != arc); + if (i == session->local_peer_idx) + { + GNUNET_assert (NULL == session->partner_outgoing); + session->partner_outgoing = &session->info[session->shuffle[arc]]; + session->partner_outgoing->exp_subround_finished = GNUNET_NO; + } + if (arc == session->local_peer_idx) + { + GNUNET_assert (NULL == session->partner_incoming); + session->partner_incoming = &session->info[session->shuffle[i]]; + session->partner_incoming->exp_subround_finished = GNUNET_NO; + } + } +} + + +/** + * Do the next subround in the exp-scheme. + * This function can be invoked as a timeout task, or called manually (tc will be NULL then). + * + * @param cls the session + * @param tc task context, for when this task is invoked by the scheduler, + * NULL if invoked for another reason + */ +static void +subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct ConsensusSession *session; + int i; + + /* don't kick off next subround if we're shutting down */ + if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) + return; + session = cls; + /* cancel timeout */ + if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) + GNUNET_SCHEDULER_cancel (session->round_timeout_tid); + session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; + /* check if we are done with the log phase, 2-peer consensus only does one log round */ + if ( (session->exp_round == NUM_EXP_ROUNDS) || + ((session->num_peers == 2) && (session->exp_round == 1))) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: exp-round over\n", session->local_peer_idx); + round_over (session, NULL); + return; + } + if (session->exp_round == 0) + { + /* initialize everything for the log-rounds */ + session->exp_round = 1; + session->exp_subround = 0; + if (NULL == session->shuffle) + session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers); + for (i = 0; i < session->num_peers; i++) + session->shuffle[i] = i; + } + else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers))) + { + /* subrounds done, start new log-round */ + session->exp_round++; + session->exp_subround = 0; + shuffle (session); + } + else + { + session->exp_subround++; + } + + find_partners (session); + +#ifdef GNUNET_EXTRA_LOGGING + { + int in; + int out; + if (session->partner_outgoing == NULL) + out = -1; + else + out = (int) (session->partner_outgoing - session->info); + if (session->partner_incoming == NULL) + in = -1; + else + in = (int) (session->partner_incoming - session->info); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx, + session->exp_round, session->exp_subround, in, out); + } +#endif /* GNUNET_EXTRA_LOGGING */ + + if (NULL != session->partner_incoming) + { + session->partner_incoming->ibf_state = IBF_STATE_NONE; + session->partner_incoming->exp_subround_finished = GNUNET_NO; + session->partner_incoming->ibf_bucket_counter = 0; + + /* maybe there's an early strata estimator? */ + replay_premature_message (session->partner_incoming); + } + + if (NULL != session->partner_outgoing) + { + session->partner_outgoing->ibf_state = IBF_STATE_NONE; + session->partner_outgoing->ibf_bucket_counter = 0; + session->partner_outgoing->exp_subround_finished = GNUNET_NO; + /* make sure peer is connected and send the SE */ + embrace_peer (session->partner_outgoing); + } + + /* + session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 3 * NUM_EXP_ROUNDS), + subround_over, session); + */ +} + + +/** + * Search peer in the list of peers in session. + * + * @param peer peer to find + * @param session session with peer + * @return index of peer, -1 if peer is not in session + */ +static int +get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session) +{ + int i; + for (i = 0; i < session->num_peers; i++) + if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer)) + return i; + return -1; +} + + +/** + * Handle a HELLO-message, send when another peer wants to join a session where + * our peer is a member. The session may or may not be inhabited yet. + */ +static int +handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello) +{ + struct ConsensusSession *session; + + if (NULL != inc->requested_gid) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session more than once, ignoring\n"); + return GNUNET_YES; + } + if (NULL != inc->cpi) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer with active session sent HELLO again, ignoring\n"); + return GNUNET_YES; + } + + for (session = sessions_head; NULL != session; session = session->next) + { + int idx; + struct ConsensusPeerInformation *cpi; + if (0 != GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id)) + continue; + idx = get_peer_idx (&inc->peer_id, session); + GNUNET_assert (-1 != idx); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d hello'ed session %d\n", idx); + cpi = &session->info[idx]; + inc->cpi = cpi; + cpi->mss = inc->mss; + cpi = &session->info[idx]; + cpi->hello = GNUNET_YES; + cpi->mss.mq = create_message_queue_for_stream_socket (&cpi->mss); + embrace_peer (cpi); + return GNUNET_YES; + } + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session\n"); + inc->requested_gid = GNUNET_memdup (&hello->global_id, sizeof (struct GNUNET_HashCode)); + return GNUNET_YES; +} + + + /** * Handle tokenized messages from stream sockets. * Delegate them if the socket belongs to a session, @@ -1502,7 +2063,9 @@ static int mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) { struct IncomingSocket *inc; - inc = (struct IncomingSocket *) client; + GNUNET_assert (NULL == client); + GNUNET_assert (NULL != cls); + inc = (struct IncomingSocket *) cls; switch (ntohs( message->type)) { case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO: @@ -1542,133 +2105,18 @@ listen_cb (void *cls, return GNUNET_SYSERR; } incoming = GNUNET_malloc (sizeof *incoming); - incoming->socket = socket; incoming->peer_id = *initiator; - incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, - &incoming_stream_data_processor, incoming); - incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming); + incoming->mss.socket = socket; + incoming->mss.rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, + &stream_data_processor, &incoming->mss); + incoming->mss.mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming); + incoming->mss.mst_cls = incoming; GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming); return GNUNET_OK; } /** - * Iterator over hash map entries. - * - * @param cls closure - * @param key current key code - * @param value value in the hash map - * @return GNUNET_YES if we should continue to - * iterate, - * GNUNET_NO if not. - */ -static int -destroy_element_list_iter (void *cls, - const struct GNUNET_HashCode * key, - void *value) -{ - struct ElementList *el; - el = value; - while (NULL != el) - { - struct ElementList *el_old; - el_old = el; - el = el->next; - GNUNET_free (el_old->element_hash); - GNUNET_free (el_old->element); - GNUNET_free (el_old); - } - return GNUNET_YES; -} - - -/** - * Destroy a session, free all resources associated with it. - * - * @param session the session to destroy - */ -static void -destroy_session (struct ConsensusSession *session) -{ - int i; - - GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session); - GNUNET_SERVER_client_drop (session->client); - session->client = NULL; - if (NULL != session->shuffle) - { - GNUNET_free (session->shuffle); - session->shuffle = NULL; - } - if (NULL != session->se) - { - strata_estimator_destroy (session->se); - session->se = NULL; - } - if (NULL != session->info) - { - for (i = 0; i < session->num_peers; i++) - { - struct ConsensusPeerInformation *cpi; - cpi = &session->info[i]; - if ((NULL != cpi) && (NULL != cpi->socket)) - { - if (NULL != cpi->rh) - { - GNUNET_STREAM_read_cancel (cpi->rh); - cpi->rh = NULL; - } - if (NULL != cpi->wh) - { - GNUNET_STREAM_write_cancel (cpi->wh); - cpi->wh = NULL; - } - GNUNET_STREAM_close (cpi->socket); - cpi->socket = NULL; - } - if (NULL != cpi->se) - { - strata_estimator_destroy (cpi->se); - cpi->se = NULL; - } - if (NULL != cpi->ibf) - { - ibf_destroy (cpi->ibf); - cpi->ibf = NULL; - } - if (NULL != cpi->mst) - { - GNUNET_SERVER_mst_destroy (cpi->mst); - cpi->mst = NULL; - } - } - GNUNET_free (session->info); - session->info = NULL; - } - if (NULL != session->ibfs) - { - for (i = 0; i <= MAX_IBF_ORDER; i++) - { - if (NULL != session->ibfs[i]) - { - ibf_destroy (session->ibfs[i]); - session->ibfs[i] = NULL; - } - } - GNUNET_free (session->ibfs); - session->ibfs = NULL; - } - if (NULL != session->values) - { - GNUNET_CONTAINER_multihashmap_iterate (session->values, destroy_element_list_iter, NULL); - GNUNET_CONTAINER_multihashmap_destroy (session->values); - session->values = NULL; - } - GNUNET_free (session); -} - - -/** * Disconnect a client, and destroy all sessions associated with it. * * @param client the client to disconnect @@ -1683,7 +2131,7 @@ disconnect_client (struct GNUNET_SERVER_Client *client) session = sessions_head; while (NULL != session) { - if (client == session->client) + if (client == session->scss.client) { destroy_session (session); break; @@ -1720,74 +2168,6 @@ compute_global_id (struct ConsensusSession *session, const struct GNUNET_HashCod /** - * Transmit a queued message to the session's client. - * - * @param cls consensus session - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -transmit_queued (void *cls, size_t size, - void *buf) -{ - struct ConsensusSession *session; - struct QueuedMessage *qmsg; - size_t msg_size; - - session = cls; - session->client_th = NULL; - - qmsg = session->client_messages_head; - GNUNET_CONTAINER_DLL_remove (session->client_messages_head, session->client_messages_tail, qmsg); - GNUNET_assert (qmsg); - - if (NULL == buf) - { - destroy_session (session); - return 0; - } - - msg_size = ntohs (qmsg->msg->size); - - GNUNET_assert (size >= msg_size); - - memcpy (buf, qmsg->msg, msg_size); - GNUNET_free (qmsg->msg); - GNUNET_free (qmsg); - - client_send_next (session); - - return msg_size; -} - - -/** - * Schedule transmitting the next queued message (if any) to the inhabiting client of a session. - * - * @param session the consensus session - */ -static void -client_send_next (struct ConsensusSession *session) -{ - - GNUNET_assert (NULL != session); - - if (NULL != session->client_th) - return; - - if (NULL != session->client_messages_head) - { - int msize; - msize = ntohs (session->client_messages_head->msg->size); - session->client_th = GNUNET_SERVER_notify_transmit_ready (session->client, msize, - GNUNET_TIME_UNIT_FOREVER_REL, - &transmit_queued, session); - } -} - - -/** * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have * the correct signature to be used with e.g. qsort. * We use this function instead. @@ -1804,67 +2184,6 @@ hash_cmp (const void *h1, const void *h2) /** - * Search peer in the list of peers in session. - * - * @param peer peer to find - * @param session session with peer - * @return index of peer, -1 if peer is not in session - */ -static int -get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session) -{ - int i; - for (i = 0; i < session->num_peers; i++) - if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer)) - return i; - return -1; -} - - -/** - * Called when stream has finishes writing the hello message - */ -static void -hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size) -{ - struct ConsensusPeerInformation *cpi; - - cpi = cls; - cpi->wh = NULL; - cpi->hello = GNUNET_YES; - GNUNET_assert (GNUNET_STREAM_OK == status); - embrace_peer (cpi); -} - - -/** - * Called when we established a stream connection to another peer - * - * @param cls cpi of the peer we just connected to - * @param socket socket to use to communicate with the other side (read/write) - */ -static void -open_cb (void *cls, struct GNUNET_STREAM_Socket *socket) -{ - struct ConsensusPeerInformation *cpi; - struct ConsensusHello *hello; - - cpi = cls; - hello = GNUNET_malloc (sizeof *hello); - hello->header.size = htons (sizeof *hello); - hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO); - memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode)); - GNUNET_assert (NULL == cpi->mst); - cpi->mst = GNUNET_SERVER_mst_create (mst_session_callback, cpi); - cpi->wh = - GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi); - GNUNET_free (hello); - cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, - &session_stream_data_processor, cpi); -} - - -/** * Create the sorted list of peers for the session, * add the local peer if not in the join message. */ @@ -1921,19 +2240,6 @@ initialize_session_peer_list (struct ConsensusSession *session) } -static void -strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode *key) -{ - uint32_t v; - int i; - v = key->bits[0]; - /* count trailing '1'-bits of v */ - for (i = 0; v & 1; v>>=1, i++) - /* empty */; - ibf_insert (se->strata[i], ibf_key_from_hashcode (key)); -} - - /** * Add incoming peer connections to the session, * for peers who have connected to us before the local session has been established @@ -1944,28 +2250,23 @@ static void add_incoming_peers (struct ConsensusSession *session) { struct IncomingSocket *inc; - inc = incoming_sockets_head; + int i; + struct ConsensusPeerInformation *cpi; - while (NULL != inc) + for (inc = incoming_sockets_head; NULL != inc; inc = inc->next) { - if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, inc->requested_gid)) + if ( (NULL == inc->requested_gid) || + (0 != GNUNET_CRYPTO_hash_cmp (&session->global_id, inc->requested_gid)) ) + continue; + for (i = 0; i < session->num_peers; i++) { - int i; - for (i = 0; i < session->num_peers; i++) - { - struct ConsensusPeerInformation *cpi; - cpi = &session->info[i]; - if (0 == memcmp (&inc->peer_id, &cpi->peer_id, sizeof (struct GNUNET_PeerIdentity))) - { - cpi->socket = inc->socket; - inc->cpi = cpi; - inc->cpi->mst = inc->mst; - inc->cpi->hello = GNUNET_YES; - break; - } - } + cpi = &session->info[i]; + cpi->peer_id = inc->peer_id; + cpi->mss = inc->mss; + cpi->hello = GNUNET_YES; + inc->cpi = cpi; + break; } - inc = inc->next; } } @@ -1982,7 +2283,6 @@ initialize_session (struct ConsensusSession *session) GNUNET_assert (NULL != session->join_msg); initialize_session_peer_list (session); - session->current_round = CONSENSUS_ROUND_BEGIN; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers); compute_global_id (session, &session->join_msg->session_id); @@ -1993,24 +2293,30 @@ initialize_session (struct ConsensusSession *session) if ((other_session != session) && (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id))) { - /* session already owned by another client */ - GNUNET_break (0); - disconnect_client (session->client); - return; + if (GNUNET_NO == other_session->conclude) + { + /* session already owned by another client */ + GNUNET_break (0); + disconnect_client (session->scss.client); + return; + } + else + { + GNUNET_SERVER_client_drop (session->scss.client); + session->scss.client = NULL; + break; + } } other_session = other_session->next; } - session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO); session->local_peer_idx = get_peer_idx (my_peer, session); GNUNET_assert (-1 != session->local_peer_idx); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx); - session->se = strata_estimator_create (); - session->ibfs = GNUNET_malloc ((MAX_IBF_ORDER+1) * sizeof (struct InvertibleBloomFilter *)); GNUNET_free (session->join_msg); session->join_msg = NULL; add_incoming_peers (session); - GNUNET_SERVER_receive_done (session->client, GNUNET_OK); + GNUNET_SERVER_receive_done (session->scss.client, GNUNET_OK); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id)); } @@ -2033,7 +2339,7 @@ client_join (void *cls, session = sessions_head; while (NULL != session) { - if (session->client == client) + if (session->scss.client == client) { GNUNET_break (0); disconnect_client (client); @@ -2042,9 +2348,15 @@ client_join (void *cls, session = session->next; } - session = GNUNET_malloc (sizeof (struct ConsensusSession)); + session = GNUNET_new (struct ConsensusSession); session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m); - session->client = client; + /* these have to be initialized here, as the client can already start to give us values */ + session->ibfs = GNUNET_malloc ((MAX_IBF_ORDER+1) * sizeof (struct InvertibleBloomFilter *)); + session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO); + session->ibf_key_map = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO); + session->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM); + session->scss.client = client; + session->client_mq = create_message_queue_for_server_client (&session->scss); GNUNET_SERVER_client_keep (client); GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); @@ -2060,57 +2372,6 @@ client_join (void *cls, } -/** - * Hash a block of data, producing a replicated ibf hash. - */ -static void -hash_for_ibf (const void *block, size_t size, struct GNUNET_HashCode *ret) -{ - struct IBF_Key ibf_key; - GNUNET_CRYPTO_hash (block, size, ret); - ibf_key = ibf_key_from_hashcode (ret); - ibf_hashcode_from_key (ibf_key, ret); -} - - -static void -insert_element (struct ConsensusSession *session, struct GNUNET_CONSENSUS_Element *element) -{ - struct GNUNET_HashCode hash; - struct ElementList *head; - - hash_for_ibf (element->data, element->size, &hash); - - head = GNUNET_CONTAINER_multihashmap_get (session->values, &hash); - - if (NULL == head) - { - int i; - - head = GNUNET_malloc (sizeof *head); - head->element = element; - head->next = NULL; - head->element_hash = GNUNET_memdup (&hash, sizeof hash); - GNUNET_CONTAINER_multihashmap_put (session->values, &hash, head, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); - strata_estimator_insert (session->se, &hash); - - for (i = 0; i <= MAX_IBF_ORDER; i++) - if (NULL != session->ibfs[i]) - ibf_insert (session->ibfs[i], ibf_key_from_hashcode (&hash)); - } - else - { - struct ElementList *el; - el = GNUNET_malloc (sizeof *el); - head->element = element; - head->next = NULL; - head->element_hash = GNUNET_memdup (&hash, sizeof hash); - while (NULL != head->next) - head = head->next; - head->next = el; - } -} /** @@ -2133,7 +2394,7 @@ client_insert (void *cls, session = sessions_head; while (NULL != session) { - if (session->client == client) + if (session->scss.client == client) break; } @@ -2146,373 +2407,15 @@ client_insert (void *cls, msg = (struct GNUNET_CONSENSUS_ElementMessage *) m; element_size = ntohs (msg->header.size )- sizeof (struct GNUNET_CONSENSUS_ElementMessage); - element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size); - element->type = msg->element_type; element->size = element_size; memcpy (&element[1], &msg[1], element_size); element->data = &element[1]; - GNUNET_assert (NULL != element->data); - insert_element (session, element); GNUNET_SERVER_receive_done (client, GNUNET_OK); - - client_send_next (session); -} - - - -/** - * Functions of this signature are called whenever writing operations - * on a stream are executed - * - * @param cls the closure from GNUNET_STREAM_write - * @param status the status of the stream at the time this function is called; - * GNUNET_STREAM_OK if writing to stream was completed successfully; - * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully - * (this doesn't mean that the data is never sent, the receiver may - * have read the data but its ACKs may have been lost); - * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the - * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot - * be processed. - * @param size the number of bytes written - */ -static void -write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size) -{ - struct ConsensusPeerInformation *cpi; - - GNUNET_assert (GNUNET_STREAM_OK == status); - cpi = cls; - cpi->wh = NULL; - if (NULL != cpi->messages_head) - { - struct QueuedMessage *qm; - qm = cpi->messages_head; - GNUNET_CONTAINER_DLL_remove (cpi->messages_head, cpi->messages_tail, qm); - cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size), - GNUNET_TIME_UNIT_FOREVER_REL, - write_queued, cpi); - if (NULL != qm->cb) - qm->cb (qm->cls); - GNUNET_free (qm->msg); - GNUNET_free (qm); - GNUNET_assert (NULL != cpi->wh); - } -} - - -static void -shuffle (struct ConsensusSession *session) -{ - /* FIXME: implement */ -} - - -/** - * Find and set the partner_incoming and partner_outgoing of our peer, - * one of them may not exist in most cases. - * - * @param session the consensus session - */ -static void -find_partners (struct ConsensusSession *session) -{ - int mark[session->num_peers]; - int i; - memset (mark, 0, session->num_peers * sizeof (int)); - session->partner_incoming = session->partner_outgoing = NULL; - for (i = 0; i < session->num_peers; i++) - { - int arc; - if (0 != mark[i]) - continue; - arc = (i + (1 << session->exp_subround)) % session->num_peers; - mark[i] = mark[arc] = 1; - GNUNET_assert (i != arc); - if (i == session->local_peer_idx) - { - GNUNET_assert (NULL == session->partner_outgoing); - session->partner_outgoing = &session->info[session->shuffle[arc]]; - session->partner_outgoing->exp_subround_finished = GNUNET_NO; - } - if (arc == session->local_peer_idx) - { - GNUNET_assert (NULL == session->partner_incoming); - session->partner_incoming = &session->info[session->shuffle[i]]; - session->partner_incoming->exp_subround_finished = GNUNET_NO; - } - } -} - - -static void -replay_premature_message (struct ConsensusPeerInformation *cpi) -{ - if (NULL != cpi->premature_strata_message) - { - struct StrataMessage *sm; - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n"); - sm = cpi->premature_strata_message; - cpi->premature_strata_message = NULL; - - cpi->replaying_strata_message = GNUNET_YES; - handle_p2p_strata (cpi, sm); - cpi->replaying_strata_message = GNUNET_NO; - - GNUNET_free (sm); - } -} - - -/** - * Do the next subround in the exp-scheme. - * This function can be invoked as a timeout task, or called manually (tc will be NULL then). - * - * @param cls the session - * @param tc task context, for when this task is invoked by the scheduler, - * NULL if invoked for another reason - */ -static void -subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct ConsensusSession *session; - int i; - - /* don't kick off next subround if we're shutting down */ - if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) - return; - session = cls; - /* don't send any messages from the last round */ - /* - clear_peer_messages (session->partner_outgoing); - clear_peer_messages (session->partner_incoming); - for (i = 0; i < session->num_peers; i++) - clear_peer_messages (&session->info[i]); - */ - /* cancel timeout */ - if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) - GNUNET_SCHEDULER_cancel (session->round_timeout_tid); - session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; - /* check if we are done with the log phase, 2-peer consensus only does one log round */ - if ( (session->exp_round == NUM_EXP_ROUNDS) || - ((session->num_peers == 2) && (session->exp_round == 1))) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: exp-round over\n", session->local_peer_idx); - round_over (session, NULL); - return; - } - if (session->exp_round == 0) - { - /* initialize everything for the log-rounds */ - session->exp_round = 1; - session->exp_subround = 0; - if (NULL == session->shuffle) - session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers); - for (i = 0; i < session->num_peers; i++) - session->shuffle[i] = i; - } - else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers))) - { - /* subrounds done, start new log-round */ - session->exp_round++; - session->exp_subround = 0; - shuffle (session); - } - else - { - session->exp_subround++; - } - - find_partners (session); - -#ifdef GNUNET_EXTRA_LOGGING - { - int in; - int out; - if (session->partner_outgoing == NULL) - out = -1; - else - out = (int) (session->partner_outgoing - session->info); - if (session->partner_incoming == NULL) - in = -1; - else - in = (int) (session->partner_incoming - session->info); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx, - session->exp_round, session->exp_subround, in, out); - } -#endif /* GNUNET_EXTRA_LOGGING */ - - if (NULL != session->partner_incoming) - { - session->partner_incoming->ibf_state = IBF_STATE_NONE; - session->partner_incoming->exp_subround_finished = GNUNET_NO; - session->partner_incoming->ibf_bucket_counter = 0; - - /* maybe there's an early strata estimator? */ - replay_premature_message (session->partner_incoming); - } - - if (NULL != session->partner_outgoing) - { - session->partner_outgoing->ibf_state = IBF_STATE_NONE; - session->partner_outgoing->ibf_bucket_counter = 0; - session->partner_outgoing->exp_subround_finished = GNUNET_NO; - - if (NULL == session->partner_outgoing->socket) - { - session->partner_outgoing->socket = - GNUNET_STREAM_open (cfg, &session->partner_outgoing->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS, - open_cb, session->partner_outgoing, - GNUNET_STREAM_OPTION_END); - } - else if (GNUNET_YES == session->partner_outgoing->hello) - { - send_strata_estimator (session->partner_outgoing); - } - /* else: do nothing, the send hello cb will handle this */ - } - - /* - session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 3 * NUM_EXP_ROUNDS), - subround_over, session); - */ -} - -static void -contact_peer_a2a (struct ConsensusPeerInformation *cpi) -{ - cpi->is_outgoing = GNUNET_YES; - if (NULL == cpi->socket) - { - cpi->socket = GNUNET_STREAM_open (cfg, &cpi->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS, - open_cb, cpi, GNUNET_STREAM_OPTION_END); - } - else if (GNUNET_YES == cpi->hello) - { - send_strata_estimator (cpi); - } -} - -/** - * Start the inventory round, contact all peers we are supposed to contact. - * - * @param session the current session - */ -static void -start_inventory (struct ConsensusSession *session) -{ - int i; - int last; - - for (i = 0; i < session->num_peers; i++) - { - session->info[i].ibf_bucket_counter = 0; - session->info[i].ibf_state = IBF_STATE_NONE; - session->info[i].is_outgoing = GNUNET_NO; - } - - last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; - i = (session->local_peer_idx + 1) % session->num_peers; - while (i != last) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i); - contact_peer_a2a (&session->info[i]); - session->info[i].is_outgoing = GNUNET_YES; - i = (i + 1) % session->num_peers; - } - // tie-breaker for even number of peers - if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last)) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i); - session->info[last].is_outgoing = GNUNET_YES; - contact_peer_a2a (&session->info[last]); - } - - for (i = 0; i < session->num_peers; i++) - { - if (GNUNET_NO == session->info[i].is_outgoing) - replay_premature_message (&session->info[i]); - } -} - -static void -send_client_conclude_done (struct ConsensusSession *session) -{ - struct GNUNET_MessageHeader *msg; - session->current_round = CONSENSUS_ROUND_FINISH; - msg = GNUNET_malloc (sizeof *msg); - msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE); - msg->size = htons (sizeof *msg); - queue_client_message (session, msg); - client_send_next (session); -} - -/** - * Start the next round. - * This function can be invoked as a timeout task, or called manually (tc will be NULL then). - * - * @param cls the session - * @param tc task context, for when this task is invoked by the scheduler, - * NULL if invoked for another reason - */ -static void -round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct ConsensusSession *session; - - /* don't kick off next round if we're shutting down */ - if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) - return; - - session = cls; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: round over\n", session->local_peer_idx); - - /* - for (i = 0; i < session->num_peers; i++) - clear_peer_messages (&session->info[i]); - */ - - if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) - { - GNUNET_SCHEDULER_cancel (session->round_timeout_tid); - session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; - } - - switch (session->current_round) - { - case CONSENSUS_ROUND_BEGIN: - session->current_round = CONSENSUS_ROUND_EXCHANGE; - session->exp_round = 0; - subround_over (session, NULL); - break; - case CONSENSUS_ROUND_EXCHANGE: - /* handle two peers specially */ - if (session->num_peers <= 2) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: done\n", session->local_peer_idx); - send_client_conclude_done (session); - return; - } - session->current_round = CONSENSUS_ROUND_INVENTORY; - start_inventory (session); - break; - case CONSENSUS_ROUND_INVENTORY: - session->current_round = CONSENSUS_ROUND_STOCK; - session->exp_round = 0; - subround_over (session, NULL); - break; - case CONSENSUS_ROUND_STOCK: - session->current_round = CONSENSUS_ROUND_FINISH; - send_client_conclude_done (session); - break; - default: - GNUNET_assert (0); - } } @@ -2534,7 +2437,7 @@ client_conclude (void *cls, cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message; session = sessions_head; - while ((session != NULL) && (session->client != client)) + while ((session != NULL) && (session->scss.client != client)) session = session->next; if (NULL == session) { @@ -2553,6 +2456,8 @@ client_conclude (void *cls, return; } + session->conclude = GNUNET_YES; + if (session->num_peers <= 1) { send_client_conclude_done (session); @@ -2565,57 +2470,6 @@ client_conclude (void *cls, } GNUNET_SERVER_receive_done (client, GNUNET_OK); - client_send_next (session); -} - - -/** - * Called when a client sends an ack - * - * @param cls (unused) - * @param client client handle - * @param message message sent by the client - */ -static void -client_ack (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) -{ - struct ConsensusSession *session; - struct GNUNET_CONSENSUS_AckMessage *msg; - struct PendingElement *pending; - struct GNUNET_CONSENSUS_Element *element; - - session = sessions_head; - while (NULL != session) - { - if (session->client == client) - break; - } - - if (NULL == session) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to ack, but client is not in any session\n"); - GNUNET_SERVER_client_disconnect (client); - return; - } - - pending = session->client_approval_head; - - GNUNET_CONTAINER_DLL_remove (session->client_approval_head, session->client_approval_tail, pending); - - msg = (struct GNUNET_CONSENSUS_AckMessage *) message; - - if (msg->keep) - { - element = pending->element; - insert_element (session, element); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got client ack\n"); - } - - GNUNET_free (pending); - - GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -2649,14 +2503,10 @@ core_startup (void *cls, /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */ GNUNET_SCHEDULER_add_now (&disconnect_core, core); GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n"); - - session = sessions_head; - while (NULL != session) - { + /* initialize sessions that are waiting for the local peer identity */ + for (session = sessions_head; NULL != session; session = session->next) if (NULL != session->join_msg) initialize_session (session); - session = session->next; - } } @@ -2670,27 +2520,12 @@ static void shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - /* FIXME: complete; write separate destructors for different data types */ - while (NULL != incoming_sockets_head) { struct IncomingSocket *socket; socket = incoming_sockets_head; - if (NULL != socket->rh) - { - GNUNET_STREAM_read_cancel (socket->rh); - socket->rh = NULL; - } if (NULL == socket->cpi) - { - GNUNET_STREAM_close (socket->socket); - socket->socket = NULL; - if (NULL != socket->mst) - { - GNUNET_SERVER_mst_destroy (socket->mst); - socket->mst = NULL; - } - } + clear_message_stream_state (&socket->mss); incoming_sockets_head = incoming_sockets_head->next; GNUNET_free (socket); } @@ -2738,8 +2573,6 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)}, - {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK, - sizeof (struct GNUNET_CONSENSUS_AckMessage)}, {NULL, NULL, 0, 0} }; diff --git a/src/consensus/ibf.c b/src/consensus/ibf.c index 87dbdd696..739b97339 100644 --- a/src/consensus/ibf.c +++ b/src/consensus/ibf.c @@ -63,12 +63,10 @@ ibf_hashcode_from_key (struct IBF_Key key, struct GNUNET_HashCode *dst) * * @param size number of IBF buckets * @param hash_num number of buckets one element is hashed in - * @param salt salt for mingling hashes, different salt may - * result in less (or more) collisions * @return the newly created invertible bloom filter */ struct InvertibleBloomFilter * -ibf_create (uint32_t size, uint8_t hash_num, uint32_t salt) +ibf_create (uint32_t size, uint8_t hash_num) { struct InvertibleBloomFilter *ibf; @@ -235,32 +233,25 @@ ibf_decode (struct InvertibleBloomFilter *ibf, /** - * Write an ibf. + * Write buckets from an ibf to a buffer. + * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf. * * @param ibf the ibf to write * @param start with which bucket to start * @param count how many buckets to write - * @param buf buffer to write the data to, will be updated to point to the - * first byte after the written data - * @param size pointer to the size of the buffer, will be updated, can be NULL + * @param buf buffer to write the data to */ void -ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void **buf, size_t *size) +ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void *buf) { struct IBF_Key *key_dst; struct IBF_KeyHash *key_hash_dst; struct IBF_Count *count_dst; - /* update size and check for overflow */ - if (NULL != size) - { - size_t old_size; - old_size = *size; - *size = *size - count * IBF_BUCKET_SIZE; - GNUNET_assert (*size < old_size); - } + GNUNET_assert (start + count <= ibf->size); + /* copy keys */ - key_dst = (struct IBF_Key *) *buf; + key_dst = (struct IBF_Key *) buf; memcpy (key_dst, ibf->key_sum + start, count * sizeof *key_dst); key_dst += count; /* copy key hashes */ @@ -271,40 +262,28 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32 count_dst = (struct IBF_Count *) key_hash_dst; memcpy (count_dst, ibf->count + start, count * sizeof *count_dst); count_dst += count; - /* returned buffer is at the end of written data*/ - *buf = (void *) count_dst; } /** - * Read an ibf. + * Read buckets from a buffer into an ibf. * - * @param buf pointer to the buffer to write to, will point to first - * byte after the written data // FIXME: take 'const void *buf' for input, return number of bytes READ - * @param size size of the buffer, will be updated + * @param buf pointer to the buffer to read from * @param start which bucket to start at * @param count how many buckets to read * @param ibf the ibf to read from - * @return GNUNET_OK on success // FIXME: return 0 on error (or -1/ssize_t), number of bytes read otherwise */ -int -ibf_read_slice (void **buf, size_t *size, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf) +void +ibf_read_slice (const void *buf, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf) { struct IBF_Key *key_src; struct IBF_KeyHash *key_hash_src; struct IBF_Count *count_src; - /* update size and check for overflow */ - if (NULL != size) - { - size_t old_size; - old_size = *size; - *size = *size - count * IBF_BUCKET_SIZE; - if (*size > old_size) - return GNUNET_SYSERR; - } + GNUNET_assert (start + count <= ibf->size); + /* copy keys */ - key_src = (struct IBF_Key *) *buf; + key_src = (struct IBF_Key *) buf; memcpy (ibf->key_sum + start, key_src, count * sizeof *key_src); key_src += count; /* copy key hashes */ @@ -315,40 +294,6 @@ ibf_read_slice (void **buf, size_t *size, uint32_t start, uint32_t count, struct count_src = (struct IBF_Count *) key_hash_src; memcpy (ibf->count + start, count_src, count * sizeof *count_src); count_src += count; - /* returned buffer is at the end of written data*/ - *buf = (void *) count_src; - return GNUNET_OK; -} - - -/** - * Write an ibf. - * - * @param ibf the ibf to write - * @param buf buffer to write the data to, will be updated to point to the - * first byte after the written data - * @param size pointer to the size of the buffer, will be updated, can be NULL - */ -void -ibf_write (const struct InvertibleBloomFilter *ibf, void **buf, size_t *size) -{ - ibf_write_slice (ibf, 0, ibf->size, buf, size); -} - - -/** - * Read an ibf. - * - * @param buf pointer to the buffer to write to, will point to first - * byte after the written data - * @param size size of the buffer, will be updated - * @param dst ibf to write buckets to - * @return GNUNET_OK on success - */ -int -ibf_read (void **buf, size_t *size, struct InvertibleBloomFilter *dst) -{ - return ibf_read_slice (buf, size, 0, dst->size, dst); } @@ -366,7 +311,6 @@ ibf_subtract (struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFi GNUNET_assert (ibf1->size == ibf2->size); GNUNET_assert (ibf1->hash_num == ibf2->hash_num); - GNUNET_assert (ibf1->salt == ibf2->salt); for (i = 0; i < ibf1->size; i++) { @@ -388,7 +332,6 @@ ibf_dup (const struct InvertibleBloomFilter *ibf) struct InvertibleBloomFilter *copy; copy = GNUNET_malloc (sizeof *copy); copy->hash_num = ibf->hash_num; - copy->salt = ibf->salt; copy->size = ibf->size; copy->key_hash_sum = GNUNET_memdup (ibf->key_hash_sum, ibf->size * sizeof (struct IBF_KeyHash)); copy->key_sum = GNUNET_memdup (ibf->key_sum, ibf->size * sizeof (struct IBF_Key)); diff --git a/src/consensus/ibf.h b/src/consensus/ibf.h index 609653889..2bf3ef7c7 100644 --- a/src/consensus/ibf.h +++ b/src/consensus/ibf.h @@ -81,11 +81,6 @@ struct InvertibleBloomFilter uint8_t hash_num; /** - * Salt for mingling hashes - */ - uint32_t salt; - - /** * Xor sums of the elements' keys, used to identify the elements. * Array of 'size' elements. */ @@ -107,58 +102,28 @@ struct InvertibleBloomFilter /** - * Write an ibf. + * Write buckets from an ibf to a buffer. + * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf. * * @param ibf the ibf to write * @param start with which bucket to start * @param count how many buckets to write - * @param buf buffer to write the data to, will be updated to point to the - * first byte after the written data - * @param size pointer to the size of the buffer, will be updated, can be NULL + * @param buf buffer to write the data to */ void -ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void **buf, size_t *size); +ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void *buf); /** - * Read an ibf. + * Read buckets from a buffer into an ibf. * - * @param buf pointer to the buffer to write to, will point to first - * byte after the written data - * @param size size of the buffer, will be updated + * @param buf pointer to the buffer to read from * @param start which bucket to start at * @param count how many buckets to read * @param ibf the ibf to read from - * @return GNUNET_OK on success - */ -int -ibf_read_slice (void **buf, size_t *size, uint32_t start, uint32_t count, struct InvertibleBloomFilter *dst); - - -/** - * Write an ibf. - * - * @param ibf the ibf to write - * @param buf buffer to write the data to, will be updated to point to the - * first byte after the written data - * @param size pointer to the size of the buffer, will be updated, can be NULL */ void -ibf_write (const struct InvertibleBloomFilter *ibf, void **buf, size_t *size); - - - -/** - * Read an ibf. - * - * @param buf pointer to the buffer to write to, will point to first - * byte after the written data - * @param size size of the buffer, will be updated - * @param dst ibf to write buckets to - * @return GNUNET_OK on success - */ -int -ibf_read (void **buf, size_t *size, struct InvertibleBloomFilter *dst); +ibf_read_slice (const void *buf, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf); /** @@ -187,12 +152,10 @@ ibf_hashcode_from_key (struct IBF_Key key, struct GNUNET_HashCode *dst); * * @param size number of IBF buckets * @param hash_num number of buckets one element is hashed in, usually 3 or 4 - * @param salt salt for mingling hashes, different salt may - * result in less (or more) collisions * @return the newly created invertible bloom filter */ struct InvertibleBloomFilter * -ibf_create (uint32_t size, uint8_t hash_num, uint32_t salt); +ibf_create (uint32_t size, uint8_t hash_num); /** diff --git a/src/consensus/strata_estimator.c b/src/consensus/strata_estimator.c new file mode 100644 index 000000000..685c50f0f --- /dev/null +++ b/src/consensus/strata_estimator.c @@ -0,0 +1,145 @@ +/* + This file is part of GNUnet + (C) 2012 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 2, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file consensus/ibf.h + * @brief invertible bloom filter + * @author Florian Dold + */ + +#include "platform.h" +#include "gnunet_common.h" +#include "ibf.h" +#include "strata_estimator.h" + +void +strata_estimator_write (const struct StrataEstimator *se, void *buf) +{ + int i; + for (i = 0; i < se->strata_count; i++) + { + ibf_write_slice (se->strata[i], 0, se->ibf_size, buf); + buf += se->ibf_size * IBF_BUCKET_SIZE; + } +} + +void +strata_estimator_read (const void *buf, struct StrataEstimator *se) +{ + int i; + for (i = 0; i < se->strata_count; i++) + { + ibf_read_slice (buf, 0, se->ibf_size, se->strata[i]); + buf += se->ibf_size * IBF_BUCKET_SIZE; + } +} + + +void +strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode *key) +{ + uint32_t v; + int i; + v = key->bits[0]; + /* count trailing '1'-bits of v */ + for (i = 0; v & 1; v>>=1, i++) + /* empty */; + ibf_insert (se->strata[i], ibf_key_from_hashcode (key)); +} + + + +struct StrataEstimator * +strata_estimator_create (unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum) +{ + struct StrataEstimator *se; + int i; + + /* fixme: allocate everything in one chunk */ + + se = GNUNET_malloc (sizeof (struct StrataEstimator)); + se->strata_count = strata_count; + se->ibf_size = ibf_size; + se->strata = GNUNET_malloc (sizeof (struct InvertibleBloomFilter *) * strata_count); + for (i = 0; i < strata_count; i++) + se->strata[i] = ibf_create (ibf_size, ibf_hashnum); + return se; +} + + +/** + * Estimate set difference with two strata estimators, + * i.e. arrays of IBFs. + * Does not not modify its arguments. + * + * @param se1 first strata estimator + * @param se2 second strata estimator + * @return the estimated difference + */ +unsigned int +strata_estimator_difference (const struct StrataEstimator *se1, + const struct StrataEstimator *se2) +{ + int i; + int count; + + GNUNET_assert (se1->strata_count == se2->strata_count); + count = 0; + for (i = se1->strata_count - 1; i >= 0; i--) + { + struct InvertibleBloomFilter *diff; + /* number of keys decoded from the ibf */ + int ibf_count; + int more; + ibf_count = 0; + /* FIXME: implement this without always allocating new IBFs */ + diff = ibf_dup (se1->strata[i]); + ibf_subtract (diff, se2->strata[i]); + for (;;) + { + more = ibf_decode (diff, NULL, NULL); + if (GNUNET_NO == more) + { + count += ibf_count; + break; + } + if (GNUNET_SYSERR == more) + { + ibf_destroy (diff); + return count * (1 << (i + 1)); + } + ibf_count++; + } + ibf_destroy (diff); + } + return count; +} + + +void +strata_estimator_destroy (struct StrataEstimator *se) +{ + int i; + for (i = 0; i < se->strata_count; i++) + ibf_destroy (se->strata[i]); + GNUNET_free (se->strata); + GNUNET_free (se); +} + diff --git a/src/consensus/consensus_flout.h b/src/consensus/strata_estimator.h index 6c97813a5..cb5bd3d0a 100644 --- a/src/consensus/consensus_flout.h +++ b/src/consensus/strata_estimator.h @@ -16,16 +16,20 @@ along with GNUnet; see the file COPYING. If not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - */ +*/ /** - * @file consensus/consensus_flout.h - * @brief intentionally misbehave in certain ways for testing + * @file consensus/strata_estimator.h + * @brief estimator of set difference * @author Florian Dold */ -#ifndef GNUNET_CONSENSUS_FLOUT_H -#define GNUNET_CONSENSUS_FLOUT_H +#ifndef GNUNET_CONSENSUS_STRATA_ESTIMATOR_H +#define GNUNET_CONSENSUS_STRATA_ESTIMATOR_H + +#include "platform.h" +#include "gnunet_common.h" +#include "gnunet_util_lib.h" #ifdef __cplusplus extern "C" @@ -35,19 +39,38 @@ extern "C" #endif #endif -#include "platform.h" -#include "gnunet_common.h" -#include "gnunet_consensus_service.h" + +struct StrataEstimator +{ + struct InvertibleBloomFilter **strata; + unsigned int strata_count; + unsigned int ibf_size; +}; + void -GNUNET_CONSENSUS_flout_disable_peer (struct GNUNET_CONSENSUS_Handle *consensus); +strata_estimator_write (const struct StrataEstimator *se, void *buf); + void -GNUNET_CONSENSUS_flout_ignore_element_hash (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_HashCode *element_hash); +strata_estimator_read (const void *buf, struct StrataEstimator *se); + + +struct StrataEstimator * +strata_estimator_create (unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum); + + +unsigned int +strata_estimator_difference (const struct StrataEstimator *se1, + const struct StrataEstimator *se2); + void -GNUNET_CONSENSUS_flout_send_bogos_ibf (struct GNUNET_CONSENSUS_Handle *consensus, ...); +strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode *key); + +void +strata_estimator_destroy (struct StrataEstimator *se); #if 0 /* keep Emacsens' auto-indent happy */ @@ -58,3 +81,4 @@ GNUNET_CONSENSUS_flout_send_bogos_ibf (struct GNUNET_CONSENSUS_Handle *consensus #endif #endif + diff --git a/src/dv/gnunet-service-dv.c b/src/dv/gnunet-service-dv.c index 89f109345..6dc37f7d9 100644 --- a/src/dv/gnunet-service-dv.c +++ b/src/dv/gnunet-service-dv.c @@ -1247,10 +1247,8 @@ insert_next_element (void *cls, * * @param cls the 'struct DirectNeighbor' we're building the consensus with * @param element the new element we have learned - * @return GNUNET_OK if the valid is well-formed and should be added to the consensus, - * GNUNET_SYSERR if the element should be ignored and not be propagated */ -static int +static void learn_route_cb (void *cls, const struct GNUNET_CONSENSUS_Element *element) { @@ -1274,12 +1272,12 @@ learn_route_cb (void *cls, neighbor->consensus_task = GNUNET_SCHEDULER_add_delayed (GNUNET_DV_CONSENSUS_FREQUENCY, &start_consensus, neighbor); - return GNUNET_SYSERR; + return; } if (sizeof (struct Target) != element->size) { GNUNET_break_op (0); - return GNUNET_SYSERR; + return; } target = GNUNET_malloc (sizeof (struct Target)); memcpy (target, element->data, sizeof (struct Target)); @@ -1291,9 +1289,8 @@ learn_route_cb (void *cls, { GNUNET_break_op (0); GNUNET_free (target); - return GNUNET_SYSERR; + return; } - return GNUNET_OK; } diff --git a/src/include/gnunet_consensus_service.h b/src/include/gnunet_consensus_service.h index 0c74a6a27..f7f784f6e 100644 --- a/src/include/gnunet_consensus_service.h +++ b/src/include/gnunet_consensus_service.h @@ -71,10 +71,8 @@ struct GNUNET_CONSENSUS_Element * * @param cls closure * @param element new element, NULL on error - * @return GNUNET_OK if the valid is well-formed and should be added to the consensus, - * GNUNET_SYSERR if the element should be ignored and not be propagated */ -typedef int (*GNUNET_CONSENSUS_ElementCallback) (void *cls, +typedef void (*GNUNET_CONSENSUS_ElementCallback) (void *cls, const struct GNUNET_CONSENSUS_Element *element); diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 7179914af..431542660 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -1754,6 +1754,11 @@ extern "C" */ #define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN 548 +/** + * Abort a round, don't send requested elements anymore + */ +#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ABORT 548 + /** * Next available: 570 |