aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-04-11 10:08:52 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-04-11 10:08:52 +0000
commit210be82b7cdc6058401e7d5042aa50dd0b750c92 (patch)
treee2bfa5a87038ef0a7f906d5ede8d6e7ea7f2638b /src
parent2b406c1533a919057cda8850315af1fca5b48a45 (diff)
downloadgnunet-210be82b7cdc6058401e7d5042aa50dd0b750c92.tar.gz
gnunet-210be82b7cdc6058401e7d5042aa50dd0b750c92.zip
added consensus log-round simulation, work on consensus service, still problems with dv test case
Diffstat (limited to 'src')
-rw-r--r--src/consensus/Makefile.am6
-rw-r--r--src/consensus/consensus-simulation.py103
-rw-r--r--src/consensus/consensus_api.c20
-rw-r--r--src/consensus/gnunet-consensus-ibf.c4
-rw-r--r--src/consensus/gnunet-consensus.c3
-rw-r--r--src/consensus/gnunet-service-consensus.c2779
-rw-r--r--src/consensus/ibf.c87
-rw-r--r--src/consensus/ibf.h53
-rw-r--r--src/consensus/strata_estimator.c145
-rw-r--r--src/consensus/strata_estimator.h (renamed from src/consensus/consensus_flout.h)46
-rw-r--r--src/dv/gnunet-service-dv.c11
-rw-r--r--src/include/gnunet_consensus_service.h4
-rw-r--r--src/include/gnunet_protocols.h5
13 files changed, 1630 insertions, 1636 deletions
diff --git a/src/consensus/Makefile.am b/src/consensus/Makefile.am
index e469de057..82af29c87 100644
--- a/src/consensus/Makefile.am
+++ b/src/consensus/Makefile.am
@@ -61,7 +61,8 @@ gnunet_consensus_ibf_LDADD = \
61 61
62gnunet_service_consensus_SOURCES = \ 62gnunet_service_consensus_SOURCES = \
63 gnunet-service-consensus.c \ 63 gnunet-service-consensus.c \
64 ibf.c 64 ibf.c \
65 strata_estimator.c
65gnunet_service_consensus_LDADD = \ 66gnunet_service_consensus_LDADD = \
66 $(top_builddir)/src/util/libgnunetutil.la \ 67 $(top_builddir)/src/util/libgnunetutil.la \
67 $(top_builddir)/src/core/libgnunetcore.la \ 68 $(top_builddir)/src/core/libgnunetcore.la \
@@ -71,7 +72,8 @@ gnunet_service_consensus_LDADD = \
71 72
72gnunet_service_evil_consensus_SOURCES = \ 73gnunet_service_evil_consensus_SOURCES = \
73 gnunet-service-consensus.c \ 74 gnunet-service-consensus.c \
74 ibf.c 75 ibf.c \
76 strata_estimator.c
75gnunet_service_evil_consensus_LDADD = \ 77gnunet_service_evil_consensus_LDADD = \
76 $(top_builddir)/src/util/libgnunetutil.la \ 78 $(top_builddir)/src/util/libgnunetutil.la \
77 $(top_builddir)/src/core/libgnunetcore.la \ 79 $(top_builddir)/src/core/libgnunetcore.la \
diff --git a/src/consensus/consensus-simulation.py b/src/consensus/consensus-simulation.py
new file mode 100644
index 000000000..930dfee62
--- /dev/null
+++ b/src/consensus/consensus-simulation.py
@@ -0,0 +1,103 @@
1#!/usr/bin/python
2# This file is part of GNUnet
3# (C) 2013 Christian Grothoff (and other contributing authors)
4#
5# GNUnet is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published
7# by the Free Software Foundation; either version 2, or (at your
8# option) any later version.
9#
10# GNUnet is distributed in the hope that it will be useful, but
11# WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13# General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with GNUnet; see the file COPYING. If not, write to the
17# Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18# Boston, MA 02111-1307, USA.
19
20import argparse
21import random
22from math import ceil,log,floor
23
24def bsc(n):
25 """ count the bits set in n"""
26 l = n.bit_length()
27 c = 0
28 x = 1
29 for _ in range(0, l):
30 if n & x:
31 c = c + 1
32 x = x << 1
33 return c
34
35def simulate(k, n, verbose):
36 assert k < n
37 largest_arc = int(2**ceil(log(n, 2))) / 2
38 num_ghosts = (2 * largest_arc) - n
39 if verbose:
40 print "we have", num_ghosts, "ghost peers"
41 # n.b. all peers with idx<k are evil
42 peers = range(n)
43 info = [1 << x for x in xrange(n)]
44 def done_p():
45 for x in xrange(k, n):
46 if bsc(info[x]) < n-k:
47 return False
48 return True
49 rounds = 0
50 while not done_p():
51 if verbose:
52 print "-- round --"
53 arc = 1
54 while arc <= largest_arc:
55 if verbose:
56 print "-- subround --"
57 new_info = [x for x in info]
58 for peer_physical in xrange(n):
59 peer_logical = peers[peer_physical]
60 peer_type = None
61 partner_logical = (peer_logical + arc) % n
62 partner_physical = peers.index(partner_logical)
63 if peer_physical < k or partner_physical < k:
64 if verbose:
65 print "bad peer in connection", peer_physical, "--", partner_physical
66 continue
67 if peer_logical & arc == 0:
68 # we are outgoing
69 if verbose:
70 print peer_physical, "connects to", partner_physical
71 peer_type = "outgoing"
72 if peer_logical < num_ghosts:
73 # we have a ghost, check if the peer who connects
74 # to our ghost is actually outgoing
75 ghost_partner_logical = (peer_logical - arc) % n
76 if ghost_partner_logical & arc == 0:
77 peer_type = peer_type + ", ghost incoming"
78 new_info[peer_physical] = new_info[peer_physical] | info[peer_physical] | info[partner_physical]
79 new_info[partner_physical] = new_info[partner_physical] | info[peer_physical] | info[partner_physical]
80 else:
81 peer_type = "incoming"
82 if verbose > 1:
83 print "type of", str(peer_physical) + ":", peer_type
84 info = new_info
85 arc = arc << 1;
86 rounds = rounds + 1
87 random.shuffle(peers)
88 return rounds
89
90if __name__ == "__main__":
91 parser = argparse.ArgumentParser()
92 parser.add_argument("k", metavar="k", type=int, help="#(bad peers)")
93 parser.add_argument("n", metavar="n", type=int, help="#(all peers)")
94 parser.add_argument("r", metavar="r", type=int, help="#(rounds)")
95 parser.add_argument('--verbose', '-v', action='count')
96
97 args = parser.parse_args()
98 sum = 0.0;
99 for n in xrange (0, args.r):
100 sum += simulate(args.k, args.n, args.verbose)
101 print sum / args.r;
102
103
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c
index 25ace3a4d..fd61d3712 100644
--- a/src/consensus/consensus_api.c
+++ b/src/consensus/consensus_api.c
@@ -231,15 +231,6 @@ send_next (struct GNUNET_CONSENSUS_Handle *consensus)
231 } 231 }
232} 232}
233 233
234static void
235queue_message (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_MessageHeader *msg)
236{
237 struct QueuedMessage *qm;
238 qm = GNUNET_malloc (sizeof *qm);
239 qm->msg = msg;
240 GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qm);
241}
242
243 234
244/** 235/**
245 * Called when the server has sent is a new element 236 * Called when the server has sent is a new element
@@ -252,8 +243,6 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
252 struct GNUNET_CONSENSUS_ElementMessage *msg) 243 struct GNUNET_CONSENSUS_ElementMessage *msg)
253{ 244{
254 struct GNUNET_CONSENSUS_Element element; 245 struct GNUNET_CONSENSUS_Element element;
255 struct GNUNET_CONSENSUS_AckMessage *ack_msg;
256 int ret;
257 246
258 LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n"); 247 LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n");
259 248
@@ -261,14 +250,7 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
261 element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage); 250 element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
262 element.data = &msg[1]; 251 element.data = &msg[1];
263 252
264 ret = consensus->new_element_cb (consensus->new_element_cls, &element); 253 consensus->new_element_cb (consensus->new_element_cls, &element);
265
266 ack_msg = GNUNET_new (struct GNUNET_CONSENSUS_AckMessage);
267 ack_msg->header.size = htons (sizeof *ack_msg);
268 ack_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK);
269 ack_msg->keep = ret;
270
271 queue_message (consensus, &ack_msg->header);
272 254
273 send_next (consensus); 255 send_next (consensus);
274} 256}
diff --git a/src/consensus/gnunet-consensus-ibf.c b/src/consensus/gnunet-consensus-ibf.c
index 73dc31b56..d431795f1 100644
--- a/src/consensus/gnunet-consensus-ibf.c
+++ b/src/consensus/gnunet-consensus-ibf.c
@@ -160,8 +160,8 @@ run (void *cls, char *const *args, const char *cfgfile,
160 i++; 160 i++;
161 } 161 }
162 162
163 ibf_a = ibf_create (ibf_size, hash_num, 0); 163 ibf_a = ibf_create (ibf_size, hash_num);
164 ibf_b = ibf_create (ibf_size, hash_num, 0); 164 ibf_b = ibf_create (ibf_size, hash_num);
165 165
166 printf ("generated sets\n"); 166 printf ("generated sets\n");
167 167
diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c
index 9e9b89446..d8c1b14ee 100644
--- a/src/consensus/gnunet-consensus.c
+++ b/src/consensus/gnunet-consensus.c
@@ -192,12 +192,11 @@ connect_complete (void *cls,
192} 192}
193 193
194 194
195static int 195static void
196new_element_cb (void *cls, 196new_element_cb (void *cls,
197 const struct GNUNET_CONSENSUS_Element *element) 197 const struct GNUNET_CONSENSUS_Element *element)
198{ 198{
199 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n"); 199 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n");
200 return GNUNET_YES;
201} 200}
202 201
203 202
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
index ebd2d238b..179df0fb0 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -32,23 +32,32 @@
32#include "gnunet_consensus_service.h" 32#include "gnunet_consensus_service.h"
33#include "gnunet_core_service.h" 33#include "gnunet_core_service.h"
34#include "gnunet_stream_lib.h" 34#include "gnunet_stream_lib.h"
35
35#include "consensus_protocol.h" 36#include "consensus_protocol.h"
36#include "ibf.h"
37#include "consensus.h" 37#include "consensus.h"
38#include "ibf.h"
39#include "strata_estimator.h"
40
41
42/*
43 * Log macro that prefixes the local peer and the peer we are in contact with.
44 */
45#define LOG_PP(kind, cpi, m,...) GNUNET_log (kind, "P%d for P%d: " m, \
46 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info),##__VA_ARGS__)
38 47
39 48
40/** 49/**
41 * Number of IBFs in a strata estimator. 50 * Number of IBFs in a strata estimator.
42 */ 51 */
43#define STRATA_COUNT 32 52#define SE_STRATA_COUNT 32
44/** 53/**
45 * Number of buckets per IBF. 54 * Size of the IBFs in the strata estimator.
46 */ 55 */
47#define STRATA_IBF_BUCKETS 80 56#define SE_IBF_SIZE 80
48/** 57/**
49 * hash num parameter for the difference digests and strata estimators 58 * hash num parameter for the difference digests and strata estimators
50 */ 59 */
51#define STRATA_HASH_NUM 3 60#define SE_IBF_HASH_NUM 3
52 61
53/** 62/**
54 * Number of buckets that can be transmitted in one message. 63 * Number of buckets that can be transmitted in one message.
@@ -63,72 +72,55 @@
63#define MAX_IBF_ORDER (16) 72#define MAX_IBF_ORDER (16)
64 73
65/** 74/**
66 * Number exp-rounds. 75 * Number of exponential rounds, used in the inventory and completion round.
67 */ 76 */
68#define NUM_EXP_ROUNDS (4) 77#define NUM_EXP_ROUNDS (4)
69 78
70 79
71/* forward declarations */ 80/* forward declarations */
72 81
73struct ConsensusSession; 82/* mutual recursion with struct ConsensusSession */
74struct IncomingSocket;
75struct ConsensusPeerInformation; 83struct ConsensusPeerInformation;
76 84
77static void 85struct MessageQueue;
78client_send_next (struct ConsensusSession *session);
79
80static int
81get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session);
82
83static void
84round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
85 86
87/* mutual recursion with round_over */
86static void 88static void
87send_ibf (struct ConsensusPeerInformation *cpi); 89subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
88 90
91/* mutial recursion with transmit_queued */
89static void 92static void
90send_strata_estimator (struct ConsensusPeerInformation *cpi); 93client_send_next (struct MessageQueue *mq);
91 94
95/* mutual recursion with mst_session_callback */
92static void 96static void
93decode (struct ConsensusPeerInformation *cpi); 97open_cb (void *cls, struct GNUNET_STREAM_Socket *socket);
94
95static void
96write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size);
97 98
98static void 99static int
99subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); 100mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message);
100 101
101 102
102/** 103/**
103 * An element that is waiting to be transmitted to the client. 104 * Additional information about a consensus element.
104 */ 105 */
105struct PendingElement 106struct ElementInfo
106{ 107{
107 /** 108 /**
108 * Pending elements are kept in a DLL. 109 * The element itself.
109 */ 110 */
110 struct PendingElement *next; 111 struct GNUNET_CONSENSUS_Element *element;
111
112 /** 112 /**
113 * Pending elements are kept in a DLL. 113 * Hash of the element
114 */ 114 */
115 struct PendingElement *prev; 115 struct GNUNET_HashCode *element_hash;
116
117 /** 116 /**
118 * The actual element 117 * Number of other peers that have the element in the inventory.
119 */ 118 */
120 struct GNUNET_CONSENSUS_Element *element; 119 unsigned int inventory_count;
121 120 /**
122 /* peer this element is coming from */ 121 * Bitmap of peers that have this element in their inventory
123 struct ConsensusPeerInformation *cpi; 122 */
124}; 123 uint8_t *inventory_bitmap;
125
126
127struct ElementList
128{
129 struct ElementList *next;
130 struct GNUNET_CONSENSUS_Element *element;
131 struct GNUNET_HashCode *element_hash;
132}; 124};
133 125
134 126
@@ -147,178 +139,93 @@ enum ConsensusRound
147 CONSENSUS_ROUND_EXCHANGE, 139 CONSENSUS_ROUND_EXCHANGE,
148 /** 140 /**
149 * Exchange which elements each peer has, but not the elements. 141 * Exchange which elements each peer has, but not the elements.
142 * This round uses the all-to-all scheme.
150 */ 143 */
151 CONSENSUS_ROUND_INVENTORY, 144 CONSENSUS_ROUND_INVENTORY,
152 /** 145 /**
153 * Collect and distribute missing values. 146 * Collect and distribute missing values with the exponential scheme.
154 */ 147 */
155 CONSENSUS_ROUND_STOCK, 148 CONSENSUS_ROUND_COMPLETION,
156 /** 149 /**
157 * Consensus concluded. 150 * Consensus concluded. After timeout and finished communication with client,
151 * consensus session will be destroyed.
158 */ 152 */
159 CONSENSUS_ROUND_FINISH 153 CONSENSUS_ROUND_FINISH
160}; 154};
161 155
156/* FIXME: review states, ANTICIPATE_DIFF and DECODING in particular */
162 157
163/** 158/**
164 * Information about a peer that is in a consensus session. 159 * State of another peer with respect to the
160 * current ibf.
165 */ 161 */
166struct ConsensusPeerInformation 162enum ConsensusIBFState {
167{
168 struct GNUNET_PeerIdentity peer_id;
169
170 /**
171 * Socket for communicating with the peer, either created by the local peer,
172 * or the remote peer.
173 */
174 struct GNUNET_STREAM_Socket *socket;
175
176 /**
177 * Message tokenizer, for the data received from this peer via the stream socket.
178 */
179 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
180
181 /**
182 * Do we connect to the peer, or does the peer connect to us?
183 * Only valid for all-to-all phases
184 */
185 int is_outgoing;
186
187 /**
188 * Did we receive/send a consensus hello?
189 */
190 int hello;
191
192 /** 163 /**
193 * Handle for currently active read 164 * There is nothing going on with the IBF.
194 */ 165 */
195 struct GNUNET_STREAM_ReadHandle *rh; 166 IBF_STATE_NONE=0,
196
197 /** 167 /**
198 * Handle for currently active read 168 * We currently receive an ibf.
199 */ 169 */
200 struct GNUNET_STREAM_WriteHandle *wh; 170 IBF_STATE_RECEIVING,
201 171 /*
202 enum { 172 * we decode a received ibf
203 /* beginning of round */ 173 */
204 IBF_STATE_NONE=0, 174 IBF_STATE_DECODING,
205 /* we currently receive an ibf */
206 IBF_STATE_RECEIVING,
207 /* we currently transmit an ibf */
208 IBF_STATE_TRANSMITTING,
209 /* we decode a received ibf */
210 IBF_STATE_DECODING,
211 /* wait for elements and element requests */
212 IBF_STATE_ANTICIPATE_DIFF
213 } ibf_state ;
214
215 /**
216 * What is the order (=log2 size) of the ibf
217 * we're currently dealing with?
218 * Interpretation depends on ibf_state.
219 */
220 int ibf_order;
221
222 /**
223 * The current IBF for this peer,
224 * purpose dependent on ibf_state
225 */
226 struct InvertibleBloomFilter *ibf;
227
228 /**
229 * How many buckets have we transmitted/received?
230 * Interpretatin depends on ibf_state
231 */
232 int ibf_bucket_counter;
233
234 /**
235 * Strata estimator of the peer, NULL if our peer
236 * initiated the reconciliation.
237 */
238 struct StrataEstimator *se;
239
240 /**
241 * Element keys that this peer misses, but we have them.
242 */
243 struct GNUNET_CONTAINER_MultiHashMap *requested_keys;
244
245 /**
246 * Element keys that this peer has, but we miss.
247 */
248 struct GNUNET_CONTAINER_MultiHashMap *reported_keys;
249
250 /**
251 * Back-reference to the consensus session,
252 * to that ConsensusPeerInformation can be used as a closure
253 */
254 struct ConsensusSession *session;
255
256 /** 175 /**
257 * Messages queued for the current round. 176 * wait for elements and element requests
258 */ 177 */
259 struct QueuedMessage *messages_head; 178 IBF_STATE_ANTICIPATE_DIFF
179};
260 180
261 /**
262 * Messages queued for the current round.
263 */
264 struct QueuedMessage *messages_tail;
265 181
266 /** 182typedef void (*AddCallback) (struct MessageQueue *mq);
267 * True if we are actually replaying the strata message, 183typedef void (*MessageSentCallback) (void *cls);
268 * e.g. currently handling the premature_strata_message.
269 */
270 int replaying_strata_message;
271 184
272 /**
273 * A strata message that is not actually for the current round,
274 * used in the exp-scheme.
275 */
276 struct StrataMessage *premature_strata_message;
277 185
278 /** 186/**
279 * We have finishes the exp-subround with the peer. 187 * Collection of the state necessary to read and write gnunet messages
280 */ 188 * to a stream socket. Should be used as closure for stream_data_processor.
281 int exp_subround_finished; 189 */
190struct MessageStreamState
191{
192 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
193 struct MessageQueue *mq;
194 void *mst_cls;
195 struct GNUNET_STREAM_Socket *socket;
196 struct GNUNET_STREAM_ReadHandle *rh;
197 struct GNUNET_STREAM_WriteHandle *wh;
198};
282 199
283 int inventory_synced;
284
285 /**
286 * Round this peer seems to be in, according to the last SE we got.
287 * Necessary to store this, as we sometimes need to respond to a request from an
288 * older round, while we are already in the next round.
289 */
290 enum ConsensusRound apparent_round;
291 200
201struct ServerClientSocketState
202{
203 struct GNUNET_SERVER_Client *client;
204 struct GNUNET_SERVER_TransmitHandle* th;
292}; 205};
293 206
294typedef void (*QueuedMessageCallback) (void *msg);
295 207
296/** 208/**
297 * A doubly linked list of messages. 209 * Generic message queue, for queueing outgoing messages.
298 */ 210 */
299struct QueuedMessage 211struct MessageQueue
300{ 212{
301 struct GNUNET_MessageHeader *msg; 213 void *state;
302 214 AddCallback add_cb;
303 /** 215 struct PendingMessage *pending_head;
304 * Queued messages are stored in a doubly linked list. 216 struct PendingMessage *pending_tail;
305 */ 217 struct PendingMessage *current_pm;
306 struct QueuedMessage *next;
307
308 /**
309 * Queued messages are stored in a doubly linked list.
310 */
311 struct QueuedMessage *prev;
312
313 QueuedMessageCallback cb;
314
315 void *cls;
316}; 218};
317 219
318 220
319struct StrataEstimator 221struct PendingMessage
320{ 222{
321 struct InvertibleBloomFilter **strata; 223 struct GNUNET_MessageHeader *msg;
224 struct MessageQueue *parent_queue;
225 struct PendingMessage *next;
226 struct PendingMessage *prev;
227 MessageSentCallback sent_cb;
228 void *sent_cb_cls;
322}; 229};
323 230
324 231
@@ -351,38 +258,26 @@ struct ConsensusSession
351 struct GNUNET_HashCode global_id; 258 struct GNUNET_HashCode global_id;
352 259
353 /** 260 /**
354 * Local client in this consensus session. 261 * The server's client and associated local state
355 * There is only one client per consensus session.
356 */
357 struct GNUNET_SERVER_Client *client;
358
359 /**
360 * Elements in the consensus set of this session,
361 * all of them either have been sent by or approved by the client.
362 * Contains ElementList.
363 * Used as a unique-key hashmap.
364 */ 262 */
365 struct GNUNET_CONTAINER_MultiHashMap *values; 263 struct ServerClientSocketState scss;
366
367 /**
368 * Elements that have not been approved (or rejected) by the client yet.
369 */
370 struct PendingElement *client_approval_head;
371 264
372 /** 265 /**
373 * Elements that have not been approved (or rejected) by the client yet. 266 * Queued messages to the client.
374 */ 267 */
375 struct PendingElement *client_approval_tail; 268 struct MessageQueue *client_mq;
376 269
377 /** 270 /**
378 * Messages to be sent to the local client that owns this session 271 * IBF_Key -> 2^(HashCode*)
272 * FIXME:
273 * should be array of hash maps, mapping replicated struct IBF_Keys to struct HashCode *.
379 */ 274 */
380 struct QueuedMessage *client_messages_head; 275 struct GNUNET_CONTAINER_MultiHashMap *ibf_key_map;
381 276
382 /** 277 /**
383 * Messages to be sent to the local client that owns this session 278 * Maps HashCodes to ElementInfos
384 */ 279 */
385 struct QueuedMessage *client_messages_tail; 280 struct GNUNET_CONTAINER_MultiHashMap *values;
386 281
387 /** 282 /**
388 * Currently active transmit handle for sending to the client 283 * Currently active transmit handle for sending to the client
@@ -412,9 +307,14 @@ struct ConsensusSession
412 struct ConsensusPeerInformation *info; 307 struct ConsensusPeerInformation *info;
413 308
414 /** 309 /**
310 * GNUNET_YES if the client has called conclude.
311 * */
312 int conclude;
313
314 /**
415 * Index of the local peer in the peers array 315 * Index of the local peer in the peers array
416 */ 316 */
417 int local_peer_idx; 317 unsigned int local_peer_idx;
418 318
419 /** 319 /**
420 * Strata estimator, computed online 320 * Strata estimator, computed online
@@ -431,16 +331,16 @@ struct ConsensusSession
431 */ 331 */
432 enum ConsensusRound current_round; 332 enum ConsensusRound current_round;
433 333
434 int exp_round;
435
436 int exp_subround;
437
438 /** 334 /**
439 * Permutation of peers for the current round, 335 * Permutation of peers for the current round,
440 * maps logical index (for current round) to physical index (location in info array) 336 * maps logical index (for current round) to physical index (location in info array)
441 */ 337 */
442 int *shuffle; 338 int *shuffle;
443 339
340 int exp_round;
341
342 int exp_subround;
343
444 /** 344 /**
445 * The partner for the current exp-round 345 * The partner for the current exp-round
446 */ 346 */
@@ -454,41 +354,121 @@ struct ConsensusSession
454 354
455 355
456/** 356/**
457 * Sockets from other peers who want to communicate with us. 357 * Information about a peer that is in a consensus session.
458 * It may not be known yet which consensus session they belong to.
459 * Also, the session might not exist yet locally.
460 */ 358 */
461struct IncomingSocket 359struct ConsensusPeerInformation
462{ 360{
463 /** 361 /**
464 * Incoming sockets are kept in a double linked list. 362 * Peer identitty of the peer in the consensus session
465 */ 363 */
466 struct IncomingSocket *next; 364 struct GNUNET_PeerIdentity peer_id;
467 365
468 /** 366 /**
469 * Incoming sockets are kept in a double linked list. 367 * Do we connect to the peer, or does the peer connect to us?
368 * Only valid for all-to-all phases
470 */ 369 */
471 struct IncomingSocket *prev; 370 int is_outgoing;
472 371
473 /** 372 /**
474 * The actual socket. 373 * Did we receive/send a consensus hello?
475 */ 374 */
476 struct GNUNET_STREAM_Socket *socket; 375 int hello;
376
377 /*
378 * FIXME
379 */
380 struct MessageStreamState mss;
477 381
478 /** 382 /**
479 * Handle for currently active read 383 * Current state
480 */ 384 */
481 struct GNUNET_STREAM_ReadHandle *rh; 385 enum ConsensusIBFState ibf_state;
482 386
483 /** 387 /**
484 * Peer that connected to us with the socket. 388 * What is the order (=log2 size) of the ibf
389 * we're currently dealing with?
390 * Interpretation depends on ibf_state.
485 */ 391 */
486 struct GNUNET_PeerIdentity peer_id; 392 int ibf_order;
393
394 /**
395 * The current IBF for this peer,
396 * purpose dependent on ibf_state
397 */
398 struct InvertibleBloomFilter *ibf;
487 399
488 /** 400 /**
489 * Message stream tokenizer for this socket. 401 * How many buckets have we transmitted/received?
402 * Interpretatin depends on ibf_state
490 */ 403 */
491 struct GNUNET_SERVER_MessageStreamTokenizer *mst; 404 int ibf_bucket_counter;
405
406 /**
407 * Strata estimator of the peer, NULL if our peer
408 * initiated the reconciliation.
409 */
410 struct StrataEstimator *se;
411
412 /**
413 * Back-reference to the consensus session,
414 * to that ConsensusPeerInformation can be used as a closure
415 */
416 struct ConsensusSession *session;
417
418 /**
419 * True if we are actually replaying the strata message,
420 * e.g. currently handling the premature_strata_message.
421 */
422 int replaying_strata_message;
423
424 /**
425 * A strata message that is not actually for the current round,
426 * used in the exp-scheme.
427 */
428 struct StrataMessage *premature_strata_message;
429
430 /**
431 * We have finishes the exp-subround with the peer.
432 */
433 int exp_subround_finished;
434
435 /**
436 * GNUNET_YES if we synced inventory with this peer;
437 * GNUNET_NO otherwise.
438 */
439 int inventory_synced;
440
441 /**
442 * Round this peer seems to be in, according to the last SE we got.
443 * Necessary to store this, as we sometimes need to respond to a request from an
444 * older round, while we are already in the next round.
445 */
446 enum ConsensusRound apparent_round;
447};
448
449
450/**
451 * Sockets from other peers who want to communicate with us.
452 * It may not be known yet which consensus session they belong to, we have to wait for the
453 * peer's hello.
454 * Also, the session might not exist yet locally, we have to wait for a local client to connect.
455 */
456struct IncomingSocket
457{
458 /**
459 * Incoming sockets are kept in a double linked list.
460 */
461 struct IncomingSocket *next;
462
463 /**
464 * Incoming sockets are kept in a double linked list.
465 */
466 struct IncomingSocket *prev;
467
468 /**
469 * Peer that connected to us with the socket.
470 */
471 struct GNUNET_PeerIdentity peer_id;
492 472
493 /** 473 /**
494 * Peer-in-session this socket belongs to, once known, otherwise NULL. 474 * Peer-in-session this socket belongs to, once known, otherwise NULL.
@@ -500,19 +480,35 @@ struct IncomingSocket
500 * but the session does not exist yet. 480 * but the session does not exist yet.
501 */ 481 */
502 struct GNUNET_HashCode *requested_gid; 482 struct GNUNET_HashCode *requested_gid;
483
484 /*
485 * Timeout, will disconnect the socket if not yet in a session.
486 * FIXME: implement
487 */
488 GNUNET_SCHEDULER_TaskIdentifier timeout;
489
490 /* FIXME */
491 struct MessageStreamState mss;
503}; 492};
504 493
505 494
495/**
496 * Linked list of incoming sockets.
497 */
506static struct IncomingSocket *incoming_sockets_head; 498static struct IncomingSocket *incoming_sockets_head;
499
500/**
501 * Linked list of incoming sockets.
502 */
507static struct IncomingSocket *incoming_sockets_tail; 503static struct IncomingSocket *incoming_sockets_tail;
508 504
509/** 505/**
510 * Linked list of sesstions this peer participates in. 506 * Linked list of sessions this peer participates in.
511 */ 507 */
512static struct ConsensusSession *sessions_head; 508static struct ConsensusSession *sessions_head;
513 509
514/** 510/**
515 * Linked list of sesstions this peer participates in. 511 * Linked list of sessions this peer participates in.
516 */ 512 */
517static struct ConsensusSession *sessions_tail; 513static struct ConsensusSession *sessions_tail;
518 514
@@ -543,151 +539,159 @@ static struct GNUNET_STREAM_ListenSocket *listener;
543 539
544 540
545/** 541/**
546 * Queue a message to be sent to the inhabiting client of a session. 542 * Transmit a queued message to the session's client.
547 * 543 *
548 * @param session session 544 * @param cls consensus session
549 * @param msg message we want to queue 545 * @param size number of bytes available in buf
546 * @param buf where the callee should write the message
547 * @return number of bytes written to buf
550 */ 548 */
551static void 549static size_t
552queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHeader *msg) 550transmit_queued (void *cls, size_t size,
551 void *buf)
553{ 552{
554 struct QueuedMessage *qm; 553 struct MessageQueue *mq = cls;
555 qm = GNUNET_malloc (sizeof *qm); 554 struct PendingMessage *pm = mq->pending_head;
556 qm->msg = msg; 555 struct ServerClientSocketState *state = mq->state;
557 GNUNET_CONTAINER_DLL_insert_tail (session->client_messages_head, session->client_messages_tail, qm); 556 size_t msg_size;
557
558 GNUNET_assert (NULL != pm);
559 GNUNET_assert (NULL != buf);
560 msg_size = ntohs (pm->msg->size);
561 GNUNET_assert (size >= msg_size);
562 memcpy (buf, pm->msg, msg_size);
563 GNUNET_CONTAINER_DLL_remove (mq->pending_head, mq->pending_tail, pm);
564 state->th = NULL;
565 client_send_next (cls);
566 GNUNET_free (pm);
567 return msg_size;
558} 568}
559 569
560/** 570
561 * Queue a message to be sent to another peer
562 *
563 * @param cpi peer
564 * @param msg message we want to queue
565 * @param cb callback, called when the message is given to strem
566 * @param cls closure for cb
567 */
568static void 571static void
569queue_peer_message_with_cls (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageHeader *msg, QueuedMessageCallback cb, void *cls) 572client_send_next (struct MessageQueue *mq)
570{ 573{
571 struct QueuedMessage *qm; 574 struct ServerClientSocketState *state = mq->state;
572 qm = GNUNET_malloc (sizeof *qm); 575 int msize;
573 qm->msg = msg; 576
574 qm->cls = cls; 577 GNUNET_assert (NULL != state);
575 qm->cb = cb; 578
576 GNUNET_CONTAINER_DLL_insert_tail (cpi->messages_head, cpi->messages_tail, qm); 579 if ( (NULL != state->th) ||
577 if (cpi->wh == NULL) 580 (NULL == mq->pending_head) )
578 write_queued (cpi, GNUNET_STREAM_OK, 0); 581 return;
582 msize = ntohs (mq->pending_head->msg->size);
583 state->th =
584 GNUNET_SERVER_notify_transmit_ready (state->client, msize,
585 GNUNET_TIME_UNIT_FOREVER_REL,
586 &transmit_queued, mq);
587}
588
589
590struct MessageQueue *
591create_message_queue_for_server_client (struct ServerClientSocketState *scss)
592{
593 struct MessageQueue *mq;
594 mq = GNUNET_new (struct MessageQueue);
595 mq->add_cb = client_send_next;
596 mq->state = scss;
597 return mq;
579} 598}
580 599
581 600
582/** 601/**
583 * Queue a message to be sent to another peer 602 * Functions of this signature are called whenever writing operations
603 * on a stream are executed
584 * 604 *
585 * @param cpi peer 605 * @param cls the closure from GNUNET_STREAM_write
586 * @param msg message we want to queue 606 * @param status the status of the stream at the time this function is called;
607 * GNUNET_STREAM_OK if writing to stream was completed successfully;
608 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
609 * (this doesn't mean that the data is never sent, the receiver may
610 * have read the data but its ACKs may have been lost);
611 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
612 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
613 * be processed.
614 * @param size the number of bytes written
587 */ 615 */
588static void 616static void
589queue_peer_message (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageHeader *msg) 617write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
590{ 618{
591 queue_peer_message_with_cls (cpi, msg, NULL, NULL); 619 struct MessageQueue *mq = cls;
620 struct MessageStreamState *mss = mq->state;
621 struct PendingMessage *pm;
622
623 GNUNET_assert (GNUNET_STREAM_OK == status);
624
625 /* call cb for message we finished sending */
626 pm = mq->current_pm;
627 if (NULL != pm)
628 {
629 if (NULL != pm->sent_cb)
630 pm->sent_cb (pm->sent_cb_cls);
631 GNUNET_free (pm);
632 }
633
634 mss->wh = NULL;
635
636 pm = mq->pending_head;
637 mq->current_pm = pm;
638 if (NULL == pm)
639 return;
640 GNUNET_CONTAINER_DLL_remove (mq->pending_head, mq->pending_tail, pm);
641 mss->wh = GNUNET_STREAM_write (mss->socket, pm->msg, ntohs (pm->msg->size),
642 GNUNET_TIME_UNIT_FOREVER_REL, write_queued, cls);
643 GNUNET_assert (NULL != mss->wh);
592} 644}
593 645
594 646
595/*
596static void 647static void
597clear_peer_messages (struct ConsensusPeerInformation *cpi) 648stream_socket_add_cb (struct MessageQueue *mq)
598{ 649{
599 cpi->messages_head = NULL; 650 if (NULL != mq->current_pm)
600 cpi->messages_tail = NULL; 651 return;
652 write_queued (mq, GNUNET_STREAM_OK, 0);
601} 653}
602*/
603 654
604 655
605/** 656struct MessageQueue *
606 * Estimate set difference with two strata estimators, 657create_message_queue_for_stream_socket (struct MessageStreamState *mss)
607 * i.e. arrays of IBFs.
608 * Does not not modify its arguments.
609 *
610 * @param se1 first strata estimator
611 * @param se2 second strata estimator
612 * @return the estimated difference
613 */
614static int
615estimate_difference (const struct StrataEstimator *se1,
616 const struct StrataEstimator *se2)
617{ 658{
618 int i; 659 struct MessageQueue *mq;
619 int count; 660 mq = GNUNET_new (struct MessageQueue);
620 count = 0; 661 mq->state = mss;
621 for (i = STRATA_COUNT - 1; i >= 0; i--) 662 mq->add_cb = stream_socket_add_cb;
622 { 663 return mq;
623 struct InvertibleBloomFilter *diff; 664}
624 /* number of keys decoded from the ibf */ 665
625 int ibf_count; 666
626 int more; 667struct PendingMessage *
627 ibf_count = 0; 668new_pending_message (uint16_t size, uint16_t type)
628 /* FIXME: implement this without always allocating new IBFs */ 669{
629 diff = ibf_dup (se1->strata[i]); 670 struct PendingMessage *pm;
630 ibf_subtract (diff, se2->strata[i]); 671 pm = GNUNET_malloc (sizeof *pm + size);
631 for (;;) 672 pm->msg = (void *) &pm[1];
632 { 673 pm->msg->size = htons (size);
633 more = ibf_decode (diff, NULL, NULL); 674 pm->msg->type = htons (type);
634 if (GNUNET_NO == more) 675 return pm;
635 {
636 count += ibf_count;
637 break;
638 }
639 if (GNUNET_SYSERR == more)
640 {
641 ibf_destroy (diff);
642 return count * (1 << (i + 1));
643 }
644 ibf_count++;
645 }
646 ibf_destroy (diff);
647 }
648 return count;
649} 676}
650 677
651 678
652/** 679/**
653 * Called when receiving data from a peer that is member of 680 * Queue a message in a message queue.
654 * an inhabited consensus session.
655 * 681 *
656 * @param cls the closure from GNUNET_STREAM_read 682 * @param queue the message queue
657 * @param status the status of the stream at the time this function is called 683 * @param pending message, message with additional information
658 * @param data traffic from the other side
659 * @param size the number of bytes available in data read; will be 0 on timeout
660 * @return number of bytes of processed from 'data' (any data remaining should be
661 * given to the next time the read processor is called).
662 */ 684 */
663static size_t 685void
664session_stream_data_processor (void *cls, 686message_queue_add (struct MessageQueue *queue, struct PendingMessage *msg)
665 enum GNUNET_STREAM_Status status,
666 const void *data,
667 size_t size)
668{ 687{
669 struct ConsensusPeerInformation *cpi; 688 GNUNET_CONTAINER_DLL_insert_tail (queue->pending_head, queue->pending_tail, msg);
670 int ret; 689 queue->add_cb (queue);
671
672 GNUNET_assert (GNUNET_STREAM_OK == status);
673 cpi = cls;
674 GNUNET_assert (NULL != cpi->mst);
675 ret = GNUNET_SERVER_mst_receive (cpi->mst, cpi, data, size, GNUNET_NO, GNUNET_YES);
676 if (GNUNET_SYSERR == ret)
677 {
678 /* FIXME: handle this correctly */
679 GNUNET_assert (0);
680 }
681 /* read again */
682 cpi->rh = GNUNET_STREAM_read (cpi->socket, GNUNET_TIME_UNIT_FOREVER_REL,
683 &session_stream_data_processor, cpi);
684 /* we always read all data */
685 return size;
686} 690}
687 691
692
688/** 693/**
689 * Called when we receive data from a peer that is not member of 694 * Called when we receive data from a peer via stream.
690 * a session yet, or the session is not yet inhabited.
691 * 695 *
692 * @param cls the closure from GNUNET_STREAM_read 696 * @param cls the closure from GNUNET_STREAM_read
693 * @param status the status of the stream at the time this function is called 697 * @param status the status of the stream at the time this function is called
@@ -697,62 +701,66 @@ session_stream_data_processor (void *cls,
697 * given to the next time the read processor is called). 701 * given to the next time the read processor is called).
698 */ 702 */
699static size_t 703static size_t
700incoming_stream_data_processor (void *cls, 704stream_data_processor (void *cls, enum GNUNET_STREAM_Status status, const void *data, size_t size)
701 enum GNUNET_STREAM_Status status,
702 const void *data,
703 size_t size)
704{ 705{
705 struct IncomingSocket *incoming; 706 struct MessageStreamState *mss = cls;
706 int ret; 707 int ret;
707 708
708 GNUNET_assert (GNUNET_STREAM_OK == status); 709 mss->rh = NULL;
709 incoming = cls; 710
710 ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_YES); 711 if (GNUNET_STREAM_OK != status)
712 {
713 /* FIXME: handle this correctly */
714 GNUNET_break (0);
715 return 0;
716 }
717 GNUNET_assert (NULL != mss->mst);
718 ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_YES);
711 if (GNUNET_SYSERR == ret) 719 if (GNUNET_SYSERR == ret)
712 { 720 {
713 /* FIXME: handle this correctly */ 721 /* FIXME: handle this correctly */
714 GNUNET_assert (0); 722 GNUNET_break (0);
723 return 0;
715 } 724 }
716 /* read again */ 725 /* read again */
717 incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL, 726 mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, &stream_data_processor, mss);
718 &incoming_stream_data_processor, incoming);
719 /* we always read all data */ 727 /* we always read all data */
720 return size; 728 return size;
721} 729}
722 730
723 731
732/**
733 * Send element or element report to the peer specified in cpi.
734 *
735 * @param cpi peer to send the elements to
736 * @param head head of the element list
737 */
724static void 738static void
725send_elements (struct ConsensusPeerInformation *cpi, struct ElementList *head) 739send_element_or_report (struct ConsensusPeerInformation *cpi, struct ElementInfo *e)
726{ 740{
727 struct GNUNET_CONSENSUS_Element *element; 741 struct PendingMessage *pm;
728 struct GNUNET_MessageHeader *element_msg;
729 size_t msize;
730 742
731 while (NULL != head) 743 switch (cpi->apparent_round)
732 { 744 {
733 element = head->element; 745 case CONSENSUS_ROUND_COMPLETION:
734 msize = sizeof (struct GNUNET_MessageHeader) + element->size; 746 case CONSENSUS_ROUND_EXCHANGE:
735 element_msg = GNUNET_malloc (msize); 747 pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + e->element->size,
736 element_msg->size = htons (msize); 748 GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS);
737 switch (cpi->apparent_round) 749 memcpy (&pm->msg[1], e->element->data, e->element->size);
738 { 750 message_queue_add (cpi->mss.mq, pm);
739 case CONSENSUS_ROUND_STOCK: 751 break;
740 case CONSENSUS_ROUND_EXCHANGE: 752 case CONSENSUS_ROUND_INVENTORY:
741 element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); 753 pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + sizeof (struct GNUNET_HashCode),
742 break; 754 GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT);
743 case CONSENSUS_ROUND_INVENTORY: 755 memcpy (&pm->msg[1], e->element_hash, sizeof (struct GNUNET_HashCode));
744 element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT); 756 message_queue_add (cpi->mss.mq, pm);
745 break; 757 break;
746 default: 758 default:
747 GNUNET_break (0); 759 GNUNET_break (0);
748 }
749 GNUNET_assert (NULL != element->data);
750 memcpy (&element_msg[1], element->data, element->size);
751 queue_peer_message (cpi, element_msg);
752 head = head->next;
753 } 760 }
754} 761}
755 762
763
756/** 764/**
757 * Iterator to insert values into an ibf. 765 * Iterator to insert values into an ibf.
758 * 766 *
@@ -768,12 +776,10 @@ ibf_values_iterator (void *cls,
768 const struct GNUNET_HashCode *key, 776 const struct GNUNET_HashCode *key,
769 void *value) 777 void *value)
770{ 778{
771 struct ConsensusPeerInformation *cpi; 779 struct ConsensusPeerInformation *cpi = cls;
772 struct ElementList *head; 780 struct ElementInfo *e = value;
773 struct IBF_Key ibf_key; 781 struct IBF_Key ibf_key = ibf_key_from_hashcode (e->element_hash);
774 cpi = cls; 782
775 head = value;
776 ibf_key = ibf_key_from_hashcode (head->element_hash);
777 GNUNET_assert (ibf_key.key_val == ibf_key_from_hashcode (key).key_val); 783 GNUNET_assert (ibf_key.key_val == ibf_key_from_hashcode (key).key_val);
778 ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key); 784 ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key);
779 return GNUNET_YES; 785 return GNUNET_YES;
@@ -788,11 +794,10 @@ ibf_values_iterator (void *cls,
788static void 794static void
789prepare_ibf (struct ConsensusPeerInformation *cpi) 795prepare_ibf (struct ConsensusPeerInformation *cpi)
790{ 796{
791 if (NULL == cpi->session->ibfs[cpi->ibf_order]) 797 if (NULL != cpi->session->ibfs[cpi->ibf_order])
792 { 798 return;
793 cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); 799 cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, SE_IBF_HASH_NUM);
794 GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi); 800 GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi);
795 }
796} 801}
797 802
798 803
@@ -816,15 +821,18 @@ exp_subround_finished (const struct ConsensusSession *session)
816{ 821{
817 int not_finished; 822 int not_finished;
818 not_finished = 0; 823 not_finished = 0;
819 if ((session->partner_outgoing != NULL) && (session->partner_outgoing->exp_subround_finished == GNUNET_NO)) 824 if ( (NULL != session->partner_outgoing) &&
820 not_finished++; 825 (GNUNET_NO == session->partner_outgoing->exp_subround_finished) )
821 if ((session->partner_incoming != NULL) && (session->partner_incoming->exp_subround_finished == GNUNET_NO)) 826 not_finished++;
822 not_finished++; 827 if ( (NULL != session->partner_incoming) &&
828 (GNUNET_NO == session->partner_incoming->exp_subround_finished) )
829 not_finished++;
823 if (0 == not_finished) 830 if (0 == not_finished)
824 return GNUNET_YES; 831 return GNUNET_YES;
825 return GNUNET_NO; 832 return GNUNET_NO;
826} 833}
827 834
835
828static int 836static int
829inventory_round_finished (struct ConsensusSession *session) 837inventory_round_finished (struct ConsensusSession *session)
830{ 838{
@@ -840,153 +848,170 @@ inventory_round_finished (struct ConsensusSession *session)
840} 848}
841 849
842 850
843
844static void 851static void
845fin_sent_cb (void *cls) 852clear_message_stream_state (struct MessageStreamState *mss)
846{ 853{
847 struct ConsensusPeerInformation *cpi; 854 if (NULL != mss->mst)
848 cpi = cls;
849 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n", cpi->session->local_peer_idx);
850 switch (cpi->session->current_round)
851 { 855 {
852 case CONSENSUS_ROUND_EXCHANGE: 856 GNUNET_SERVER_mst_destroy (mss->mst);
853 case CONSENSUS_ROUND_STOCK: 857 mss->mst = NULL;
854 if (cpi->session->current_round != cpi->apparent_round) 858 }
855 { 859 if (NULL != mss->rh)
856 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: FIN to SYNC from the past\n", cpi->session->local_peer_idx); 860 {
857 break; 861 GNUNET_STREAM_read_cancel (mss->rh);
858 } 862 mss->rh = NULL;
859 cpi->exp_subround_finished = GNUNET_YES; 863 }
860 /* the subround is only really over if *both* partners are done */ 864 if (NULL != mss->wh)
861 if (GNUNET_YES == exp_subround_finished (cpi->session)) 865 {
862 subround_over (cpi->session, NULL); 866 GNUNET_STREAM_write_cancel (mss->wh);
863 else 867 mss->wh = NULL;
864 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after FIN sent\n", cpi->session->local_peer_idx); 868 }
865 break; 869 if (NULL != mss->socket)
866 case CONSENSUS_ROUND_INVENTORY: 870 {
867 cpi->inventory_synced = GNUNET_YES; 871 GNUNET_STREAM_close (mss->socket);
868 if (inventory_round_finished (cpi->session) && cpi->session->current_round == cpi->apparent_round) 872 mss->socket = NULL;
869 round_over (cpi->session, NULL); 873 }
870 /* FIXME: maybe go to next round */ 874 if (NULL != mss->mq)
871 break; 875 {
872 default: 876 GNUNET_free (mss->mq);
873 GNUNET_break (0); 877 mss->mq = NULL;
874 } 878 }
875} 879}
876 880
877 881
878/** 882/**
879 * Gets called when the other peer wants us to inform that 883 * Iterator over hash map entries.
880 * it has decoded our ibf and sent us all elements / requests 884 *
885 * @param cls closure
886 * @param key current key code
887 * @param value value in the hash map
888 * @return GNUNET_YES if we should continue to
889 * iterate,
890 * GNUNET_NO if not.
881 */ 891 */
882static int 892static int
883handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) 893destroy_element_info_iter (void *cls,
894 const struct GNUNET_HashCode * key,
895 void *value)
884{ 896{
885 struct ConsensusRoundMessage *fin_msg; 897 struct ElementInfo *ei = value;
886 898 GNUNET_free (ei->element);
887 switch (cpi->session->current_round) 899 GNUNET_free (ei->element_hash);
888 { 900 GNUNET_free (ei);
889 case CONSENSUS_ROUND_INVENTORY:
890 cpi->inventory_synced = GNUNET_YES;
891 case CONSENSUS_ROUND_STOCK:
892 case CONSENSUS_ROUND_EXCHANGE:
893 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SYNC from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
894 fin_msg = GNUNET_malloc (sizeof *fin_msg);
895 fin_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN);
896 fin_msg->header.size = htons (sizeof *fin_msg);
897 fin_msg->round = cpi->apparent_round;
898 /* the subround os over once we kicked off sending the fin msg */
899 /* FIXME: assert we are talking to the right peer! */
900 queue_peer_message_with_cls (cpi, (struct GNUNET_MessageHeader *) fin_msg, fin_sent_cb, cpi);
901 /* FIXME: mark peer as synced */
902 break;
903 default:
904 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n");
905 break;
906 }
907 return GNUNET_YES; 901 return GNUNET_YES;
908} 902}
909 903
910 904
911/** 905/**
912 * The other peer wants us to inform that he sent us all the elements we requested. 906 * Destroy a session, free all resources associated with it.
907 *
908 * @param session the session to destroy
913 */ 909 */
914static int 910static void
915handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) 911destroy_session (struct ConsensusSession *session)
916{ 912{
917 struct ConsensusRoundMessage *round_msg; 913 int i;
918 round_msg = (struct ConsensusRoundMessage *) msg; 914
919 /* FIXME: only call subround_over if round is the current one! */ 915 GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
920 switch (cpi->session->current_round) 916 GNUNET_SERVER_client_drop (session->scss.client);
917 session->scss.client = NULL;
918 if (NULL != session->client_mq)
921 { 919 {
922 case CONSENSUS_ROUND_EXCHANGE: 920 GNUNET_free (session->client_mq);
923 case CONSENSUS_ROUND_STOCK: 921 session->client_mq = NULL;
924 if (cpi->session->current_round != round_msg->round) 922 }
923 if (NULL != session->shuffle)
924 {
925 GNUNET_free (session->shuffle);
926 session->shuffle = NULL;
927 }
928 if (NULL != session->se)
929 {
930 strata_estimator_destroy (session->se);
931 session->se = NULL;
932 }
933 if (NULL != session->info)
934 {
935 for (i = 0; i < session->num_peers; i++)
936 {
937 struct ConsensusPeerInformation *cpi;
938 cpi = &session->info[i];
939 clear_message_stream_state (&cpi->mss);
940 if (NULL != cpi->se)
925 { 941 {
926 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (past round)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); 942 strata_estimator_destroy (cpi->se);
927 cpi->ibf_state = IBF_STATE_NONE; 943 cpi->se = NULL;
928 cpi->ibf_bucket_counter = 0;
929 break;
930 } 944 }
931 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (exp)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); 945 if (NULL != cpi->ibf)
932 cpi->exp_subround_finished = GNUNET_YES; 946 {
933 if (GNUNET_YES == exp_subround_finished (cpi->session)) 947 ibf_destroy (cpi->ibf);
934 subround_over (cpi->session, NULL); 948 cpi->ibf = NULL;
935 else 949 }
936 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after got FIN\n", cpi->session->local_peer_idx); 950 }
937 break; 951 GNUNET_free (session->info);
938 case CONSENSUS_ROUND_INVENTORY: 952 session->info = NULL;
939 cpi->inventory_synced = GNUNET_YES; 953 }
940 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (a2a)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); 954 if (NULL != session->ibfs)
941 if (inventory_round_finished (cpi->session)) 955 {
942 round_over (cpi->session, NULL); 956 for (i = 0; i <= MAX_IBF_ORDER; i++)
943 break; 957 {
944 default: 958 if (NULL != session->ibfs[i])
945 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n"); 959 {
946 break; 960 ibf_destroy (session->ibfs[i]);
961 session->ibfs[i] = NULL;
962 }
963 }
964 GNUNET_free (session->ibfs);
965 session->ibfs = NULL;
966 }
967 if (NULL != session->values)
968 {
969 GNUNET_CONTAINER_multihashmap_iterate (session->values, destroy_element_info_iter, NULL);
970 GNUNET_CONTAINER_multihashmap_destroy (session->values);
971 session->values = NULL;
947 } 972 }
948 return GNUNET_YES;
949}
950
951
952static struct StrataEstimator *
953strata_estimator_create ()
954{
955 struct StrataEstimator *se;
956 int i;
957
958 /* fixme: allocate everything in one chunk */
959
960 se = GNUNET_malloc (sizeof (struct StrataEstimator));
961 se->strata = GNUNET_malloc (sizeof (struct InvertibleBloomFilter) * STRATA_COUNT);
962 for (i = 0; i < STRATA_COUNT; i++)
963 se->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
964 973
965 return se; 974 if (NULL != session->ibf_key_map)
975 {
976 GNUNET_CONTAINER_multihashmap_destroy (session->ibf_key_map);
977 session->ibf_key_map = NULL;
978 }
979 GNUNET_free (session);
966} 980}
967 981
982
968static void 983static void
969strata_estimator_destroy (struct StrataEstimator *se) 984send_client_conclude_done (struct ConsensusSession *session)
970{ 985{
971 int i; 986 struct PendingMessage *pm;
972 for (i = 0; i < STRATA_COUNT; i++) 987
973 ibf_destroy (se->strata[i]); 988 /* check if client is even there anymore */
974 GNUNET_free (se->strata); 989 if (NULL == session->scss.client)
975 GNUNET_free (se); 990 return;
991 pm = new_pending_message (sizeof (struct GNUNET_MessageHeader),
992 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
993 message_queue_add (session->client_mq, pm);
976} 994}
977 995
978 996
997/**
998 * Check if a strata message is for the current round or not
999 *
1000 * @param session session we are in
1001 * @param strata_msg the strata message to check
1002 * @return GNUNET_YES if the strata_msg is premature, GNUNET_NO otherwise
1003 */
979static int 1004static int
980is_premature_strata_message (const struct ConsensusSession *session, const struct StrataMessage *strata_msg) 1005is_premature_strata_message (const struct ConsensusSession *session, const struct StrataMessage *strata_msg)
981{ 1006{
982 switch (strata_msg->round) 1007 switch (strata_msg->round)
983 { 1008 {
984 case CONSENSUS_ROUND_STOCK: 1009 case CONSENSUS_ROUND_COMPLETION:
985 case CONSENSUS_ROUND_EXCHANGE: 1010 case CONSENSUS_ROUND_EXCHANGE:
986 /* here, we also have to compare subrounds */ 1011 /* here, we also have to compare subrounds */
987 if ( (strata_msg->round != session->current_round) || 1012 if ( (strata_msg->round != session->current_round) ||
988 (strata_msg->exp_round != session->exp_round) || 1013 (strata_msg->exp_round != session->exp_round) ||
989 (strata_msg->exp_subround != session->exp_subround)) 1014 (strata_msg->exp_subround != session->exp_subround) )
990 return GNUNET_YES; 1015 return GNUNET_YES;
991 break; 1016 break;
992 default: 1017 default:
@@ -999,6 +1024,72 @@ is_premature_strata_message (const struct ConsensusSession *session, const struc
999 1024
1000 1025
1001/** 1026/**
1027 * Send a strata estimator.
1028 *
1029 * @param cpi the peer
1030 */
1031static void
1032send_strata_estimator (struct ConsensusPeerInformation *cpi)
1033{
1034 struct PendingMessage *pm;
1035 struct StrataMessage *strata_msg;
1036
1037 /* FIXME: why is this correct? */
1038 cpi->apparent_round = cpi->session->current_round;
1039 cpi->ibf_state = IBF_STATE_NONE;
1040 cpi->ibf_bucket_counter = 0;
1041
1042 LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "sending SE (in round: %d)\n", cpi->session->current_round);
1043
1044 pm = new_pending_message ((sizeof *strata_msg) + (SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE),
1045 GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE);
1046 strata_msg = (struct StrataMessage *) pm->msg;
1047 strata_msg->round = cpi->session->current_round;
1048 strata_msg->exp_round = cpi->session->exp_round;
1049 strata_msg->exp_subround = cpi->session->exp_subround;
1050 strata_estimator_write (cpi->session->se, &strata_msg[1]);
1051 message_queue_add (cpi->mss.mq, pm);
1052}
1053
1054
1055/**
1056 * Send an IBF of the order specified in cpi.
1057 *
1058 * @param cpi the peer
1059 */
1060static void
1061send_ibf (struct ConsensusPeerInformation *cpi)
1062{
1063 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending IBF to P%d\n",
1064 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1065
1066 cpi->ibf_bucket_counter = 0;
1067 while (cpi->ibf_bucket_counter < (1 << cpi->ibf_order))
1068 {
1069 unsigned int num_buckets;
1070 struct PendingMessage *pm;
1071 struct DifferenceDigest *digest;
1072
1073 num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter;
1074 /* limit to maximum */
1075 if (num_buckets > BUCKETS_PER_MESSAGE)
1076 num_buckets = BUCKETS_PER_MESSAGE;
1077
1078 pm = new_pending_message ((sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE),
1079 GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST);
1080 digest = (struct DifferenceDigest *) pm->msg;
1081 digest->order = cpi->ibf_order;
1082 digest->round = cpi->apparent_round;
1083 ibf_write_slice (cpi->ibf, cpi->ibf_bucket_counter, num_buckets, &digest[1]);
1084 cpi->ibf_bucket_counter += num_buckets;
1085 message_queue_add (cpi->mss.mq, pm);
1086 }
1087 cpi->ibf_bucket_counter = 0;
1088 cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF;
1089}
1090
1091
1092/**
1002 * Called when a peer sends us its strata estimator. 1093 * Called when a peer sends us its strata estimator.
1003 * In response, we sent out IBF of appropriate size back. 1094 * In response, we sent out IBF of appropriate size back.
1004 * 1095 *
@@ -1008,12 +1099,10 @@ is_premature_strata_message (const struct ConsensusSession *session, const struc
1008static int 1099static int
1009handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg) 1100handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg)
1010{ 1101{
1011 int i; // unsigned?
1012 unsigned int diff; 1102 unsigned int diff;
1013 void *buf;
1014 size_t size;
1015 1103
1016 if ((cpi->session->current_round == CONSENSUS_ROUND_STOCK) && (strata_msg->round == CONSENSUS_ROUND_INVENTORY)) 1104 if ( (cpi->session->current_round == CONSENSUS_ROUND_COMPLETION) &&
1105 (strata_msg->round == CONSENSUS_ROUND_INVENTORY) )
1017 { 1106 {
1018 /* we still have to handle this request appropriately */ 1107 /* we still have to handle this request appropriately */
1019 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got inventory SE from P%d, we are already further alog\n", 1108 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got inventory SE from P%d, we are already further alog\n",
@@ -1023,28 +1112,26 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess
1023 { 1112 {
1024 if (GNUNET_NO == cpi->replaying_strata_message) 1113 if (GNUNET_NO == cpi->replaying_strata_message)
1025 { 1114 {
1026 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got probably premature SE from P%d, (%d,%d)\n", 1115 LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got probably premature SE (%d,%d)\n",
1027 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), strata_msg->exp_round, strata_msg->exp_subround); 1116 strata_msg->exp_round, strata_msg->exp_subround);
1028 cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message ((struct GNUNET_MessageHeader *) strata_msg); 1117 cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message (&strata_msg->header);
1029 } 1118 }
1030 return GNUNET_YES; 1119 return GNUNET_YES;
1031 } 1120 }
1032 1121
1033 if (NULL == cpi->se) 1122 if (NULL == cpi->se)
1034 cpi->se = strata_estimator_create (); 1123 cpi->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM);
1035 1124
1036 cpi->apparent_round = strata_msg->round; 1125 cpi->apparent_round = strata_msg->round;
1037 1126
1038 size = ntohs (strata_msg->header.size); 1127 if (htons (strata_msg->header.size) != ((sizeof *strata_msg) + SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE))
1039 buf = (void *) &strata_msg[1]; // FIXME: do NOT cast away 'const'!
1040 for (i = 0; i < STRATA_COUNT; i++)
1041 { 1128 {
1042 int res; 1129 LOG_PP (GNUNET_ERROR_TYPE_WARNING, cpi, "got SE of wrong size\n");
1043 res = ibf_read (&buf, &size, cpi->se->strata[i]); 1130 return GNUNET_NO;
1044 GNUNET_assert (GNUNET_OK == res);
1045 } 1131 }
1046 1132 strata_estimator_read (&strata_msg[1], cpi->se);
1047 diff = estimate_difference (cpi->session->se, cpi->se); 1133 GNUNET_assert (NULL != cpi->session->se);
1134 diff = strata_estimator_difference (cpi->session->se, cpi->se);
1048 1135
1049 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SE from P%d, diff=%d\n", 1136 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SE from P%d, diff=%d\n",
1050 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), diff); 1137 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), diff);
@@ -1053,10 +1140,10 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess
1053 { 1140 {
1054 case CONSENSUS_ROUND_EXCHANGE: 1141 case CONSENSUS_ROUND_EXCHANGE:
1055 case CONSENSUS_ROUND_INVENTORY: 1142 case CONSENSUS_ROUND_INVENTORY:
1056 case CONSENSUS_ROUND_STOCK: 1143 case CONSENSUS_ROUND_COMPLETION:
1057 /* send IBF of the right size */ 1144 /* send IBF of the right size */
1058 cpi->ibf_order = 0; 1145 cpi->ibf_order = 0;
1059 while (((1 << cpi->ibf_order) < diff) || STRATA_HASH_NUM > (1 << cpi->ibf_order) ) 1146 while (((1 << cpi->ibf_order) < diff) || (SE_IBF_HASH_NUM > (1 << cpi->ibf_order)) )
1060 cpi->ibf_order++; 1147 cpi->ibf_order++;
1061 if (cpi->ibf_order > MAX_IBF_ORDER) 1148 if (cpi->ibf_order > MAX_IBF_ORDER)
1062 cpi->ibf_order = MAX_IBF_ORDER; 1149 cpi->ibf_order = MAX_IBF_ORDER;
@@ -1066,7 +1153,6 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess
1066 if (NULL != cpi->ibf) 1153 if (NULL != cpi->ibf)
1067 ibf_destroy (cpi->ibf); 1154 ibf_destroy (cpi->ibf);
1068 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); 1155 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
1069 cpi->ibf_state = IBF_STATE_TRANSMITTING;
1070 cpi->ibf_bucket_counter = 0; 1156 cpi->ibf_bucket_counter = 0;
1071 send_ibf (cpi); 1157 send_ibf (cpi);
1072 break; 1158 break;
@@ -1079,11 +1165,104 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess
1079} 1165}
1080 1166
1081 1167
1168
1169static int
1170send_elements_iterator (void *cls,
1171 const struct GNUNET_HashCode * key,
1172 void *value)
1173{
1174 struct ConsensusPeerInformation *cpi = cls;
1175 struct ElementInfo *ei;
1176 ei = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, value);
1177 if (NULL == ei)
1178 {
1179 LOG_PP (GNUNET_ERROR_TYPE_WARNING, cpi, "peer's ibf contained non-existing element %s\n",
1180 GNUNET_h2s((struct GNUNET_HashCode *) value));
1181 return GNUNET_YES;
1182 }
1183 LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "sending element\n");
1184 send_element_or_report (cpi, ei);
1185 return GNUNET_YES;
1186}
1187
1188
1189/**
1190 * Decode the current diff ibf, and send elements/requests/reports/
1191 *
1192 * @param cpi partner peer
1193 */
1194static void
1195decode (struct ConsensusPeerInformation *cpi)
1196{
1197 struct IBF_Key key;
1198 int side;
1199
1200 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: decoding ibf from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1201
1202 while (1)
1203 {
1204 int res;
1205
1206 res = ibf_decode (cpi->ibf, &side, &key);
1207 if (GNUNET_SYSERR == res)
1208 {
1209 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding failed, transmitting larger IBF\n");
1210 /* decoding failed, we tell the other peer by sending our ibf with a larger order */
1211 cpi->ibf_order++;
1212 prepare_ibf (cpi);
1213 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
1214 cpi->ibf_bucket_counter = 0;
1215 send_ibf (cpi);
1216 return;
1217 }
1218 if (GNUNET_NO == res)
1219 {
1220 struct PendingMessage *pm;
1221 struct ConsensusRoundMessage *rmsg;
1222 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: transmitted all values, sending SYNC\n", cpi->session->local_peer_idx);
1223
1224 pm = new_pending_message (sizeof *rmsg, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED);
1225 rmsg = (struct ConsensusRoundMessage *) pm->msg;
1226 rmsg->round = cpi->apparent_round;
1227 message_queue_add (cpi->mss.mq, pm);
1228 return;
1229 }
1230 if (-1 == side)
1231 {
1232 struct GNUNET_HashCode hashcode;
1233 /* we have the element(s), send it to the other peer */
1234 ibf_hashcode_from_key (key, &hashcode);
1235 GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->ibf_key_map, &hashcode, send_elements_iterator, cpi);
1236 }
1237 else
1238 {
1239 struct PendingMessage *pm;
1240 uint16_t type;
1241
1242 switch (cpi->apparent_round)
1243 {
1244 case CONSENSUS_ROUND_COMPLETION:
1245 /* FIXME: check if we really want to request the element */
1246 case CONSENSUS_ROUND_EXCHANGE:
1247 case CONSENSUS_ROUND_INVENTORY:
1248 type = GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST;
1249 break;
1250 default:
1251 GNUNET_assert (0);
1252 }
1253 pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + sizeof (struct IBF_Key),
1254 type);
1255 *(struct IBF_Key *) &pm->msg[1] = key;
1256 message_queue_add (cpi->mss.mq, pm);
1257 }
1258 }
1259}
1260
1261
1082static int 1262static int
1083handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest) 1263handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest)
1084{ 1264{
1085 int num_buckets; 1265 int num_buckets;
1086 void *buf;
1087 1266
1088 /* FIXME: find out if we're still expecting the same ibf! */ 1267 /* FIXME: find out if we're still expecting the same ibf! */
1089 1268
@@ -1128,13 +1307,10 @@ handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDig
1128 return GNUNET_YES; 1307 return GNUNET_YES;
1129 } 1308 }
1130 1309
1131
1132 if (NULL == cpi->ibf) 1310 if (NULL == cpi->ibf)
1133 cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); 1311 cpi->ibf = ibf_create (1 << cpi->ibf_order, SE_IBF_HASH_NUM);
1134
1135 buf = (void *) &digest[1]; // FIXME: digest is supposed to be READ ONLY!
1136 ibf_read_slice (&buf, NULL, cpi->ibf_bucket_counter, num_buckets, cpi->ibf);
1137 1312
1313 ibf_read_slice (&digest[1], cpi->ibf_bucket_counter, num_buckets, cpi->ibf);
1138 cpi->ibf_bucket_counter += num_buckets; 1314 cpi->ibf_bucket_counter += num_buckets;
1139 1315
1140 if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) 1316 if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order))
@@ -1150,19 +1326,53 @@ handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDig
1150 1326
1151 1327
1152/** 1328/**
1329 * Insert an element into the consensus set of the specified session.
1330 * The element will not be copied, and freed when destroying the session.
1331 *
1332 * @param session session for new element
1333 * @param element element to insert
1334 */
1335static void
1336insert_element (struct ConsensusSession *session, struct GNUNET_CONSENSUS_Element *element)
1337{
1338 struct GNUNET_HashCode hash;
1339 struct ElementInfo *e;
1340 struct IBF_Key ibf_key;
1341 int i;
1342
1343 e = GNUNET_new (struct ElementInfo);
1344 e->element = element;
1345 e->element_hash = GNUNET_new (struct GNUNET_HashCode);
1346 GNUNET_CRYPTO_hash (e->element->data, e->element->size, e->element_hash);
1347 ibf_key = ibf_key_from_hashcode (e->element_hash);
1348 ibf_hashcode_from_key (ibf_key, &hash);
1349 strata_estimator_insert (session->se, &hash);
1350 GNUNET_CONTAINER_multihashmap_put (session->values, e->element_hash, e,
1351 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1352 GNUNET_CONTAINER_multihashmap_put (session->ibf_key_map, &hash, e->element_hash,
1353 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1354
1355 for (i = 0; i <= MAX_IBF_ORDER; i++)
1356 {
1357 if (NULL == session->ibfs[i])
1358 continue;
1359 ibf_insert (session->ibfs[i], ibf_key);
1360 }
1361}
1362
1363
1364/**
1153 * Handle an element that another peer sent us 1365 * Handle an element that another peer sent us
1154 */ 1366 */
1155static int 1367static int
1156handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg) 1368handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg)
1157{ 1369{
1158 struct PendingElement *pending_element;
1159 struct GNUNET_CONSENSUS_Element *element; 1370 struct GNUNET_CONSENSUS_Element *element;
1160 struct GNUNET_CONSENSUS_ElementMessage *client_element_msg;
1161 size_t size; 1371 size_t size;
1162 1372
1163 switch (cpi->session->current_round) 1373 switch (cpi->session->current_round)
1164 { 1374 {
1165 case CONSENSUS_ROUND_STOCK: 1375 case CONSENSUS_ROUND_COMPLETION:
1166 /* FIXME: check if we really expect the element */ 1376 /* FIXME: check if we really expect the element */
1167 case CONSENSUS_ROUND_EXCHANGE: 1377 case CONSENSUS_ROUND_EXCHANGE:
1168 break; 1378 break;
@@ -1178,21 +1388,10 @@ handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_Me
1178 memcpy (&element[1], &element_msg[1], size); 1388 memcpy (&element[1], &element_msg[1], size);
1179 element->data = &element[1]; 1389 element->data = &element[1];
1180 1390
1181 pending_element = GNUNET_malloc (sizeof *pending_element); 1391 LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got element\n");
1182 pending_element->element = element;
1183 GNUNET_CONTAINER_DLL_insert_tail (cpi->session->client_approval_head, cpi->session->client_approval_tail, pending_element);
1184
1185 client_element_msg = GNUNET_malloc (size + sizeof *client_element_msg);
1186 client_element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
1187 client_element_msg->header.size = htons (size + sizeof *client_element_msg);
1188 memcpy (&client_element_msg[1], &element[1], size);
1189 1392
1190 queue_client_message (cpi->session, (struct GNUNET_MessageHeader *) client_element_msg); 1393 insert_element (cpi->session, element);
1191 1394
1192 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received element, size=%d\n", size);
1193
1194 client_send_next (cpi->session);
1195
1196 return GNUNET_YES; 1395 return GNUNET_YES;
1197} 1396}
1198 1397
@@ -1213,20 +1412,36 @@ handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct E
1213 /* element requests are allowed in every round */ 1412 /* element requests are allowed in every round */
1214 1413
1215 num = ntohs (msg->header.size) / sizeof (struct IBF_Key); 1414 num = ntohs (msg->header.size) / sizeof (struct IBF_Key);
1216 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handling element request for %u elements\n", num);
1217 1415
1218 ibf_key = (struct IBF_Key *) &msg[1]; 1416 ibf_key = (struct IBF_Key *) &msg[1];
1219 while (num--) 1417 while (num--)
1220 { 1418 {
1221 struct ElementList *head;
1222 ibf_hashcode_from_key (*ibf_key, &hashcode); 1419 ibf_hashcode_from_key (*ibf_key, &hashcode);
1223 head = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode); 1420 GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->ibf_key_map, &hashcode, send_elements_iterator, cpi);
1224 send_elements (cpi, head);
1225 ibf_key++; 1421 ibf_key++;
1226 } 1422 }
1227 return GNUNET_YES; 1423 return GNUNET_YES;
1228} 1424}
1229 1425
1426static int
1427is_peer_connected (struct ConsensusPeerInformation *cpi)
1428{
1429 if (NULL == cpi->mss.socket)
1430 return GNUNET_NO;
1431 return GNUNET_YES;
1432}
1433
1434
1435static void
1436ensure_peer_connected (struct ConsensusPeerInformation *cpi)
1437{
1438 if (NULL != cpi->mss.socket)
1439 return;
1440 cpi->mss.socket = GNUNET_STREAM_open (cfg, &cpi->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS,
1441 open_cb, cpi, GNUNET_STREAM_OPTION_END);
1442}
1443
1444
1230/** 1445/**
1231 * If necessary, send a message to the peer, depending on the current 1446 * If necessary, send a message to the peer, depending on the current
1232 * round. 1447 * round.
@@ -1234,17 +1449,22 @@ handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct E
1234static void 1449static void
1235embrace_peer (struct ConsensusPeerInformation *cpi) 1450embrace_peer (struct ConsensusPeerInformation *cpi)
1236{ 1451{
1237 GNUNET_assert (GNUNET_YES == cpi->hello); 1452 if (GNUNET_NO == is_peer_connected (cpi))
1453 {
1454 ensure_peer_connected (cpi);
1455 return;
1456 }
1457 if (GNUNET_NO == cpi->hello)
1458 return;
1459 /* FIXME: correctness of switch */
1238 switch (cpi->session->current_round) 1460 switch (cpi->session->current_round)
1239 { 1461 {
1240 case CONSENSUS_ROUND_EXCHANGE: 1462 case CONSENSUS_ROUND_EXCHANGE:
1463 case CONSENSUS_ROUND_INVENTORY:
1241 if (cpi->session->partner_outgoing != cpi) 1464 if (cpi->session->partner_outgoing != cpi)
1242 break; 1465 break;
1243 /* fallthrough */ 1466 /* fallthrough */
1244 case CONSENSUS_ROUND_INVENTORY: 1467 case CONSENSUS_ROUND_COMPLETION:
1245 /* fallthrough */
1246 case CONSENSUS_ROUND_STOCK:
1247 if (cpi == cpi->session->partner_outgoing)
1248 send_strata_estimator (cpi); 1468 send_strata_estimator (cpi);
1249 default: 1469 default:
1250 break; 1470 break;
@@ -1253,195 +1473,313 @@ embrace_peer (struct ConsensusPeerInformation *cpi)
1253 1473
1254 1474
1255/** 1475/**
1256 * Handle a HELLO-message, send when another peer wants to join a session where 1476 * Called when stream has finishes writing the hello message
1257 * our peer is a member. The session may or may not be inhabited yet.
1258 */ 1477 */
1259static int 1478static void
1260handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello) 1479hello_cont (void *cls)
1261{ 1480{
1262 /* FIXME: session might not exist yet. create an uninhabited session and wait for a client */ 1481 struct ConsensusPeerInformation *cpi = cls;
1263 struct ConsensusSession *session;
1264 int idx;
1265 1482
1266 for (session = sessions_head; NULL != session; session = session->next) 1483 cpi->hello = GNUNET_YES;
1267 { 1484 embrace_peer (cpi);
1268 if (0 != GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id))
1269 continue;
1270 idx = get_peer_idx (&inc->peer_id, session);
1271 GNUNET_assert (-1 != idx);
1272 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d hello'ed session %d\n", idx);
1273 inc->cpi = &session->info[idx];
1274 inc->cpi->mst = inc->mst;
1275 inc->cpi->hello = GNUNET_YES;
1276 inc->cpi->socket = inc->socket;
1277 embrace_peer (inc->cpi);
1278 return GNUNET_YES;
1279 }
1280 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session\n");
1281 return GNUNET_NO;
1282} 1485}
1283 1486
1284 1487
1285/** 1488/**
1286 * Send a strata estimator. 1489 * Called when we established a stream connection to another peer
1287 * 1490 *
1288 * @param cpi the peer 1491 * @param cls cpi of the peer we just connected to
1492 * @param socket socket to use to communicate with the other side (read/write)
1289 */ 1493 */
1290static void 1494static void
1291send_strata_estimator (struct ConsensusPeerInformation *cpi) 1495open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
1292{ 1496{
1293 struct StrataMessage *strata_msg; 1497 struct ConsensusPeerInformation *cpi = cls;
1294 void *buf; 1498 struct PendingMessage *pm;
1295 size_t msize; 1499 struct ConsensusHello *hello;
1296 int i;
1297 1500
1298 cpi->apparent_round = cpi->session->current_round; 1501 GNUNET_assert (NULL == cpi->mss.mst);
1299 cpi->ibf_state = IBF_STATE_NONE; 1502 GNUNET_assert (NULL == cpi->mss.mq);
1300 cpi->ibf_bucket_counter = 0;
1301 1503
1302 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending SE(%d) to P%d\n", 1504 cpi->mss.mq = create_message_queue_for_stream_socket (&cpi->mss);
1303 cpi->session->local_peer_idx, cpi->session->current_round, (int) (cpi - cpi->session->info)); 1505 cpi->mss.mst = GNUNET_SERVER_mst_create (mst_session_callback, cpi);
1506 cpi->mss.mst_cls = cpi;
1304 1507
1305 msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS); 1508 pm = new_pending_message (sizeof *hello, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO);
1509 hello = (struct ConsensusHello *) pm->msg;
1510 memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode));
1511 pm->sent_cb = hello_cont;
1512 pm->sent_cb_cls = cpi;
1513 message_queue_add (cpi->mss.mq, pm);
1514 cpi->mss.rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
1515 &stream_data_processor, &cpi->mss);
1516}
1306 1517
1307 strata_msg = GNUNET_malloc (msize); 1518
1308 strata_msg->header.size = htons (msize); 1519static void
1309 strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE); 1520replay_premature_message (struct ConsensusPeerInformation *cpi)
1310 strata_msg->round = cpi->session->current_round; 1521{
1311 strata_msg->exp_round = cpi->session->exp_round; 1522 if (NULL != cpi->premature_strata_message)
1312 strata_msg->exp_subround = cpi->session->exp_subround;
1313
1314 buf = &strata_msg[1];
1315 for (i = 0; i < STRATA_COUNT; i++)
1316 { 1523 {
1317 ibf_write (cpi->session->se->strata[i], &buf, NULL); 1524 struct StrataMessage *sm;
1318 } 1525
1526 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n");
1527 sm = cpi->premature_strata_message;
1528 cpi->premature_strata_message = NULL;
1529
1530 cpi->replaying_strata_message = GNUNET_YES;
1531 handle_p2p_strata (cpi, sm);
1532 cpi->replaying_strata_message = GNUNET_NO;
1319 1533
1320 queue_peer_message (cpi, (struct GNUNET_MessageHeader *) strata_msg); 1534 GNUNET_free (sm);
1535 }
1321} 1536}
1322 1537
1323 1538
1324/** 1539/**
1325 * Send an IBF of the order specified in cpi. 1540 * Start the inventory round, contact all peers we are supposed to contact.
1326 * 1541 *
1327 * @param cpi the peer 1542 * @param session the current session
1328 */ 1543 */
1329static void 1544static void
1330send_ibf (struct ConsensusPeerInformation *cpi) 1545start_inventory (struct ConsensusSession *session)
1331{ 1546{
1332 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending IBF to P%d\n", 1547 int i;
1333 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); 1548 int last;
1334 1549
1335 cpi->ibf_bucket_counter = 0; 1550 for (i = 0; i < session->num_peers; i++)
1336 while (cpi->ibf_bucket_counter < (1 << cpi->ibf_order))
1337 { 1551 {
1338 int num_buckets; 1552 session->info[i].ibf_bucket_counter = 0;
1339 void *buf; 1553 session->info[i].ibf_state = IBF_STATE_NONE;
1340 struct DifferenceDigest *digest; 1554 session->info[i].is_outgoing = GNUNET_NO;
1341 int msize; 1555 }
1342 1556
1343 num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter; 1557 last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers;
1344 /* limit to maximum */ 1558 i = (session->local_peer_idx + 1) % session->num_peers;
1345 if (num_buckets > BUCKETS_PER_MESSAGE) 1559 while (i != last)
1346 num_buckets = BUCKETS_PER_MESSAGE; 1560 {
1561 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i);
1562 session->info[i].is_outgoing = GNUNET_YES;
1563 embrace_peer (&session->info[i]);
1564 i = (i + 1) % session->num_peers;
1565 }
1566 // tie-breaker for even number of peers
1567 if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
1568 {
1569 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i);
1570 session->info[last].is_outgoing = GNUNET_YES;
1571 embrace_peer (&session->info[last]);
1572 }
1347 1573
1348 msize = (sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE); 1574 for (i = 0; i < session->num_peers; i++)
1575 {
1576 if (GNUNET_NO == session->info[i].is_outgoing)
1577 replay_premature_message (&session->info[i]);
1578 }
1579}
1349 1580
1350 digest = GNUNET_malloc (msize);
1351 digest->header.size = htons (msize);
1352 digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST);
1353 digest->order = cpi->ibf_order;
1354 digest->round = cpi->apparent_round;
1355 1581
1356 buf = &digest[1]; 1582/**
1357 ibf_write_slice (cpi->ibf, cpi->ibf_bucket_counter, num_buckets, &buf, NULL); 1583 * Iterator over hash map entries.
1584 *
1585 * @param cls closure
1586 * @param key current key code
1587 * @param value value in the hash map
1588 * @return GNUNET_YES if we should continue to
1589 * iterate,
1590 * GNUNET_NO if not.
1591 */
1592static int
1593send_client_elements_iter (void *cls,
1594 const struct GNUNET_HashCode * key,
1595 void *value)
1596{
1597 struct ConsensusSession *session = cls;
1598 struct ElementInfo *ei = value;
1599 struct PendingMessage *pm;
1358 1600
1359 queue_peer_message (cpi, (struct GNUNET_MessageHeader *) digest); 1601 /* is the client still there? */
1602 if (NULL == session->scss.client)
1603 return GNUNET_NO;
1360 1604
1361 cpi->ibf_bucket_counter += num_buckets; 1605 pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + ei->element->size,
1362 } 1606 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
1363 cpi->ibf_bucket_counter = 0; 1607 message_queue_add (session->client_mq, pm);
1364 cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF; 1608 return GNUNET_YES;
1365} 1609}
1366 1610
1367 1611
1612
1368/** 1613/**
1369 * Decode the current diff ibf, and send elements/requests/reports/ 1614 * Start the next round.
1615 * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
1370 * 1616 *
1371 * @param cpi partner peer 1617 * @param cls the session
1618 * @param tc task context, for when this task is invoked by the scheduler,
1619 * NULL if invoked for another reason
1372 */ 1620 */
1373static void 1621static void
1374decode (struct ConsensusPeerInformation *cpi) 1622round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1375{ 1623{
1376 struct IBF_Key key; 1624 struct ConsensusSession *session;
1377 struct GNUNET_HashCode hashcode;
1378 int side;
1379 1625
1380 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: decoding ibf from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); 1626 /* don't kick off next round if we're shutting down */
1627 if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1628 return;
1381 1629
1382 while (1) 1630 session = cls;
1631 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: round over\n", session->local_peer_idx);
1632
1633 if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
1383 { 1634 {
1384 int res; 1635 GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
1636 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
1637 }
1385 1638
1386 res = ibf_decode (cpi->ibf, &side, &key); 1639 switch (session->current_round)
1387 if (GNUNET_SYSERR == res) 1640 {
1388 { 1641 case CONSENSUS_ROUND_BEGIN:
1389 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding failed, transmitting larger IBF\n"); 1642 session->current_round = CONSENSUS_ROUND_EXCHANGE;
1390 /* decoding failed, we tell the other peer by sending our ibf with a larger order */ 1643 session->exp_round = 0;
1391 cpi->ibf_order++; 1644 subround_over (session, NULL);
1392 prepare_ibf (cpi); 1645 break;
1393 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); 1646 case CONSENSUS_ROUND_EXCHANGE:
1394 cpi->ibf_state = IBF_STATE_TRANSMITTING; 1647 /* handle two peers specially */
1395 cpi->ibf_bucket_counter = 0; 1648 if (session->num_peers <= 2)
1396 send_ibf (cpi); 1649 {
1397 return; 1650 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: 2-peer consensus done\n", session->local_peer_idx);
1398 } 1651 GNUNET_CONTAINER_multihashmap_iterate (session->values, send_client_elements_iter, session);
1399 if (GNUNET_NO == res) 1652 send_client_conclude_done (session);
1400 { 1653 session->current_round = CONSENSUS_ROUND_FINISH;
1401 struct ConsensusRoundMessage *msg; 1654 return;
1402 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: transmitted all values, sending SYNC\n", cpi->session->local_peer_idx); 1655 }
1403 msg = GNUNET_malloc (sizeof *msg); 1656 session->current_round = CONSENSUS_ROUND_INVENTORY;
1404 msg->header.size = htons (sizeof *msg); 1657 start_inventory (session);
1405 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED); 1658 break;
1406 msg->round = cpi->apparent_round; 1659 case CONSENSUS_ROUND_INVENTORY:
1407 queue_peer_message (cpi, (struct GNUNET_MessageHeader *) msg); 1660 session->current_round = CONSENSUS_ROUND_COMPLETION;
1408 return; 1661 session->exp_round = 0;
1409 } 1662 subround_over (session, NULL);
1410 if (-1 == side) 1663 break;
1411 { 1664 case CONSENSUS_ROUND_COMPLETION:
1412 struct ElementList *head; 1665 session->current_round = CONSENSUS_ROUND_FINISH;
1413 /* we have the element(s), send it to the other peer */ 1666 send_client_conclude_done (session);
1414 ibf_hashcode_from_key (key, &hashcode); 1667 break;
1415 head = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode); 1668 default:
1416 send_elements (cpi, head); 1669 GNUNET_assert (0);
1417 } 1670 }
1418 else 1671}
1419 {
1420 struct ElementRequest *msg;
1421 size_t msize;
1422 struct IBF_Key *p;
1423 1672
1424 msize = (sizeof *msg) + sizeof (struct IBF_Key); 1673
1425 msg = GNUNET_malloc (msize); 1674static void
1426 switch (cpi->apparent_round) 1675fin_sent_cb (void *cls)
1676{
1677 struct ConsensusPeerInformation *cpi;
1678 cpi = cls;
1679 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n", cpi->session->local_peer_idx);
1680 switch (cpi->session->current_round)
1681 {
1682 case CONSENSUS_ROUND_EXCHANGE:
1683 case CONSENSUS_ROUND_COMPLETION:
1684 if (cpi->session->current_round != cpi->apparent_round)
1427 { 1685 {
1428 case CONSENSUS_ROUND_STOCK: 1686 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: FIN to SYNC from the past\n", cpi->session->local_peer_idx);
1429 /* FIXME: check if we really want to request the element */ 1687 break;
1430 case CONSENSUS_ROUND_EXCHANGE:
1431 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST);
1432 break;
1433 case CONSENSUS_ROUND_INVENTORY:
1434 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT);
1435 break;
1436 default:
1437 GNUNET_assert (0);
1438 } 1688 }
1439 msg->header.size = htons (msize); 1689 cpi->exp_subround_finished = GNUNET_YES;
1440 p = (struct IBF_Key *) &msg[1]; 1690 /* the subround is only really over if *both* partners are done */
1441 *p = key; 1691 if (GNUNET_YES == exp_subround_finished (cpi->session))
1442 queue_peer_message (cpi, (struct GNUNET_MessageHeader *) msg); 1692 subround_over (cpi->session, NULL);
1443 } 1693 else
1694 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after FIN sent\n", cpi->session->local_peer_idx);
1695 break;
1696 case CONSENSUS_ROUND_INVENTORY:
1697 cpi->inventory_synced = GNUNET_YES;
1698 if (inventory_round_finished (cpi->session) && cpi->session->current_round == cpi->apparent_round)
1699 round_over (cpi->session, NULL);
1700 /* FIXME: maybe go to next round */
1701 break;
1702 default:
1703 GNUNET_break (0);
1704 }
1705}
1706
1707
1708/**
1709 * The other peer wants us to inform that he sent us all the elements we requested.
1710 */
1711static int
1712handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
1713{
1714 struct ConsensusRoundMessage *round_msg;
1715 round_msg = (struct ConsensusRoundMessage *) msg;
1716 /* FIXME: only call subround_over if round is the current one! */
1717 switch (cpi->session->current_round)
1718 {
1719 case CONSENSUS_ROUND_EXCHANGE:
1720 case CONSENSUS_ROUND_COMPLETION:
1721 if (cpi->session->current_round != round_msg->round)
1722 {
1723 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (past round)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1724 cpi->ibf_state = IBF_STATE_NONE;
1725 cpi->ibf_bucket_counter = 0;
1726 break;
1727 }
1728 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (exp)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1729 cpi->exp_subround_finished = GNUNET_YES;
1730 if (GNUNET_YES == exp_subround_finished (cpi->session))
1731 subround_over (cpi->session, NULL);
1732 else
1733 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after got FIN\n", cpi->session->local_peer_idx);
1734 break;
1735 case CONSENSUS_ROUND_INVENTORY:
1736 cpi->inventory_synced = GNUNET_YES;
1737 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (a2a)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1738 if (inventory_round_finished (cpi->session))
1739 round_over (cpi->session, NULL);
1740 break;
1741 default:
1742 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n");
1743 break;
1444 } 1744 }
1745 return GNUNET_YES;
1746}
1747
1748
1749/**
1750 * Gets called when the other peer wants us to inform that
1751 * it has decoded our ibf and sent us all elements / requests
1752 */
1753static int
1754handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
1755{
1756 struct PendingMessage *pm;
1757 struct ConsensusRoundMessage *fin_msg;
1758
1759 /* FIXME: why handle current round?? */
1760 switch (cpi->session->current_round)
1761 {
1762 case CONSENSUS_ROUND_INVENTORY:
1763 cpi->inventory_synced = GNUNET_YES;
1764 case CONSENSUS_ROUND_COMPLETION:
1765 case CONSENSUS_ROUND_EXCHANGE:
1766 LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "received SYNC\n");
1767 pm = new_pending_message (sizeof *fin_msg,
1768 GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN);
1769 fin_msg = (struct ConsensusRoundMessage *) pm->msg;
1770 fin_msg->round = cpi->apparent_round;
1771 /* the subround is over once we kicked off sending the fin msg */
1772 /* FIXME: assert we are talking to the right peer! */
1773 /* FIXME: mark peer as synced */
1774 pm->sent_cb = fin_sent_cb;
1775 pm->sent_cb_cls = cpi;
1776 message_queue_add (cpi->mss.mq, pm);
1777 break;
1778 default:
1779 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n");
1780 break;
1781 }
1782 return GNUNET_YES;
1445} 1783}
1446 1784
1447 1785
@@ -1459,8 +1797,9 @@ decode (struct ConsensusPeerInformation *cpi)
1459static int 1797static int
1460mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) 1798mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
1461{ 1799{
1462 struct ConsensusPeerInformation *cpi; 1800 struct ConsensusPeerInformation *cpi = cls;
1463 cpi = cls; 1801 GNUNET_assert (NULL == client);
1802 GNUNET_assert (NULL != cls);
1464 switch (ntohs (message->type)) 1803 switch (ntohs (message->type))
1465 { 1804 {
1466 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE: 1805 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE:
@@ -1485,6 +1824,228 @@ mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader
1485} 1824}
1486 1825
1487 1826
1827static void
1828shuffle (struct ConsensusSession *session)
1829{
1830 /* adapted from random_permute in util/crypto_random.c */
1831 /* FIXME
1832 unsigned int *ret;
1833 unsigned int i;
1834 unsigned int tmp;
1835 uint32_t x;
1836
1837 GNUNET_assert (n > 0);
1838 ret = GNUNET_malloc (n * sizeof (unsigned int));
1839 for (i = 0; i < n; i++)
1840 ret[i] = i;
1841 for (i = n - 1; i > 0; i--)
1842 {
1843 x = GNUNET_CRYPTO_random_u32 (mode, i + 1);
1844 tmp = ret[x];
1845 ret[x] = ret[i];
1846 ret[i] = tmp;
1847 }
1848 */
1849}
1850
1851
1852/**
1853 * Find and set the partner_incoming and partner_outgoing of our peer,
1854 * one of them may not exist in most cases.
1855 *
1856 * @param session the consensus session
1857 */
1858static void
1859find_partners (struct ConsensusSession *session)
1860{
1861 int mark[session->num_peers];
1862 int i;
1863 memset (mark, 0, session->num_peers * sizeof (int));
1864 session->partner_incoming = session->partner_outgoing = NULL;
1865 for (i = 0; i < session->num_peers; i++)
1866 {
1867 int arc;
1868 if (0 != mark[i])
1869 continue;
1870 arc = (i + (1 << session->exp_subround)) % session->num_peers;
1871 mark[i] = mark[arc] = 1;
1872 GNUNET_assert (i != arc);
1873 if (i == session->local_peer_idx)
1874 {
1875 GNUNET_assert (NULL == session->partner_outgoing);
1876 session->partner_outgoing = &session->info[session->shuffle[arc]];
1877 session->partner_outgoing->exp_subround_finished = GNUNET_NO;
1878 }
1879 if (arc == session->local_peer_idx)
1880 {
1881 GNUNET_assert (NULL == session->partner_incoming);
1882 session->partner_incoming = &session->info[session->shuffle[i]];
1883 session->partner_incoming->exp_subround_finished = GNUNET_NO;
1884 }
1885 }
1886}
1887
1888
1889/**
1890 * Do the next subround in the exp-scheme.
1891 * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
1892 *
1893 * @param cls the session
1894 * @param tc task context, for when this task is invoked by the scheduler,
1895 * NULL if invoked for another reason
1896 */
1897static void
1898subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1899{
1900 struct ConsensusSession *session;
1901 int i;
1902
1903 /* don't kick off next subround if we're shutting down */
1904 if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1905 return;
1906 session = cls;
1907 /* cancel timeout */
1908 if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
1909 GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
1910 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
1911 /* check if we are done with the log phase, 2-peer consensus only does one log round */
1912 if ( (session->exp_round == NUM_EXP_ROUNDS) ||
1913 ((session->num_peers == 2) && (session->exp_round == 1)))
1914 {
1915 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: exp-round over\n", session->local_peer_idx);
1916 round_over (session, NULL);
1917 return;
1918 }
1919 if (session->exp_round == 0)
1920 {
1921 /* initialize everything for the log-rounds */
1922 session->exp_round = 1;
1923 session->exp_subround = 0;
1924 if (NULL == session->shuffle)
1925 session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers);
1926 for (i = 0; i < session->num_peers; i++)
1927 session->shuffle[i] = i;
1928 }
1929 else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers)))
1930 {
1931 /* subrounds done, start new log-round */
1932 session->exp_round++;
1933 session->exp_subround = 0;
1934 shuffle (session);
1935 }
1936 else
1937 {
1938 session->exp_subround++;
1939 }
1940
1941 find_partners (session);
1942
1943#ifdef GNUNET_EXTRA_LOGGING
1944 {
1945 int in;
1946 int out;
1947 if (session->partner_outgoing == NULL)
1948 out = -1;
1949 else
1950 out = (int) (session->partner_outgoing - session->info);
1951 if (session->partner_incoming == NULL)
1952 in = -1;
1953 else
1954 in = (int) (session->partner_incoming - session->info);
1955 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx,
1956 session->exp_round, session->exp_subround, in, out);
1957 }
1958#endif /* GNUNET_EXTRA_LOGGING */
1959
1960 if (NULL != session->partner_incoming)
1961 {
1962 session->partner_incoming->ibf_state = IBF_STATE_NONE;
1963 session->partner_incoming->exp_subround_finished = GNUNET_NO;
1964 session->partner_incoming->ibf_bucket_counter = 0;
1965
1966 /* maybe there's an early strata estimator? */
1967 replay_premature_message (session->partner_incoming);
1968 }
1969
1970 if (NULL != session->partner_outgoing)
1971 {
1972 session->partner_outgoing->ibf_state = IBF_STATE_NONE;
1973 session->partner_outgoing->ibf_bucket_counter = 0;
1974 session->partner_outgoing->exp_subround_finished = GNUNET_NO;
1975 /* make sure peer is connected and send the SE */
1976 embrace_peer (session->partner_outgoing);
1977 }
1978
1979 /*
1980 session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 3 * NUM_EXP_ROUNDS),
1981 subround_over, session);
1982 */
1983}
1984
1985
1986/**
1987 * Search peer in the list of peers in session.
1988 *
1989 * @param peer peer to find
1990 * @param session session with peer
1991 * @return index of peer, -1 if peer is not in session
1992 */
1993static int
1994get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
1995{
1996 int i;
1997 for (i = 0; i < session->num_peers; i++)
1998 if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer))
1999 return i;
2000 return -1;
2001}
2002
2003
2004/**
2005 * Handle a HELLO-message, send when another peer wants to join a session where
2006 * our peer is a member. The session may or may not be inhabited yet.
2007 */
2008static int
2009handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello)
2010{
2011 struct ConsensusSession *session;
2012
2013 if (NULL != inc->requested_gid)
2014 {
2015 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session more than once, ignoring\n");
2016 return GNUNET_YES;
2017 }
2018 if (NULL != inc->cpi)
2019 {
2020 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer with active session sent HELLO again, ignoring\n");
2021 return GNUNET_YES;
2022 }
2023
2024 for (session = sessions_head; NULL != session; session = session->next)
2025 {
2026 int idx;
2027 struct ConsensusPeerInformation *cpi;
2028 if (0 != GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id))
2029 continue;
2030 idx = get_peer_idx (&inc->peer_id, session);
2031 GNUNET_assert (-1 != idx);
2032 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d hello'ed session %d\n", idx);
2033 cpi = &session->info[idx];
2034 inc->cpi = cpi;
2035 cpi->mss = inc->mss;
2036 cpi = &session->info[idx];
2037 cpi->hello = GNUNET_YES;
2038 cpi->mss.mq = create_message_queue_for_stream_socket (&cpi->mss);
2039 embrace_peer (cpi);
2040 return GNUNET_YES;
2041 }
2042 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session\n");
2043 inc->requested_gid = GNUNET_memdup (&hello->global_id, sizeof (struct GNUNET_HashCode));
2044 return GNUNET_YES;
2045}
2046
2047
2048
1488/** 2049/**
1489 * Handle tokenized messages from stream sockets. 2050 * Handle tokenized messages from stream sockets.
1490 * Delegate them if the socket belongs to a session, 2051 * Delegate them if the socket belongs to a session,
@@ -1502,7 +2063,9 @@ static int
1502mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) 2063mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
1503{ 2064{
1504 struct IncomingSocket *inc; 2065 struct IncomingSocket *inc;
1505 inc = (struct IncomingSocket *) client; 2066 GNUNET_assert (NULL == client);
2067 GNUNET_assert (NULL != cls);
2068 inc = (struct IncomingSocket *) cls;
1506 switch (ntohs( message->type)) 2069 switch (ntohs( message->type))
1507 { 2070 {
1508 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO: 2071 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO:
@@ -1542,133 +2105,18 @@ listen_cb (void *cls,
1542 return GNUNET_SYSERR; 2105 return GNUNET_SYSERR;
1543 } 2106 }
1544 incoming = GNUNET_malloc (sizeof *incoming); 2107 incoming = GNUNET_malloc (sizeof *incoming);
1545 incoming->socket = socket;
1546 incoming->peer_id = *initiator; 2108 incoming->peer_id = *initiator;
1547 incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, 2109 incoming->mss.socket = socket;
1548 &incoming_stream_data_processor, incoming); 2110 incoming->mss.rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
1549 incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming); 2111 &stream_data_processor, &incoming->mss);
2112 incoming->mss.mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming);
2113 incoming->mss.mst_cls = incoming;
1550 GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming); 2114 GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming);
1551 return GNUNET_OK; 2115 return GNUNET_OK;
1552} 2116}
1553 2117
1554 2118
1555/** 2119/**
1556 * Iterator over hash map entries.
1557 *
1558 * @param cls closure
1559 * @param key current key code
1560 * @param value value in the hash map
1561 * @return GNUNET_YES if we should continue to
1562 * iterate,
1563 * GNUNET_NO if not.
1564 */
1565static int
1566destroy_element_list_iter (void *cls,
1567 const struct GNUNET_HashCode * key,
1568 void *value)
1569{
1570 struct ElementList *el;
1571 el = value;
1572 while (NULL != el)
1573 {
1574 struct ElementList *el_old;
1575 el_old = el;
1576 el = el->next;
1577 GNUNET_free (el_old->element_hash);
1578 GNUNET_free (el_old->element);
1579 GNUNET_free (el_old);
1580 }
1581 return GNUNET_YES;
1582}
1583
1584
1585/**
1586 * Destroy a session, free all resources associated with it.
1587 *
1588 * @param session the session to destroy
1589 */
1590static void
1591destroy_session (struct ConsensusSession *session)
1592{
1593 int i;
1594
1595 GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
1596 GNUNET_SERVER_client_drop (session->client);
1597 session->client = NULL;
1598 if (NULL != session->shuffle)
1599 {
1600 GNUNET_free (session->shuffle);
1601 session->shuffle = NULL;
1602 }
1603 if (NULL != session->se)
1604 {
1605 strata_estimator_destroy (session->se);
1606 session->se = NULL;
1607 }
1608 if (NULL != session->info)
1609 {
1610 for (i = 0; i < session->num_peers; i++)
1611 {
1612 struct ConsensusPeerInformation *cpi;
1613 cpi = &session->info[i];
1614 if ((NULL != cpi) && (NULL != cpi->socket))
1615 {
1616 if (NULL != cpi->rh)
1617 {
1618 GNUNET_STREAM_read_cancel (cpi->rh);
1619 cpi->rh = NULL;
1620 }
1621 if (NULL != cpi->wh)
1622 {
1623 GNUNET_STREAM_write_cancel (cpi->wh);
1624 cpi->wh = NULL;
1625 }
1626 GNUNET_STREAM_close (cpi->socket);
1627 cpi->socket = NULL;
1628 }
1629 if (NULL != cpi->se)
1630 {
1631 strata_estimator_destroy (cpi->se);
1632 cpi->se = NULL;
1633 }
1634 if (NULL != cpi->ibf)
1635 {
1636 ibf_destroy (cpi->ibf);
1637 cpi->ibf = NULL;
1638 }
1639 if (NULL != cpi->mst)
1640 {
1641 GNUNET_SERVER_mst_destroy (cpi->mst);
1642 cpi->mst = NULL;
1643 }
1644 }
1645 GNUNET_free (session->info);
1646 session->info = NULL;
1647 }
1648 if (NULL != session->ibfs)
1649 {
1650 for (i = 0; i <= MAX_IBF_ORDER; i++)
1651 {
1652 if (NULL != session->ibfs[i])
1653 {
1654 ibf_destroy (session->ibfs[i]);
1655 session->ibfs[i] = NULL;
1656 }
1657 }
1658 GNUNET_free (session->ibfs);
1659 session->ibfs = NULL;
1660 }
1661 if (NULL != session->values)
1662 {
1663 GNUNET_CONTAINER_multihashmap_iterate (session->values, destroy_element_list_iter, NULL);
1664 GNUNET_CONTAINER_multihashmap_destroy (session->values);
1665 session->values = NULL;
1666 }
1667 GNUNET_free (session);
1668}
1669
1670
1671/**
1672 * Disconnect a client, and destroy all sessions associated with it. 2120 * Disconnect a client, and destroy all sessions associated with it.
1673 * 2121 *
1674 * @param client the client to disconnect 2122 * @param client the client to disconnect
@@ -1683,7 +2131,7 @@ disconnect_client (struct GNUNET_SERVER_Client *client)
1683 session = sessions_head; 2131 session = sessions_head;
1684 while (NULL != session) 2132 while (NULL != session)
1685 { 2133 {
1686 if (client == session->client) 2134 if (client == session->scss.client)
1687 { 2135 {
1688 destroy_session (session); 2136 destroy_session (session);
1689 break; 2137 break;
@@ -1720,74 +2168,6 @@ compute_global_id (struct ConsensusSession *session, const struct GNUNET_HashCod
1720 2168
1721 2169
1722/** 2170/**
1723 * Transmit a queued message to the session's client.
1724 *
1725 * @param cls consensus session
1726 * @param size number of bytes available in buf
1727 * @param buf where the callee should write the message
1728 * @return number of bytes written to buf
1729 */
1730static size_t
1731transmit_queued (void *cls, size_t size,
1732 void *buf)
1733{
1734 struct ConsensusSession *session;
1735 struct QueuedMessage *qmsg;
1736 size_t msg_size;
1737
1738 session = cls;
1739 session->client_th = NULL;
1740
1741 qmsg = session->client_messages_head;
1742 GNUNET_CONTAINER_DLL_remove (session->client_messages_head, session->client_messages_tail, qmsg);
1743 GNUNET_assert (qmsg);
1744
1745 if (NULL == buf)
1746 {
1747 destroy_session (session);
1748 return 0;
1749 }
1750
1751 msg_size = ntohs (qmsg->msg->size);
1752
1753 GNUNET_assert (size >= msg_size);
1754
1755 memcpy (buf, qmsg->msg, msg_size);
1756 GNUNET_free (qmsg->msg);
1757 GNUNET_free (qmsg);
1758
1759 client_send_next (session);
1760
1761 return msg_size;
1762}
1763
1764
1765/**
1766 * Schedule transmitting the next queued message (if any) to the inhabiting client of a session.
1767 *
1768 * @param session the consensus session
1769 */
1770static void
1771client_send_next (struct ConsensusSession *session)
1772{
1773
1774 GNUNET_assert (NULL != session);
1775
1776 if (NULL != session->client_th)
1777 return;
1778
1779 if (NULL != session->client_messages_head)
1780 {
1781 int msize;
1782 msize = ntohs (session->client_messages_head->msg->size);
1783 session->client_th = GNUNET_SERVER_notify_transmit_ready (session->client, msize,
1784 GNUNET_TIME_UNIT_FOREVER_REL,
1785 &transmit_queued, session);
1786 }
1787}
1788
1789
1790/**
1791 * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have 2171 * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have
1792 * the correct signature to be used with e.g. qsort. 2172 * the correct signature to be used with e.g. qsort.
1793 * We use this function instead. 2173 * We use this function instead.
@@ -1804,67 +2184,6 @@ hash_cmp (const void *h1, const void *h2)
1804 2184
1805 2185
1806/** 2186/**
1807 * Search peer in the list of peers in session.
1808 *
1809 * @param peer peer to find
1810 * @param session session with peer
1811 * @return index of peer, -1 if peer is not in session
1812 */
1813static int
1814get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
1815{
1816 int i;
1817 for (i = 0; i < session->num_peers; i++)
1818 if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer))
1819 return i;
1820 return -1;
1821}
1822
1823
1824/**
1825 * Called when stream has finishes writing the hello message
1826 */
1827static void
1828hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1829{
1830 struct ConsensusPeerInformation *cpi;
1831
1832 cpi = cls;
1833 cpi->wh = NULL;
1834 cpi->hello = GNUNET_YES;
1835 GNUNET_assert (GNUNET_STREAM_OK == status);
1836 embrace_peer (cpi);
1837}
1838
1839
1840/**
1841 * Called when we established a stream connection to another peer
1842 *
1843 * @param cls cpi of the peer we just connected to
1844 * @param socket socket to use to communicate with the other side (read/write)
1845 */
1846static void
1847open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
1848{
1849 struct ConsensusPeerInformation *cpi;
1850 struct ConsensusHello *hello;
1851
1852 cpi = cls;
1853 hello = GNUNET_malloc (sizeof *hello);
1854 hello->header.size = htons (sizeof *hello);
1855 hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO);
1856 memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode));
1857 GNUNET_assert (NULL == cpi->mst);
1858 cpi->mst = GNUNET_SERVER_mst_create (mst_session_callback, cpi);
1859 cpi->wh =
1860 GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi);
1861 GNUNET_free (hello);
1862 cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
1863 &session_stream_data_processor, cpi);
1864}
1865
1866
1867/**
1868 * Create the sorted list of peers for the session, 2187 * Create the sorted list of peers for the session,
1869 * add the local peer if not in the join message. 2188 * add the local peer if not in the join message.
1870 */ 2189 */
@@ -1921,19 +2240,6 @@ initialize_session_peer_list (struct ConsensusSession *session)
1921} 2240}
1922 2241
1923 2242
1924static void
1925strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode *key)
1926{
1927 uint32_t v;
1928 int i;
1929 v = key->bits[0];
1930 /* count trailing '1'-bits of v */
1931 for (i = 0; v & 1; v>>=1, i++)
1932 /* empty */;
1933 ibf_insert (se->strata[i], ibf_key_from_hashcode (key));
1934}
1935
1936
1937/** 2243/**
1938 * Add incoming peer connections to the session, 2244 * Add incoming peer connections to the session,
1939 * for peers who have connected to us before the local session has been established 2245 * for peers who have connected to us before the local session has been established
@@ -1944,28 +2250,23 @@ static void
1944add_incoming_peers (struct ConsensusSession *session) 2250add_incoming_peers (struct ConsensusSession *session)
1945{ 2251{
1946 struct IncomingSocket *inc; 2252 struct IncomingSocket *inc;
1947 inc = incoming_sockets_head; 2253 int i;
2254 struct ConsensusPeerInformation *cpi;
1948 2255
1949 while (NULL != inc) 2256 for (inc = incoming_sockets_head; NULL != inc; inc = inc->next)
1950 { 2257 {
1951 if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, inc->requested_gid)) 2258 if ( (NULL == inc->requested_gid) ||
2259 (0 != GNUNET_CRYPTO_hash_cmp (&session->global_id, inc->requested_gid)) )
2260 continue;
2261 for (i = 0; i < session->num_peers; i++)
1952 { 2262 {
1953 int i; 2263 cpi = &session->info[i];
1954 for (i = 0; i < session->num_peers; i++) 2264 cpi->peer_id = inc->peer_id;
1955 { 2265 cpi->mss = inc->mss;
1956 struct ConsensusPeerInformation *cpi; 2266 cpi->hello = GNUNET_YES;
1957 cpi = &session->info[i]; 2267 inc->cpi = cpi;
1958 if (0 == memcmp (&inc->peer_id, &cpi->peer_id, sizeof (struct GNUNET_PeerIdentity))) 2268 break;
1959 {
1960 cpi->socket = inc->socket;
1961 inc->cpi = cpi;
1962 inc->cpi->mst = inc->mst;
1963 inc->cpi->hello = GNUNET_YES;
1964 break;
1965 }
1966 }
1967 } 2269 }
1968 inc = inc->next;
1969 } 2270 }
1970} 2271}
1971 2272
@@ -1982,7 +2283,6 @@ initialize_session (struct ConsensusSession *session)
1982 2283
1983 GNUNET_assert (NULL != session->join_msg); 2284 GNUNET_assert (NULL != session->join_msg);
1984 initialize_session_peer_list (session); 2285 initialize_session_peer_list (session);
1985 session->current_round = CONSENSUS_ROUND_BEGIN;
1986 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers); 2286 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers);
1987 compute_global_id (session, &session->join_msg->session_id); 2287 compute_global_id (session, &session->join_msg->session_id);
1988 2288
@@ -1993,24 +2293,30 @@ initialize_session (struct ConsensusSession *session)
1993 if ((other_session != session) && 2293 if ((other_session != session) &&
1994 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id))) 2294 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
1995 { 2295 {
1996 /* session already owned by another client */ 2296 if (GNUNET_NO == other_session->conclude)
1997 GNUNET_break (0); 2297 {
1998 disconnect_client (session->client); 2298 /* session already owned by another client */
1999 return; 2299 GNUNET_break (0);
2300 disconnect_client (session->scss.client);
2301 return;
2302 }
2303 else
2304 {
2305 GNUNET_SERVER_client_drop (session->scss.client);
2306 session->scss.client = NULL;
2307 break;
2308 }
2000 } 2309 }
2001 other_session = other_session->next; 2310 other_session = other_session->next;
2002 } 2311 }
2003 2312
2004 session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO);
2005 session->local_peer_idx = get_peer_idx (my_peer, session); 2313 session->local_peer_idx = get_peer_idx (my_peer, session);
2006 GNUNET_assert (-1 != session->local_peer_idx); 2314 GNUNET_assert (-1 != session->local_peer_idx);
2007 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx); 2315 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx);
2008 session->se = strata_estimator_create ();
2009 session->ibfs = GNUNET_malloc ((MAX_IBF_ORDER+1) * sizeof (struct InvertibleBloomFilter *));
2010 GNUNET_free (session->join_msg); 2316 GNUNET_free (session->join_msg);
2011 session->join_msg = NULL; 2317 session->join_msg = NULL;
2012 add_incoming_peers (session); 2318 add_incoming_peers (session);
2013 GNUNET_SERVER_receive_done (session->client, GNUNET_OK); 2319 GNUNET_SERVER_receive_done (session->scss.client, GNUNET_OK);
2014 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id)); 2320 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id));
2015} 2321}
2016 2322
@@ -2033,7 +2339,7 @@ client_join (void *cls,
2033 session = sessions_head; 2339 session = sessions_head;
2034 while (NULL != session) 2340 while (NULL != session)
2035 { 2341 {
2036 if (session->client == client) 2342 if (session->scss.client == client)
2037 { 2343 {
2038 GNUNET_break (0); 2344 GNUNET_break (0);
2039 disconnect_client (client); 2345 disconnect_client (client);
@@ -2042,9 +2348,15 @@ client_join (void *cls,
2042 session = session->next; 2348 session = session->next;
2043 } 2349 }
2044 2350
2045 session = GNUNET_malloc (sizeof (struct ConsensusSession)); 2351 session = GNUNET_new (struct ConsensusSession);
2046 session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m); 2352 session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m);
2047 session->client = client; 2353 /* these have to be initialized here, as the client can already start to give us values */
2354 session->ibfs = GNUNET_malloc ((MAX_IBF_ORDER+1) * sizeof (struct InvertibleBloomFilter *));
2355 session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO);
2356 session->ibf_key_map = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO);
2357 session->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM);
2358 session->scss.client = client;
2359 session->client_mq = create_message_queue_for_server_client (&session->scss);
2048 GNUNET_SERVER_client_keep (client); 2360 GNUNET_SERVER_client_keep (client);
2049 2361
2050 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); 2362 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
@@ -2060,57 +2372,6 @@ client_join (void *cls,
2060} 2372}
2061 2373
2062 2374
2063/**
2064 * Hash a block of data, producing a replicated ibf hash.
2065 */
2066static void
2067hash_for_ibf (const void *block, size_t size, struct GNUNET_HashCode *ret)
2068{
2069 struct IBF_Key ibf_key;
2070 GNUNET_CRYPTO_hash (block, size, ret);
2071 ibf_key = ibf_key_from_hashcode (ret);
2072 ibf_hashcode_from_key (ibf_key, ret);
2073}
2074
2075
2076static void
2077insert_element (struct ConsensusSession *session, struct GNUNET_CONSENSUS_Element *element)
2078{
2079 struct GNUNET_HashCode hash;
2080 struct ElementList *head;
2081
2082 hash_for_ibf (element->data, element->size, &hash);
2083
2084 head = GNUNET_CONTAINER_multihashmap_get (session->values, &hash);
2085
2086 if (NULL == head)
2087 {
2088 int i;
2089
2090 head = GNUNET_malloc (sizeof *head);
2091 head->element = element;
2092 head->next = NULL;
2093 head->element_hash = GNUNET_memdup (&hash, sizeof hash);
2094 GNUNET_CONTAINER_multihashmap_put (session->values, &hash, head,
2095 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2096 strata_estimator_insert (session->se, &hash);
2097
2098 for (i = 0; i <= MAX_IBF_ORDER; i++)
2099 if (NULL != session->ibfs[i])
2100 ibf_insert (session->ibfs[i], ibf_key_from_hashcode (&hash));
2101 }
2102 else
2103 {
2104 struct ElementList *el;
2105 el = GNUNET_malloc (sizeof *el);
2106 head->element = element;
2107 head->next = NULL;
2108 head->element_hash = GNUNET_memdup (&hash, sizeof hash);
2109 while (NULL != head->next)
2110 head = head->next;
2111 head->next = el;
2112 }
2113}
2114 2375
2115 2376
2116/** 2377/**
@@ -2133,7 +2394,7 @@ client_insert (void *cls,
2133 session = sessions_head; 2394 session = sessions_head;
2134 while (NULL != session) 2395 while (NULL != session)
2135 { 2396 {
2136 if (session->client == client) 2397 if (session->scss.client == client)
2137 break; 2398 break;
2138 } 2399 }
2139 2400
@@ -2146,373 +2407,15 @@ client_insert (void *cls,
2146 2407
2147 msg = (struct GNUNET_CONSENSUS_ElementMessage *) m; 2408 msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
2148 element_size = ntohs (msg->header.size )- sizeof (struct GNUNET_CONSENSUS_ElementMessage); 2409 element_size = ntohs (msg->header.size )- sizeof (struct GNUNET_CONSENSUS_ElementMessage);
2149
2150 element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size); 2410 element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size);
2151
2152 element->type = msg->element_type; 2411 element->type = msg->element_type;
2153 element->size = element_size; 2412 element->size = element_size;
2154 memcpy (&element[1], &msg[1], element_size); 2413 memcpy (&element[1], &msg[1], element_size);
2155 element->data = &element[1]; 2414 element->data = &element[1];
2156
2157 GNUNET_assert (NULL != element->data); 2415 GNUNET_assert (NULL != element->data);
2158
2159 insert_element (session, element); 2416 insert_element (session, element);
2160 2417
2161 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2418 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2162
2163 client_send_next (session);
2164}
2165
2166
2167
2168/**
2169 * Functions of this signature are called whenever writing operations
2170 * on a stream are executed
2171 *
2172 * @param cls the closure from GNUNET_STREAM_write
2173 * @param status the status of the stream at the time this function is called;
2174 * GNUNET_STREAM_OK if writing to stream was completed successfully;
2175 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
2176 * (this doesn't mean that the data is never sent, the receiver may
2177 * have read the data but its ACKs may have been lost);
2178 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
2179 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
2180 * be processed.
2181 * @param size the number of bytes written
2182 */
2183static void
2184write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
2185{
2186 struct ConsensusPeerInformation *cpi;
2187
2188 GNUNET_assert (GNUNET_STREAM_OK == status);
2189 cpi = cls;
2190 cpi->wh = NULL;
2191 if (NULL != cpi->messages_head)
2192 {
2193 struct QueuedMessage *qm;
2194 qm = cpi->messages_head;
2195 GNUNET_CONTAINER_DLL_remove (cpi->messages_head, cpi->messages_tail, qm);
2196 cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size),
2197 GNUNET_TIME_UNIT_FOREVER_REL,
2198 write_queued, cpi);
2199 if (NULL != qm->cb)
2200 qm->cb (qm->cls);
2201 GNUNET_free (qm->msg);
2202 GNUNET_free (qm);
2203 GNUNET_assert (NULL != cpi->wh);
2204 }
2205}
2206
2207
2208static void
2209shuffle (struct ConsensusSession *session)
2210{
2211 /* FIXME: implement */
2212}
2213
2214
2215/**
2216 * Find and set the partner_incoming and partner_outgoing of our peer,
2217 * one of them may not exist in most cases.
2218 *
2219 * @param session the consensus session
2220 */
2221static void
2222find_partners (struct ConsensusSession *session)
2223{
2224 int mark[session->num_peers];
2225 int i;
2226 memset (mark, 0, session->num_peers * sizeof (int));
2227 session->partner_incoming = session->partner_outgoing = NULL;
2228 for (i = 0; i < session->num_peers; i++)
2229 {
2230 int arc;
2231 if (0 != mark[i])
2232 continue;
2233 arc = (i + (1 << session->exp_subround)) % session->num_peers;
2234 mark[i] = mark[arc] = 1;
2235 GNUNET_assert (i != arc);
2236 if (i == session->local_peer_idx)
2237 {
2238 GNUNET_assert (NULL == session->partner_outgoing);
2239 session->partner_outgoing = &session->info[session->shuffle[arc]];
2240 session->partner_outgoing->exp_subround_finished = GNUNET_NO;
2241 }
2242 if (arc == session->local_peer_idx)
2243 {
2244 GNUNET_assert (NULL == session->partner_incoming);
2245 session->partner_incoming = &session->info[session->shuffle[i]];
2246 session->partner_incoming->exp_subround_finished = GNUNET_NO;
2247 }
2248 }
2249}
2250
2251
2252static void
2253replay_premature_message (struct ConsensusPeerInformation *cpi)
2254{
2255 if (NULL != cpi->premature_strata_message)
2256 {
2257 struct StrataMessage *sm;
2258
2259 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n");
2260 sm = cpi->premature_strata_message;
2261 cpi->premature_strata_message = NULL;
2262
2263 cpi->replaying_strata_message = GNUNET_YES;
2264 handle_p2p_strata (cpi, sm);
2265 cpi->replaying_strata_message = GNUNET_NO;
2266
2267 GNUNET_free (sm);
2268 }
2269}
2270
2271
2272/**
2273 * Do the next subround in the exp-scheme.
2274 * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
2275 *
2276 * @param cls the session
2277 * @param tc task context, for when this task is invoked by the scheduler,
2278 * NULL if invoked for another reason
2279 */
2280static void
2281subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2282{
2283 struct ConsensusSession *session;
2284 int i;
2285
2286 /* don't kick off next subround if we're shutting down */
2287 if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
2288 return;
2289 session = cls;
2290 /* don't send any messages from the last round */
2291 /*
2292 clear_peer_messages (session->partner_outgoing);
2293 clear_peer_messages (session->partner_incoming);
2294 for (i = 0; i < session->num_peers; i++)
2295 clear_peer_messages (&session->info[i]);
2296 */
2297 /* cancel timeout */
2298 if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
2299 GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
2300 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
2301 /* check if we are done with the log phase, 2-peer consensus only does one log round */
2302 if ( (session->exp_round == NUM_EXP_ROUNDS) ||
2303 ((session->num_peers == 2) && (session->exp_round == 1)))
2304 {
2305 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: exp-round over\n", session->local_peer_idx);
2306 round_over (session, NULL);
2307 return;
2308 }
2309 if (session->exp_round == 0)
2310 {
2311 /* initialize everything for the log-rounds */
2312 session->exp_round = 1;
2313 session->exp_subround = 0;
2314 if (NULL == session->shuffle)
2315 session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers);
2316 for (i = 0; i < session->num_peers; i++)
2317 session->shuffle[i] = i;
2318 }
2319 else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers)))
2320 {
2321 /* subrounds done, start new log-round */
2322 session->exp_round++;
2323 session->exp_subround = 0;
2324 shuffle (session);
2325 }
2326 else
2327 {
2328 session->exp_subround++;
2329 }
2330
2331 find_partners (session);
2332
2333#ifdef GNUNET_EXTRA_LOGGING
2334 {
2335 int in;
2336 int out;
2337 if (session->partner_outgoing == NULL)
2338 out = -1;
2339 else
2340 out = (int) (session->partner_outgoing - session->info);
2341 if (session->partner_incoming == NULL)
2342 in = -1;
2343 else
2344 in = (int) (session->partner_incoming - session->info);
2345 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx,
2346 session->exp_round, session->exp_subround, in, out);
2347 }
2348#endif /* GNUNET_EXTRA_LOGGING */
2349
2350 if (NULL != session->partner_incoming)
2351 {
2352 session->partner_incoming->ibf_state = IBF_STATE_NONE;
2353 session->partner_incoming->exp_subround_finished = GNUNET_NO;
2354 session->partner_incoming->ibf_bucket_counter = 0;
2355
2356 /* maybe there's an early strata estimator? */
2357 replay_premature_message (session->partner_incoming);
2358 }
2359
2360 if (NULL != session->partner_outgoing)
2361 {
2362 session->partner_outgoing->ibf_state = IBF_STATE_NONE;
2363 session->partner_outgoing->ibf_bucket_counter = 0;
2364 session->partner_outgoing->exp_subround_finished = GNUNET_NO;
2365
2366 if (NULL == session->partner_outgoing->socket)
2367 {
2368 session->partner_outgoing->socket =
2369 GNUNET_STREAM_open (cfg, &session->partner_outgoing->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS,
2370 open_cb, session->partner_outgoing,
2371 GNUNET_STREAM_OPTION_END);
2372 }
2373 else if (GNUNET_YES == session->partner_outgoing->hello)
2374 {
2375 send_strata_estimator (session->partner_outgoing);
2376 }
2377 /* else: do nothing, the send hello cb will handle this */
2378 }
2379
2380 /*
2381 session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 3 * NUM_EXP_ROUNDS),
2382 subround_over, session);
2383 */
2384}
2385
2386static void
2387contact_peer_a2a (struct ConsensusPeerInformation *cpi)
2388{
2389 cpi->is_outgoing = GNUNET_YES;
2390 if (NULL == cpi->socket)
2391 {
2392 cpi->socket = GNUNET_STREAM_open (cfg, &cpi->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS,
2393 open_cb, cpi, GNUNET_STREAM_OPTION_END);
2394 }
2395 else if (GNUNET_YES == cpi->hello)
2396 {
2397 send_strata_estimator (cpi);
2398 }
2399}
2400
2401/**
2402 * Start the inventory round, contact all peers we are supposed to contact.
2403 *
2404 * @param session the current session
2405 */
2406static void
2407start_inventory (struct ConsensusSession *session)
2408{
2409 int i;
2410 int last;
2411
2412 for (i = 0; i < session->num_peers; i++)
2413 {
2414 session->info[i].ibf_bucket_counter = 0;
2415 session->info[i].ibf_state = IBF_STATE_NONE;
2416 session->info[i].is_outgoing = GNUNET_NO;
2417 }
2418
2419 last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers;
2420 i = (session->local_peer_idx + 1) % session->num_peers;
2421 while (i != last)
2422 {
2423 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i);
2424 contact_peer_a2a (&session->info[i]);
2425 session->info[i].is_outgoing = GNUNET_YES;
2426 i = (i + 1) % session->num_peers;
2427 }
2428 // tie-breaker for even number of peers
2429 if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
2430 {
2431 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i);
2432 session->info[last].is_outgoing = GNUNET_YES;
2433 contact_peer_a2a (&session->info[last]);
2434 }
2435
2436 for (i = 0; i < session->num_peers; i++)
2437 {
2438 if (GNUNET_NO == session->info[i].is_outgoing)
2439 replay_premature_message (&session->info[i]);
2440 }
2441}
2442
2443static void
2444send_client_conclude_done (struct ConsensusSession *session)
2445{
2446 struct GNUNET_MessageHeader *msg;
2447 session->current_round = CONSENSUS_ROUND_FINISH;
2448 msg = GNUNET_malloc (sizeof *msg);
2449 msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
2450 msg->size = htons (sizeof *msg);
2451 queue_client_message (session, msg);
2452 client_send_next (session);
2453}
2454
2455/**
2456 * Start the next round.
2457 * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
2458 *
2459 * @param cls the session
2460 * @param tc task context, for when this task is invoked by the scheduler,
2461 * NULL if invoked for another reason
2462 */
2463static void
2464round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2465{
2466 struct ConsensusSession *session;
2467
2468 /* don't kick off next round if we're shutting down */
2469 if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
2470 return;
2471
2472 session = cls;
2473 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: round over\n", session->local_peer_idx);
2474
2475 /*
2476 for (i = 0; i < session->num_peers; i++)
2477 clear_peer_messages (&session->info[i]);
2478 */
2479
2480 if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
2481 {
2482 GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
2483 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
2484 }
2485
2486 switch (session->current_round)
2487 {
2488 case CONSENSUS_ROUND_BEGIN:
2489 session->current_round = CONSENSUS_ROUND_EXCHANGE;
2490 session->exp_round = 0;
2491 subround_over (session, NULL);
2492 break;
2493 case CONSENSUS_ROUND_EXCHANGE:
2494 /* handle two peers specially */
2495 if (session->num_peers <= 2)
2496 {
2497 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: done\n", session->local_peer_idx);
2498 send_client_conclude_done (session);
2499 return;
2500 }
2501 session->current_round = CONSENSUS_ROUND_INVENTORY;
2502 start_inventory (session);
2503 break;
2504 case CONSENSUS_ROUND_INVENTORY:
2505 session->current_round = CONSENSUS_ROUND_STOCK;
2506 session->exp_round = 0;
2507 subround_over (session, NULL);
2508 break;
2509 case CONSENSUS_ROUND_STOCK:
2510 session->current_round = CONSENSUS_ROUND_FINISH;
2511 send_client_conclude_done (session);
2512 break;
2513 default:
2514 GNUNET_assert (0);
2515 }
2516} 2419}
2517 2420
2518 2421
@@ -2534,7 +2437,7 @@ client_conclude (void *cls,
2534 cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message; 2437 cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message;
2535 2438
2536 session = sessions_head; 2439 session = sessions_head;
2537 while ((session != NULL) && (session->client != client)) 2440 while ((session != NULL) && (session->scss.client != client))
2538 session = session->next; 2441 session = session->next;
2539 if (NULL == session) 2442 if (NULL == session)
2540 { 2443 {
@@ -2553,6 +2456,8 @@ client_conclude (void *cls,
2553 return; 2456 return;
2554 } 2457 }
2555 2458
2459 session->conclude = GNUNET_YES;
2460
2556 if (session->num_peers <= 1) 2461 if (session->num_peers <= 1)
2557 { 2462 {
2558 send_client_conclude_done (session); 2463 send_client_conclude_done (session);
@@ -2565,57 +2470,6 @@ client_conclude (void *cls,
2565 } 2470 }
2566 2471
2567 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2472 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2568 client_send_next (session);
2569}
2570
2571
2572/**
2573 * Called when a client sends an ack
2574 *
2575 * @param cls (unused)
2576 * @param client client handle
2577 * @param message message sent by the client
2578 */
2579static void
2580client_ack (void *cls,
2581 struct GNUNET_SERVER_Client *client,
2582 const struct GNUNET_MessageHeader *message)
2583{
2584 struct ConsensusSession *session;
2585 struct GNUNET_CONSENSUS_AckMessage *msg;
2586 struct PendingElement *pending;
2587 struct GNUNET_CONSENSUS_Element *element;
2588
2589 session = sessions_head;
2590 while (NULL != session)
2591 {
2592 if (session->client == client)
2593 break;
2594 }
2595
2596 if (NULL == session)
2597 {
2598 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to ack, but client is not in any session\n");
2599 GNUNET_SERVER_client_disconnect (client);
2600 return;
2601 }
2602
2603 pending = session->client_approval_head;
2604
2605 GNUNET_CONTAINER_DLL_remove (session->client_approval_head, session->client_approval_tail, pending);
2606
2607 msg = (struct GNUNET_CONSENSUS_AckMessage *) message;
2608
2609 if (msg->keep)
2610 {
2611 element = pending->element;
2612 insert_element (session, element);
2613 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got client ack\n");
2614 }
2615
2616 GNUNET_free (pending);
2617
2618 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2619} 2473}
2620 2474
2621 2475
@@ -2649,14 +2503,10 @@ core_startup (void *cls,
2649 /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */ 2503 /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */
2650 GNUNET_SCHEDULER_add_now (&disconnect_core, core); 2504 GNUNET_SCHEDULER_add_now (&disconnect_core, core);
2651 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n"); 2505 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n");
2652 2506 /* initialize sessions that are waiting for the local peer identity */
2653 session = sessions_head; 2507 for (session = sessions_head; NULL != session; session = session->next)
2654 while (NULL != session)
2655 {
2656 if (NULL != session->join_msg) 2508 if (NULL != session->join_msg)
2657 initialize_session (session); 2509 initialize_session (session);
2658 session = session->next;
2659 }
2660} 2510}
2661 2511
2662 2512
@@ -2670,27 +2520,12 @@ static void
2670shutdown_task (void *cls, 2520shutdown_task (void *cls,
2671 const struct GNUNET_SCHEDULER_TaskContext *tc) 2521 const struct GNUNET_SCHEDULER_TaskContext *tc)
2672{ 2522{
2673 /* FIXME: complete; write separate destructors for different data types */
2674
2675 while (NULL != incoming_sockets_head) 2523 while (NULL != incoming_sockets_head)
2676 { 2524 {
2677 struct IncomingSocket *socket; 2525 struct IncomingSocket *socket;
2678 socket = incoming_sockets_head; 2526 socket = incoming_sockets_head;
2679 if (NULL != socket->rh)
2680 {
2681 GNUNET_STREAM_read_cancel (socket->rh);
2682 socket->rh = NULL;
2683 }
2684 if (NULL == socket->cpi) 2527 if (NULL == socket->cpi)
2685 { 2528 clear_message_stream_state (&socket->mss);
2686 GNUNET_STREAM_close (socket->socket);
2687 socket->socket = NULL;
2688 if (NULL != socket->mst)
2689 {
2690 GNUNET_SERVER_mst_destroy (socket->mst);
2691 socket->mst = NULL;
2692 }
2693 }
2694 incoming_sockets_head = incoming_sockets_head->next; 2529 incoming_sockets_head = incoming_sockets_head->next;
2695 GNUNET_free (socket); 2530 GNUNET_free (socket);
2696 } 2531 }
@@ -2738,8 +2573,6 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU
2738 {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, 2573 {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
2739 {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, 2574 {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
2740 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)}, 2575 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
2741 {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK,
2742 sizeof (struct GNUNET_CONSENSUS_AckMessage)},
2743 {NULL, NULL, 0, 0} 2576 {NULL, NULL, 0, 0}
2744 }; 2577 };
2745 2578
diff --git a/src/consensus/ibf.c b/src/consensus/ibf.c
index 87dbdd696..739b97339 100644
--- a/src/consensus/ibf.c
+++ b/src/consensus/ibf.c
@@ -63,12 +63,10 @@ ibf_hashcode_from_key (struct IBF_Key key, struct GNUNET_HashCode *dst)
63 * 63 *
64 * @param size number of IBF buckets 64 * @param size number of IBF buckets
65 * @param hash_num number of buckets one element is hashed in 65 * @param hash_num number of buckets one element is hashed in
66 * @param salt salt for mingling hashes, different salt may
67 * result in less (or more) collisions
68 * @return the newly created invertible bloom filter 66 * @return the newly created invertible bloom filter
69 */ 67 */
70struct InvertibleBloomFilter * 68struct InvertibleBloomFilter *
71ibf_create (uint32_t size, uint8_t hash_num, uint32_t salt) 69ibf_create (uint32_t size, uint8_t hash_num)
72{ 70{
73 struct InvertibleBloomFilter *ibf; 71 struct InvertibleBloomFilter *ibf;
74 72
@@ -235,32 +233,25 @@ ibf_decode (struct InvertibleBloomFilter *ibf,
235 233
236 234
237/** 235/**
238 * Write an ibf. 236 * Write buckets from an ibf to a buffer.
237 * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf.
239 * 238 *
240 * @param ibf the ibf to write 239 * @param ibf the ibf to write
241 * @param start with which bucket to start 240 * @param start with which bucket to start
242 * @param count how many buckets to write 241 * @param count how many buckets to write
243 * @param buf buffer to write the data to, will be updated to point to the 242 * @param buf buffer to write the data to
244 * first byte after the written data
245 * @param size pointer to the size of the buffer, will be updated, can be NULL
246 */ 243 */
247void 244void
248ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void **buf, size_t *size) 245ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void *buf)
249{ 246{
250 struct IBF_Key *key_dst; 247 struct IBF_Key *key_dst;
251 struct IBF_KeyHash *key_hash_dst; 248 struct IBF_KeyHash *key_hash_dst;
252 struct IBF_Count *count_dst; 249 struct IBF_Count *count_dst;
253 250
254 /* update size and check for overflow */ 251 GNUNET_assert (start + count <= ibf->size);
255 if (NULL != size) 252
256 {
257 size_t old_size;
258 old_size = *size;
259 *size = *size - count * IBF_BUCKET_SIZE;
260 GNUNET_assert (*size < old_size);
261 }
262 /* copy keys */ 253 /* copy keys */
263 key_dst = (struct IBF_Key *) *buf; 254 key_dst = (struct IBF_Key *) buf;
264 memcpy (key_dst, ibf->key_sum + start, count * sizeof *key_dst); 255 memcpy (key_dst, ibf->key_sum + start, count * sizeof *key_dst);
265 key_dst += count; 256 key_dst += count;
266 /* copy key hashes */ 257 /* copy key hashes */
@@ -271,40 +262,28 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32
271 count_dst = (struct IBF_Count *) key_hash_dst; 262 count_dst = (struct IBF_Count *) key_hash_dst;
272 memcpy (count_dst, ibf->count + start, count * sizeof *count_dst); 263 memcpy (count_dst, ibf->count + start, count * sizeof *count_dst);
273 count_dst += count; 264 count_dst += count;
274 /* returned buffer is at the end of written data*/
275 *buf = (void *) count_dst;
276} 265}
277 266
278 267
279/** 268/**
280 * Read an ibf. 269 * Read buckets from a buffer into an ibf.
281 * 270 *
282 * @param buf pointer to the buffer to write to, will point to first 271 * @param buf pointer to the buffer to read from
283 * byte after the written data // FIXME: take 'const void *buf' for input, return number of bytes READ
284 * @param size size of the buffer, will be updated
285 * @param start which bucket to start at 272 * @param start which bucket to start at
286 * @param count how many buckets to read 273 * @param count how many buckets to read
287 * @param ibf the ibf to read from 274 * @param ibf the ibf to read from
288 * @return GNUNET_OK on success // FIXME: return 0 on error (or -1/ssize_t), number of bytes read otherwise
289 */ 275 */
290int 276void
291ibf_read_slice (void **buf, size_t *size, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf) 277ibf_read_slice (const void *buf, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf)
292{ 278{
293 struct IBF_Key *key_src; 279 struct IBF_Key *key_src;
294 struct IBF_KeyHash *key_hash_src; 280 struct IBF_KeyHash *key_hash_src;
295 struct IBF_Count *count_src; 281 struct IBF_Count *count_src;
296 282
297 /* update size and check for overflow */ 283 GNUNET_assert (start + count <= ibf->size);
298 if (NULL != size) 284
299 {
300 size_t old_size;
301 old_size = *size;
302 *size = *size - count * IBF_BUCKET_SIZE;
303 if (*size > old_size)
304 return GNUNET_SYSERR;
305 }
306 /* copy keys */ 285 /* copy keys */
307 key_src = (struct IBF_Key *) *buf; 286 key_src = (struct IBF_Key *) buf;
308 memcpy (ibf->key_sum + start, key_src, count * sizeof *key_src); 287 memcpy (ibf->key_sum + start, key_src, count * sizeof *key_src);
309 key_src += count; 288 key_src += count;
310 /* copy key hashes */ 289 /* copy key hashes */
@@ -315,40 +294,6 @@ ibf_read_slice (void **buf, size_t *size, uint32_t start, uint32_t count, struct
315 count_src = (struct IBF_Count *) key_hash_src; 294 count_src = (struct IBF_Count *) key_hash_src;
316 memcpy (ibf->count + start, count_src, count * sizeof *count_src); 295 memcpy (ibf->count + start, count_src, count * sizeof *count_src);
317 count_src += count; 296 count_src += count;
318 /* returned buffer is at the end of written data*/
319 *buf = (void *) count_src;
320 return GNUNET_OK;
321}
322
323
324/**
325 * Write an ibf.
326 *
327 * @param ibf the ibf to write
328 * @param buf buffer to write the data to, will be updated to point to the
329 * first byte after the written data
330 * @param size pointer to the size of the buffer, will be updated, can be NULL
331 */
332void
333ibf_write (const struct InvertibleBloomFilter *ibf, void **buf, size_t *size)
334{
335 ibf_write_slice (ibf, 0, ibf->size, buf, size);
336}
337
338
339/**
340 * Read an ibf.
341 *
342 * @param buf pointer to the buffer to write to, will point to first
343 * byte after the written data
344 * @param size size of the buffer, will be updated
345 * @param dst ibf to write buckets to
346 * @return GNUNET_OK on success
347 */
348int
349ibf_read (void **buf, size_t *size, struct InvertibleBloomFilter *dst)
350{
351 return ibf_read_slice (buf, size, 0, dst->size, dst);
352} 297}
353 298
354 299
@@ -366,7 +311,6 @@ ibf_subtract (struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFi
366 311
367 GNUNET_assert (ibf1->size == ibf2->size); 312 GNUNET_assert (ibf1->size == ibf2->size);
368 GNUNET_assert (ibf1->hash_num == ibf2->hash_num); 313 GNUNET_assert (ibf1->hash_num == ibf2->hash_num);
369 GNUNET_assert (ibf1->salt == ibf2->salt);
370 314
371 for (i = 0; i < ibf1->size; i++) 315 for (i = 0; i < ibf1->size; i++)
372 { 316 {
@@ -388,7 +332,6 @@ ibf_dup (const struct InvertibleBloomFilter *ibf)
388 struct InvertibleBloomFilter *copy; 332 struct InvertibleBloomFilter *copy;
389 copy = GNUNET_malloc (sizeof *copy); 333 copy = GNUNET_malloc (sizeof *copy);
390 copy->hash_num = ibf->hash_num; 334 copy->hash_num = ibf->hash_num;
391 copy->salt = ibf->salt;
392 copy->size = ibf->size; 335 copy->size = ibf->size;
393 copy->key_hash_sum = GNUNET_memdup (ibf->key_hash_sum, ibf->size * sizeof (struct IBF_KeyHash)); 336 copy->key_hash_sum = GNUNET_memdup (ibf->key_hash_sum, ibf->size * sizeof (struct IBF_KeyHash));
394 copy->key_sum = GNUNET_memdup (ibf->key_sum, ibf->size * sizeof (struct IBF_Key)); 337 copy->key_sum = GNUNET_memdup (ibf->key_sum, ibf->size * sizeof (struct IBF_Key));
diff --git a/src/consensus/ibf.h b/src/consensus/ibf.h
index 609653889..2bf3ef7c7 100644
--- a/src/consensus/ibf.h
+++ b/src/consensus/ibf.h
@@ -81,11 +81,6 @@ struct InvertibleBloomFilter
81 uint8_t hash_num; 81 uint8_t hash_num;
82 82
83 /** 83 /**
84 * Salt for mingling hashes
85 */
86 uint32_t salt;
87
88 /**
89 * Xor sums of the elements' keys, used to identify the elements. 84 * Xor sums of the elements' keys, used to identify the elements.
90 * Array of 'size' elements. 85 * Array of 'size' elements.
91 */ 86 */
@@ -107,58 +102,28 @@ struct InvertibleBloomFilter
107 102
108 103
109/** 104/**
110 * Write an ibf. 105 * Write buckets from an ibf to a buffer.
106 * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf.
111 * 107 *
112 * @param ibf the ibf to write 108 * @param ibf the ibf to write
113 * @param start with which bucket to start 109 * @param start with which bucket to start
114 * @param count how many buckets to write 110 * @param count how many buckets to write
115 * @param buf buffer to write the data to, will be updated to point to the 111 * @param buf buffer to write the data to
116 * first byte after the written data
117 * @param size pointer to the size of the buffer, will be updated, can be NULL
118 */ 112 */
119void 113void
120ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void **buf, size_t *size); 114ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void *buf);
121 115
122 116
123/** 117/**
124 * Read an ibf. 118 * Read buckets from a buffer into an ibf.
125 * 119 *
126 * @param buf pointer to the buffer to write to, will point to first 120 * @param buf pointer to the buffer to read from
127 * byte after the written data
128 * @param size size of the buffer, will be updated
129 * @param start which bucket to start at 121 * @param start which bucket to start at
130 * @param count how many buckets to read 122 * @param count how many buckets to read
131 * @param ibf the ibf to read from 123 * @param ibf the ibf to read from
132 * @return GNUNET_OK on success
133 */
134int
135ibf_read_slice (void **buf, size_t *size, uint32_t start, uint32_t count, struct InvertibleBloomFilter *dst);
136
137
138/**
139 * Write an ibf.
140 *
141 * @param ibf the ibf to write
142 * @param buf buffer to write the data to, will be updated to point to the
143 * first byte after the written data
144 * @param size pointer to the size of the buffer, will be updated, can be NULL
145 */ 124 */
146void 125void
147ibf_write (const struct InvertibleBloomFilter *ibf, void **buf, size_t *size); 126ibf_read_slice (const void *buf, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf);
148
149
150
151/**
152 * Read an ibf.
153 *
154 * @param buf pointer to the buffer to write to, will point to first
155 * byte after the written data
156 * @param size size of the buffer, will be updated
157 * @param dst ibf to write buckets to
158 * @return GNUNET_OK on success
159 */
160int
161ibf_read (void **buf, size_t *size, struct InvertibleBloomFilter *dst);
162 127
163 128
164/** 129/**
@@ -187,12 +152,10 @@ ibf_hashcode_from_key (struct IBF_Key key, struct GNUNET_HashCode *dst);
187 * 152 *
188 * @param size number of IBF buckets 153 * @param size number of IBF buckets
189 * @param hash_num number of buckets one element is hashed in, usually 3 or 4 154 * @param hash_num number of buckets one element is hashed in, usually 3 or 4
190 * @param salt salt for mingling hashes, different salt may
191 * result in less (or more) collisions
192 * @return the newly created invertible bloom filter 155 * @return the newly created invertible bloom filter
193 */ 156 */
194struct InvertibleBloomFilter * 157struct InvertibleBloomFilter *
195ibf_create (uint32_t size, uint8_t hash_num, uint32_t salt); 158ibf_create (uint32_t size, uint8_t hash_num);
196 159
197 160
198/** 161/**
diff --git a/src/consensus/strata_estimator.c b/src/consensus/strata_estimator.c
new file mode 100644
index 000000000..685c50f0f
--- /dev/null
+++ b/src/consensus/strata_estimator.c
@@ -0,0 +1,145 @@
1/*
2 This file is part of GNUnet
3 (C) 2012 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file consensus/ibf.h
23 * @brief invertible bloom filter
24 * @author Florian Dold
25 */
26
27#include "platform.h"
28#include "gnunet_common.h"
29#include "ibf.h"
30#include "strata_estimator.h"
31
32void
33strata_estimator_write (const struct StrataEstimator *se, void *buf)
34{
35 int i;
36 for (i = 0; i < se->strata_count; i++)
37 {
38 ibf_write_slice (se->strata[i], 0, se->ibf_size, buf);
39 buf += se->ibf_size * IBF_BUCKET_SIZE;
40 }
41}
42
43void
44strata_estimator_read (const void *buf, struct StrataEstimator *se)
45{
46 int i;
47 for (i = 0; i < se->strata_count; i++)
48 {
49 ibf_read_slice (buf, 0, se->ibf_size, se->strata[i]);
50 buf += se->ibf_size * IBF_BUCKET_SIZE;
51 }
52}
53
54
55void
56strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode *key)
57{
58 uint32_t v;
59 int i;
60 v = key->bits[0];
61 /* count trailing '1'-bits of v */
62 for (i = 0; v & 1; v>>=1, i++)
63 /* empty */;
64 ibf_insert (se->strata[i], ibf_key_from_hashcode (key));
65}
66
67
68
69struct StrataEstimator *
70strata_estimator_create (unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum)
71{
72 struct StrataEstimator *se;
73 int i;
74
75 /* fixme: allocate everything in one chunk */
76
77 se = GNUNET_malloc (sizeof (struct StrataEstimator));
78 se->strata_count = strata_count;
79 se->ibf_size = ibf_size;
80 se->strata = GNUNET_malloc (sizeof (struct InvertibleBloomFilter *) * strata_count);
81 for (i = 0; i < strata_count; i++)
82 se->strata[i] = ibf_create (ibf_size, ibf_hashnum);
83 return se;
84}
85
86
87/**
88 * Estimate set difference with two strata estimators,
89 * i.e. arrays of IBFs.
90 * Does not not modify its arguments.
91 *
92 * @param se1 first strata estimator
93 * @param se2 second strata estimator
94 * @return the estimated difference
95 */
96unsigned int
97strata_estimator_difference (const struct StrataEstimator *se1,
98 const struct StrataEstimator *se2)
99{
100 int i;
101 int count;
102
103 GNUNET_assert (se1->strata_count == se2->strata_count);
104 count = 0;
105 for (i = se1->strata_count - 1; i >= 0; i--)
106 {
107 struct InvertibleBloomFilter *diff;
108 /* number of keys decoded from the ibf */
109 int ibf_count;
110 int more;
111 ibf_count = 0;
112 /* FIXME: implement this without always allocating new IBFs */
113 diff = ibf_dup (se1->strata[i]);
114 ibf_subtract (diff, se2->strata[i]);
115 for (;;)
116 {
117 more = ibf_decode (diff, NULL, NULL);
118 if (GNUNET_NO == more)
119 {
120 count += ibf_count;
121 break;
122 }
123 if (GNUNET_SYSERR == more)
124 {
125 ibf_destroy (diff);
126 return count * (1 << (i + 1));
127 }
128 ibf_count++;
129 }
130 ibf_destroy (diff);
131 }
132 return count;
133}
134
135
136void
137strata_estimator_destroy (struct StrataEstimator *se)
138{
139 int i;
140 for (i = 0; i < se->strata_count; i++)
141 ibf_destroy (se->strata[i]);
142 GNUNET_free (se->strata);
143 GNUNET_free (se);
144}
145
diff --git a/src/consensus/consensus_flout.h b/src/consensus/strata_estimator.h
index 6c97813a5..cb5bd3d0a 100644
--- a/src/consensus/consensus_flout.h
+++ b/src/consensus/strata_estimator.h
@@ -16,16 +16,20 @@
16 along with GNUnet; see the file COPYING. If not, write to the 16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330, 17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA. 18 Boston, MA 02111-1307, USA.
19 */ 19*/
20 20
21/** 21/**
22 * @file consensus/consensus_flout.h 22 * @file consensus/strata_estimator.h
23 * @brief intentionally misbehave in certain ways for testing 23 * @brief estimator of set difference
24 * @author Florian Dold 24 * @author Florian Dold
25 */ 25 */
26 26
27#ifndef GNUNET_CONSENSUS_FLOUT_H 27#ifndef GNUNET_CONSENSUS_STRATA_ESTIMATOR_H
28#define GNUNET_CONSENSUS_FLOUT_H 28#define GNUNET_CONSENSUS_STRATA_ESTIMATOR_H
29
30#include "platform.h"
31#include "gnunet_common.h"
32#include "gnunet_util_lib.h"
29 33
30#ifdef __cplusplus 34#ifdef __cplusplus
31extern "C" 35extern "C"
@@ -35,19 +39,38 @@ extern "C"
35#endif 39#endif
36#endif 40#endif
37 41
38#include "platform.h" 42
39#include "gnunet_common.h" 43struct StrataEstimator
40#include "gnunet_consensus_service.h" 44{
45 struct InvertibleBloomFilter **strata;
46 unsigned int strata_count;
47 unsigned int ibf_size;
48};
49
41 50
42void 51void
43GNUNET_CONSENSUS_flout_disable_peer (struct GNUNET_CONSENSUS_Handle *consensus); 52strata_estimator_write (const struct StrataEstimator *se, void *buf);
53
44 54
45void 55void
46GNUNET_CONSENSUS_flout_ignore_element_hash (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_HashCode *element_hash); 56strata_estimator_read (const void *buf, struct StrataEstimator *se);
57
58
59struct StrataEstimator *
60strata_estimator_create (unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum);
61
62
63unsigned int
64strata_estimator_difference (const struct StrataEstimator *se1,
65 const struct StrataEstimator *se2);
66
47 67
48void 68void
49GNUNET_CONSENSUS_flout_send_bogos_ibf (struct GNUNET_CONSENSUS_Handle *consensus, ...); 69strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode *key);
70
50 71
72void
73strata_estimator_destroy (struct StrataEstimator *se);
51 74
52 75
53#if 0 /* keep Emacsens' auto-indent happy */ 76#if 0 /* keep Emacsens' auto-indent happy */
@@ -58,3 +81,4 @@ GNUNET_CONSENSUS_flout_send_bogos_ibf (struct GNUNET_CONSENSUS_Handle *consensus
58#endif 81#endif
59 82
60#endif 83#endif
84
diff --git a/src/dv/gnunet-service-dv.c b/src/dv/gnunet-service-dv.c
index 89f109345..6dc37f7d9 100644
--- a/src/dv/gnunet-service-dv.c
+++ b/src/dv/gnunet-service-dv.c
@@ -1247,10 +1247,8 @@ insert_next_element (void *cls,
1247 * 1247 *
1248 * @param cls the 'struct DirectNeighbor' we're building the consensus with 1248 * @param cls the 'struct DirectNeighbor' we're building the consensus with
1249 * @param element the new element we have learned 1249 * @param element the new element we have learned
1250 * @return GNUNET_OK if the valid is well-formed and should be added to the consensus,
1251 * GNUNET_SYSERR if the element should be ignored and not be propagated
1252 */ 1250 */
1253static int 1251static void
1254learn_route_cb (void *cls, 1252learn_route_cb (void *cls,
1255 const struct GNUNET_CONSENSUS_Element *element) 1253 const struct GNUNET_CONSENSUS_Element *element)
1256{ 1254{
@@ -1274,12 +1272,12 @@ learn_route_cb (void *cls,
1274 neighbor->consensus_task = GNUNET_SCHEDULER_add_delayed (GNUNET_DV_CONSENSUS_FREQUENCY, 1272 neighbor->consensus_task = GNUNET_SCHEDULER_add_delayed (GNUNET_DV_CONSENSUS_FREQUENCY,
1275 &start_consensus, 1273 &start_consensus,
1276 neighbor); 1274 neighbor);
1277 return GNUNET_SYSERR; 1275 return;
1278 } 1276 }
1279 if (sizeof (struct Target) != element->size) 1277 if (sizeof (struct Target) != element->size)
1280 { 1278 {
1281 GNUNET_break_op (0); 1279 GNUNET_break_op (0);
1282 return GNUNET_SYSERR; 1280 return;
1283 } 1281 }
1284 target = GNUNET_malloc (sizeof (struct Target)); 1282 target = GNUNET_malloc (sizeof (struct Target));
1285 memcpy (target, element->data, sizeof (struct Target)); 1283 memcpy (target, element->data, sizeof (struct Target));
@@ -1291,9 +1289,8 @@ learn_route_cb (void *cls,
1291 { 1289 {
1292 GNUNET_break_op (0); 1290 GNUNET_break_op (0);
1293 GNUNET_free (target); 1291 GNUNET_free (target);
1294 return GNUNET_SYSERR; 1292 return;
1295 } 1293 }
1296 return GNUNET_OK;
1297} 1294}
1298 1295
1299 1296
diff --git a/src/include/gnunet_consensus_service.h b/src/include/gnunet_consensus_service.h
index 0c74a6a27..f7f784f6e 100644
--- a/src/include/gnunet_consensus_service.h
+++ b/src/include/gnunet_consensus_service.h
@@ -71,10 +71,8 @@ struct GNUNET_CONSENSUS_Element
71 * 71 *
72 * @param cls closure 72 * @param cls closure
73 * @param element new element, NULL on error 73 * @param element new element, NULL on error
74 * @return GNUNET_OK if the valid is well-formed and should be added to the consensus,
75 * GNUNET_SYSERR if the element should be ignored and not be propagated
76 */ 74 */
77typedef int (*GNUNET_CONSENSUS_ElementCallback) (void *cls, 75typedef void (*GNUNET_CONSENSUS_ElementCallback) (void *cls,
78 const struct GNUNET_CONSENSUS_Element *element); 76 const struct GNUNET_CONSENSUS_Element *element);
79 77
80 78
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 7179914af..431542660 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -1754,6 +1754,11 @@ extern "C"
1754 */ 1754 */
1755#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN 548 1755#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN 548
1756 1756
1757/**
1758 * Abort a round, don't send requested elements anymore
1759 */
1760#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ABORT 548
1761
1757 1762
1758/** 1763/**
1759 * Next available: 570 1764 * Next available: 570