diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-02-07 10:51:17 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-02-07 10:51:17 +0000 |
commit | 4dc3faf5e88b8ca602602aa28a6ff76c02d34848 (patch) | |
tree | d50989d9fe71816bba1fdcef690684daa4f7873b /src/consensus | |
parent | 5ceb669384bd36b3213f7f1e7dbaf31b913e06a0 (diff) | |
download | gnunet-4dc3faf5e88b8ca602602aa28a6ff76c02d34848.tar.gz gnunet-4dc3faf5e88b8ca602602aa28a6ff76c02d34848.zip |
- improved ibf decoding algorithm
- strata estimator now fits in one message
- work on consensus, not quite working yet
Diffstat (limited to 'src/consensus')
-rw-r--r-- | src/consensus/consensus_protocol.h | 7 | ||||
-rw-r--r-- | src/consensus/gnunet-consensus-ibf.c | 65 | ||||
-rw-r--r-- | src/consensus/gnunet-service-consensus.c | 546 | ||||
-rw-r--r-- | src/consensus/ibf.c | 185 | ||||
-rw-r--r-- | src/consensus/ibf.h | 39 |
5 files changed, 542 insertions, 300 deletions
diff --git a/src/consensus/consensus_protocol.h b/src/consensus/consensus_protocol.h index c84aad263..0da959f8f 100644 --- a/src/consensus/consensus_protocol.h +++ b/src/consensus/consensus_protocol.h | |||
@@ -60,6 +60,13 @@ struct Element | |||
60 | struct GNUNET_HashCode hash; | 60 | struct GNUNET_HashCode hash; |
61 | }; | 61 | }; |
62 | 62 | ||
63 | |||
64 | struct ElementRequest | ||
65 | { | ||
66 | struct GNUNET_MessageHeader header; | ||
67 | /* struct GNUNET_HashCode[] rest */ | ||
68 | }; | ||
69 | |||
63 | struct ConsensusHello | 70 | struct ConsensusHello |
64 | { | 71 | { |
65 | struct GNUNET_MessageHeader header; | 72 | struct GNUNET_MessageHeader header; |
diff --git a/src/consensus/gnunet-consensus-ibf.c b/src/consensus/gnunet-consensus-ibf.c index c9bcc1ab0..a16ca3247 100644 --- a/src/consensus/gnunet-consensus-ibf.c +++ b/src/consensus/gnunet-consensus-ibf.c | |||
@@ -38,16 +38,39 @@ static unsigned int csize = 10; | |||
38 | static unsigned int hash_num = 3; | 38 | static unsigned int hash_num = 3; |
39 | static unsigned int ibf_size = 80; | 39 | static unsigned int ibf_size = 80; |
40 | 40 | ||
41 | /* FIXME: add parameter for this */ | ||
42 | static enum GNUNET_CRYPTO_Quality random_quality = GNUNET_CRYPTO_QUALITY_WEAK; | ||
41 | 43 | ||
42 | static struct GNUNET_CONTAINER_MultiHashMap *set_a; | 44 | static struct GNUNET_CONTAINER_MultiHashMap *set_a; |
43 | static struct GNUNET_CONTAINER_MultiHashMap *set_b; | 45 | static struct GNUNET_CONTAINER_MultiHashMap *set_b; |
44 | /* common elements in a and b */ | 46 | /* common elements in a and b */ |
45 | static struct GNUNET_CONTAINER_MultiHashMap *set_c; | 47 | static struct GNUNET_CONTAINER_MultiHashMap *set_c; |
46 | 48 | ||
49 | static struct GNUNET_CONTAINER_MultiHashMap *key_to_hashcode; | ||
50 | |||
47 | static struct InvertibleBloomFilter *ibf_a; | 51 | static struct InvertibleBloomFilter *ibf_a; |
48 | static struct InvertibleBloomFilter *ibf_b; | 52 | static struct InvertibleBloomFilter *ibf_b; |
49 | 53 | ||
50 | 54 | ||
55 | static void | ||
56 | register_hashcode (struct GNUNET_HashCode *hash) | ||
57 | { | ||
58 | struct GNUNET_HashCode replicated; | ||
59 | uint64_t key; | ||
60 | key = ibf_key_from_hashcode (hash); | ||
61 | ibf_hashcode_from_key (key, &replicated); | ||
62 | GNUNET_CONTAINER_multihashmap_put (key_to_hashcode, &replicated, GNUNET_memdup (hash, sizeof *hash), | ||
63 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
64 | } | ||
65 | |||
66 | static void | ||
67 | iter_hashcodes (uint64_t key, GNUNET_CONTAINER_HashMapIterator iter, void *cls) | ||
68 | { | ||
69 | struct GNUNET_HashCode replicated; | ||
70 | ibf_hashcode_from_key (key, &replicated); | ||
71 | GNUNET_CONTAINER_multihashmap_get_multiple (key_to_hashcode, &replicated, iter, cls); | ||
72 | } | ||
73 | |||
51 | 74 | ||
52 | static int | 75 | static int |
53 | insert_iterator (void *cls, | 76 | insert_iterator (void *cls, |
@@ -55,7 +78,19 @@ insert_iterator (void *cls, | |||
55 | void *value) | 78 | void *value) |
56 | { | 79 | { |
57 | struct InvertibleBloomFilter *ibf = (struct InvertibleBloomFilter *) cls; | 80 | struct InvertibleBloomFilter *ibf = (struct InvertibleBloomFilter *) cls; |
58 | ibf_insert (ibf, key); | 81 | ibf_insert (ibf, ibf_key_from_hashcode (key)); |
82 | return GNUNET_YES; | ||
83 | } | ||
84 | |||
85 | |||
86 | static int | ||
87 | remove_iterator (void *cls, | ||
88 | const struct GNUNET_HashCode *key, | ||
89 | void *value) | ||
90 | { | ||
91 | struct GNUNET_CONTAINER_MultiHashMap *hashmap = cls; | ||
92 | /* if remove fails, there just was a collision with another key */ | ||
93 | (void) GNUNET_CONTAINER_multihashmap_remove (hashmap, value, NULL); | ||
59 | return GNUNET_YES; | 94 | return GNUNET_YES; |
60 | } | 95 | } |
61 | 96 | ||
@@ -65,6 +100,7 @@ run (void *cls, char *const *args, const char *cfgfile, | |||
65 | const struct GNUNET_CONFIGURATION_Handle *cfg) | 100 | const struct GNUNET_CONFIGURATION_Handle *cfg) |
66 | { | 101 | { |
67 | struct GNUNET_HashCode id; | 102 | struct GNUNET_HashCode id; |
103 | uint64_t ibf_key; | ||
68 | int i; | 104 | int i; |
69 | int side; | 105 | int side; |
70 | int res; | 106 | int res; |
@@ -78,47 +114,57 @@ run (void *cls, char *const *args, const char *cfgfile, | |||
78 | set_c = GNUNET_CONTAINER_multihashmap_create (((csize == 0) ? 1 : csize), | 114 | set_c = GNUNET_CONTAINER_multihashmap_create (((csize == 0) ? 1 : csize), |
79 | GNUNET_NO); | 115 | GNUNET_NO); |
80 | 116 | ||
117 | key_to_hashcode = GNUNET_CONTAINER_multihashmap_create (((asize+bsize+csize == 0) ? 1 : (asize+bsize+csize)), | ||
118 | GNUNET_NO); | ||
119 | |||
81 | printf ("hash-num=%u, size=%u, #(A-B)=%u, #(B-A)=%u, #(A&B)=%u\n", | 120 | printf ("hash-num=%u, size=%u, #(A-B)=%u, #(B-A)=%u, #(A&B)=%u\n", |
82 | hash_num, ibf_size, asize, bsize, csize); | 121 | hash_num, ibf_size, asize, bsize, csize); |
83 | 122 | ||
84 | i = 0; | 123 | i = 0; |
85 | while (i < asize) | 124 | while (i < asize) |
86 | { | 125 | { |
87 | GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &id); | 126 | GNUNET_CRYPTO_hash_create_random (random_quality, &id); |
88 | if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id)) | 127 | if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id)) |
89 | continue; | 128 | continue; |
90 | GNUNET_CONTAINER_multihashmap_put ( | 129 | GNUNET_CONTAINER_multihashmap_put ( |
91 | set_a, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | 130 | set_a, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); |
131 | register_hashcode (&id); | ||
92 | i++; | 132 | i++; |
93 | } | 133 | } |
94 | i = 0; | 134 | i = 0; |
95 | while (i < bsize) | 135 | while (i < bsize) |
96 | { | 136 | { |
97 | GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &id); | 137 | GNUNET_CRYPTO_hash_create_random (random_quality, &id); |
98 | if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id)) | 138 | if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id)) |
99 | continue; | 139 | continue; |
100 | if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id)) | 140 | if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id)) |
101 | continue; | 141 | continue; |
102 | GNUNET_CONTAINER_multihashmap_put ( | 142 | GNUNET_CONTAINER_multihashmap_put ( |
103 | set_b, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | 143 | set_b, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); |
144 | register_hashcode (&id); | ||
104 | i++; | 145 | i++; |
105 | } | 146 | } |
106 | i = 0; | 147 | i = 0; |
107 | while (i < csize) | 148 | while (i < csize) |
108 | { | 149 | { |
109 | GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &id); | 150 | GNUNET_CRYPTO_hash_create_random (random_quality, &id); |
110 | if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id)) | 151 | if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id)) |
111 | continue; | 152 | continue; |
112 | if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id)) | 153 | if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id)) |
113 | continue; | 154 | continue; |
155 | if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_c, &id)) | ||
156 | continue; | ||
114 | GNUNET_CONTAINER_multihashmap_put ( | 157 | GNUNET_CONTAINER_multihashmap_put ( |
115 | set_c, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | 158 | set_c, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); |
159 | register_hashcode (&id); | ||
116 | i++; | 160 | i++; |
117 | } | 161 | } |
118 | 162 | ||
119 | ibf_a = ibf_create (ibf_size, hash_num, 0); | 163 | ibf_a = ibf_create (ibf_size, hash_num, 0); |
120 | ibf_b = ibf_create (ibf_size, hash_num, 0); | 164 | ibf_b = ibf_create (ibf_size, hash_num, 0); |
121 | 165 | ||
166 | printf ("generated sets\n"); | ||
167 | |||
122 | start_time = GNUNET_TIME_absolute_get (); | 168 | start_time = GNUNET_TIME_absolute_get (); |
123 | 169 | ||
124 | GNUNET_CONTAINER_multihashmap_iterate (set_a, &insert_iterator, ibf_a); | 170 | GNUNET_CONTAINER_multihashmap_iterate (set_a, &insert_iterator, ibf_a); |
@@ -137,7 +183,7 @@ run (void *cls, char *const *args, const char *cfgfile, | |||
137 | 183 | ||
138 | for (;;) | 184 | for (;;) |
139 | { | 185 | { |
140 | res = ibf_decode (ibf_a, &side, &id); | 186 | res = ibf_decode (ibf_a, &side, &ibf_key); |
141 | if (GNUNET_SYSERR == res) | 187 | if (GNUNET_SYSERR == res) |
142 | { | 188 | { |
143 | printf ("decode failed\n"); | 189 | printf ("decode failed\n"); |
@@ -157,14 +203,9 @@ run (void *cls, char *const *args, const char *cfgfile, | |||
157 | } | 203 | } |
158 | 204 | ||
159 | if (side == 1) | 205 | if (side == 1) |
160 | res = GNUNET_CONTAINER_multihashmap_remove (set_a, &id, NULL); | 206 | iter_hashcodes (ibf_key, remove_iterator, set_a); |
161 | if (side == -1) | 207 | if (side == -1) |
162 | res = GNUNET_CONTAINER_multihashmap_remove (set_b, &id, NULL); | 208 | iter_hashcodes (ibf_key, remove_iterator, set_b); |
163 | if (GNUNET_YES != res) | ||
164 | { | ||
165 | printf ("decoded wrong element\n"); | ||
166 | return; | ||
167 | } | ||
168 | } | 209 | } |
169 | } | 210 | } |
170 | 211 | ||
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index 1cbb9d021..d223360dc 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c | |||
@@ -18,7 +18,6 @@ | |||
18 | Boston, MA 02111-1307, USA. | 18 | Boston, MA 02111-1307, USA. |
19 | */ | 19 | */ |
20 | 20 | ||
21 | |||
22 | /** | 21 | /** |
23 | * @file consensus/gnunet-service-consensus.c | 22 | * @file consensus/gnunet-service-consensus.c |
24 | * @brief | 23 | * @brief |
@@ -47,17 +46,21 @@ | |||
47 | */ | 46 | */ |
48 | #define STRATA_IBF_BUCKETS 80 | 47 | #define STRATA_IBF_BUCKETS 80 |
49 | /** | 48 | /** |
50 | * hash num parameter of the IBF | 49 | * hash num parameter for the difference digests and strata estimators |
51 | */ | 50 | */ |
52 | #define STRATA_HASH_NUM 3 | 51 | #define STRATA_HASH_NUM 3 |
52 | |||
53 | /** | 53 | /** |
54 | * Number of strata that can be transmitted in one message. | 54 | * Number of buckets that can be transmitted in one message. |
55 | */ | 55 | */ |
56 | #define STRATA_PER_MESSAGE ((1<<15) / (IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS)) | ||
57 | |||
58 | #define BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE) | 56 | #define BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE) |
59 | 57 | ||
60 | #define MAX_IBF_ORDER (64) | 58 | /** |
59 | * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER). | ||
60 | * Choose this value so that computing the IBF is still cheaper | ||
61 | * than transmitting all values. | ||
62 | */ | ||
63 | #define MAX_IBF_ORDER (32) | ||
61 | 64 | ||
62 | 65 | ||
63 | /* forward declarations */ | 66 | /* forward declarations */ |
@@ -76,14 +79,14 @@ static void | |||
76 | write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size); | 79 | write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size); |
77 | 80 | ||
78 | static void | 81 | static void |
79 | write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size); | 82 | write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size); |
80 | 83 | ||
81 | static int | 84 | static int |
82 | get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session); | 85 | get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session); |
83 | 86 | ||
84 | 87 | ||
85 | /** | 88 | /** |
86 | * An element that is waiting to be transmitted to a client. | 89 | * An element that is waiting to be transmitted. |
87 | */ | 90 | */ |
88 | struct PendingElement | 91 | struct PendingElement |
89 | { | 92 | { |
@@ -106,6 +109,9 @@ struct PendingElement | |||
106 | struct ConsensusPeerInformation *cpi; | 109 | struct ConsensusPeerInformation *cpi; |
107 | }; | 110 | }; |
108 | 111 | ||
112 | /** | ||
113 | * Information about a peer that is in a consensus session. | ||
114 | */ | ||
109 | struct ConsensusPeerInformation | 115 | struct ConsensusPeerInformation |
110 | { | 116 | { |
111 | struct GNUNET_STREAM_Socket *socket; | 117 | struct GNUNET_STREAM_Socket *socket; |
@@ -123,6 +129,12 @@ struct ConsensusPeerInformation | |||
123 | int is_outgoing; | 129 | int is_outgoing; |
124 | 130 | ||
125 | /** | 131 | /** |
132 | * if the peer did something wrong, and was disconnected, | ||
133 | * never interact with this peer again. | ||
134 | */ | ||
135 | int is_bad; | ||
136 | |||
137 | /** | ||
126 | * Did we receive/send a consensus hello? | 138 | * Did we receive/send a consensus hello? |
127 | */ | 139 | */ |
128 | int hello; | 140 | int hello; |
@@ -137,27 +149,29 @@ struct ConsensusPeerInformation | |||
137 | */ | 149 | */ |
138 | struct GNUNET_STREAM_WriteHandle *wh; | 150 | struct GNUNET_STREAM_WriteHandle *wh; |
139 | 151 | ||
152 | enum { | ||
153 | IBF_STATE_NONE, | ||
154 | IBF_STATE_RECEIVING, | ||
155 | IBF_STATE_TRANSMITTING, | ||
156 | IBF_STATE_DECODING | ||
157 | } ibf_state ; | ||
158 | |||
140 | /** | 159 | /** |
141 | * How many of the strate in the ibf were | 160 | * What is the order (=log2 size) of the ibf |
142 | * sent or received in this round? | 161 | * we're currently dealing with? |
143 | */ | 162 | */ |
144 | int strata_counter; | ||
145 | |||
146 | int ibf_order; | 163 | int ibf_order; |
147 | 164 | ||
148 | struct InvertibleBloomFilter *outgoing_ibf; | 165 | /** |
149 | 166 | * The current IBF for this peer, | |
150 | int outgoing_bucket_counter; | 167 | * purpose dependent on ibf_state |
151 | 168 | */ | |
152 | struct InvertibleBloomFilter *incoming_ibf; | 169 | struct InvertibleBloomFilter *ibf; |
153 | |||
154 | int incoming_bucket_counter; | ||
155 | 170 | ||
156 | /** | 171 | /** |
157 | * NULL or incoming_ibf - outgoing_ibf. | 172 | * How many buckets have we transmitted/received (depending on state)? |
158 | * Decoded values of side '1' are to be requested from the the peer. | ||
159 | */ | 173 | */ |
160 | struct InvertibleBloomFilter *diff_ibf; | 174 | int ibf_bucket_counter; |
161 | 175 | ||
162 | /** | 176 | /** |
163 | * Strata estimator of the peer, NULL if our peer | 177 | * Strata estimator of the peer, NULL if our peer |
@@ -165,11 +179,21 @@ struct ConsensusPeerInformation | |||
165 | */ | 179 | */ |
166 | struct InvertibleBloomFilter **strata; | 180 | struct InvertibleBloomFilter **strata; |
167 | 181 | ||
182 | /** | ||
183 | * difference estimated with the current strata estimator | ||
184 | */ | ||
168 | unsigned int diff; | 185 | unsigned int diff; |
169 | 186 | ||
170 | struct GNUNET_SERVER_MessageStreamTokenizer *mst; | 187 | struct GNUNET_SERVER_MessageStreamTokenizer *mst; |
171 | 188 | ||
189 | /** | ||
190 | * Back-reference to the consensus session, | ||
191 | * to that ConsensusPeerInformation can be used as a closure | ||
192 | */ | ||
172 | struct ConsensusSession *session; | 193 | struct ConsensusSession *session; |
194 | |||
195 | struct PendingElement *send_pending_head; | ||
196 | struct PendingElement *send_pending_tail; | ||
173 | }; | 197 | }; |
174 | 198 | ||
175 | struct QueuedMessage | 199 | struct QueuedMessage |
@@ -187,9 +211,31 @@ struct QueuedMessage | |||
187 | struct QueuedMessage *prev; | 211 | struct QueuedMessage *prev; |
188 | }; | 212 | }; |
189 | 213 | ||
214 | enum ConsensusRound | ||
215 | { | ||
216 | /** | ||
217 | * distribution of information with the exponential scheme | ||
218 | */ | ||
219 | CONSENSUS_ROUND_EXP_EXCHANGE, | ||
220 | /** | ||
221 | * All-to-all, exchange missing values | ||
222 | */ | ||
223 | CONSENSUS_ROUND_A2A_EXCHANGE, | ||
224 | /** | ||
225 | * All-to-all, check what values are missing, don't exchange anything | ||
226 | */ | ||
227 | CONSENSUS_ROUND_A2A_INVENTORY | ||
228 | |||
229 | /* | ||
230 | a round to exchange the information for fraud-detection | ||
231 | CONSENSUS_ROUNT_A2_INVENTORY_AGREEMENT | ||
232 | */ | ||
233 | }; | ||
234 | |||
190 | 235 | ||
191 | /** | 236 | /** |
192 | * A consensus session consists of one local client and the remote authorities. | 237 | * A consensus session consists of one local client and the remote authorities. |
238 | * | ||
193 | */ | 239 | */ |
194 | struct ConsensusSession | 240 | struct ConsensusSession |
195 | { | 241 | { |
@@ -225,6 +271,7 @@ struct ConsensusSession | |||
225 | /** | 271 | /** |
226 | * Values in the consensus set of this session, | 272 | * Values in the consensus set of this session, |
227 | * all of them either have been sent by or approved by the client. | 273 | * all of them either have been sent by or approved by the client. |
274 | * Contains GNUNET_CONSENSUS_Element. | ||
228 | */ | 275 | */ |
229 | struct GNUNET_CONTAINER_MultiHashMap *values; | 276 | struct GNUNET_CONTAINER_MultiHashMap *values; |
230 | 277 | ||
@@ -238,8 +285,14 @@ struct ConsensusSession | |||
238 | */ | 285 | */ |
239 | struct PendingElement *approval_pending_tail; | 286 | struct PendingElement *approval_pending_tail; |
240 | 287 | ||
288 | /** | ||
289 | * Messages to be sent to the local client that owns this session | ||
290 | */ | ||
241 | struct QueuedMessage *client_messages_head; | 291 | struct QueuedMessage *client_messages_head; |
242 | 292 | ||
293 | /** | ||
294 | * Messages to be sent to the local client that owns this session | ||
295 | */ | ||
243 | struct QueuedMessage *client_messages_tail; | 296 | struct QueuedMessage *client_messages_tail; |
244 | 297 | ||
245 | /** | 298 | /** |
@@ -259,22 +312,14 @@ struct ConsensusSession | |||
259 | int conclude_group_min; | 312 | int conclude_group_min; |
260 | 313 | ||
261 | /** | 314 | /** |
262 | * Current round of the conclusion | ||
263 | */ | ||
264 | int current_round; | ||
265 | |||
266 | /** | ||
267 | * Soft deadline for conclude. | ||
268 | * Speed up the speed of the consensus at the cost of consensus quality, as | ||
269 | * the time approached or crosses the deadline. | ||
270 | */ | ||
271 | struct GNUNET_TIME_Absolute conclude_deadline; | ||
272 | |||
273 | /** | ||
274 | * Number of other peers in the consensus | 315 | * Number of other peers in the consensus |
275 | */ | 316 | */ |
276 | unsigned int num_peers; | 317 | unsigned int num_peers; |
277 | 318 | ||
319 | /** | ||
320 | * Information about the other peers, | ||
321 | * their state, etc. | ||
322 | */ | ||
278 | struct ConsensusPeerInformation *info; | 323 | struct ConsensusPeerInformation *info; |
279 | 324 | ||
280 | /** | 325 | /** |
@@ -289,13 +334,19 @@ struct ConsensusSession | |||
289 | int local_peer_idx; | 334 | int local_peer_idx; |
290 | 335 | ||
291 | /** | 336 | /** |
292 | * Task identifier for the round timeout task | 337 | * Strata estimator, computed online |
293 | */ | 338 | */ |
294 | GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid; | ||
295 | |||
296 | struct InvertibleBloomFilter **strata; | 339 | struct InvertibleBloomFilter **strata; |
297 | 340 | ||
341 | /** | ||
342 | * Pre-computed IBFs | ||
343 | */ | ||
298 | struct InvertibleBloomFilter **ibfs; | 344 | struct InvertibleBloomFilter **ibfs; |
345 | |||
346 | /** | ||
347 | * Current round | ||
348 | */ | ||
349 | enum ConsensusRound current_round; | ||
299 | }; | 350 | }; |
300 | 351 | ||
301 | 352 | ||
@@ -339,6 +390,14 @@ struct IncomingSocket | |||
339 | * Peer-in-session this socket belongs to, once known, otherwise NULL. | 390 | * Peer-in-session this socket belongs to, once known, otherwise NULL. |
340 | */ | 391 | */ |
341 | struct ConsensusPeerInformation *cpi; | 392 | struct ConsensusPeerInformation *cpi; |
393 | |||
394 | /** | ||
395 | * Set to the global session id, if the peer sent us a hello-message, | ||
396 | * but the session does not exist yet. | ||
397 | * | ||
398 | * FIXME: not implemented yet | ||
399 | */ | ||
400 | struct GNUNET_HashCode *requested_gid; | ||
342 | }; | 401 | }; |
343 | 402 | ||
344 | static struct IncomingSocket *incoming_sockets_head; | 403 | static struct IncomingSocket *incoming_sockets_head; |
@@ -380,6 +439,12 @@ static struct GNUNET_CORE_Handle *core; | |||
380 | static struct GNUNET_STREAM_ListenSocket *listener; | 439 | static struct GNUNET_STREAM_ListenSocket *listener; |
381 | 440 | ||
382 | 441 | ||
442 | /** | ||
443 | * Queue a message to be sent to the inhabiting client of a sessino | ||
444 | * | ||
445 | * @param session session | ||
446 | * @param msg message we want to queue | ||
447 | */ | ||
383 | static void | 448 | static void |
384 | queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHeader *msg) | 449 | queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHeader *msg) |
385 | { | 450 | { |
@@ -389,7 +454,32 @@ queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHea | |||
389 | GNUNET_CONTAINER_DLL_insert_tail (session->client_messages_head, session->client_messages_tail, qm); | 454 | GNUNET_CONTAINER_DLL_insert_tail (session->client_messages_head, session->client_messages_tail, qm); |
390 | } | 455 | } |
391 | 456 | ||
457 | /** | ||
458 | * Get peer index associated with the peer information, | ||
459 | * unique for every session among all peers. | ||
460 | */ | ||
461 | static int | ||
462 | get_cpi_index (struct ConsensusPeerInformation *cpi) | ||
463 | { | ||
464 | return cpi - cpi->session->info; | ||
465 | } | ||
466 | |||
467 | /** | ||
468 | * Mark the peer as bad, free as state we don't need anymore. | ||
469 | */ | ||
470 | static void | ||
471 | mark_peer_bad (struct ConsensusPeerInformation *cpi) | ||
472 | { | ||
473 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer #%u marked as bad\n", get_cpi_index (cpi)); | ||
474 | cpi->is_bad = GNUNET_YES; | ||
475 | /* FIXME: free ibfs etc. */ | ||
476 | } | ||
477 | |||
392 | 478 | ||
479 | /** | ||
480 | * Estimate set difference with two strata estimators, | ||
481 | * i.e. arrays of IBFs. | ||
482 | */ | ||
393 | static int | 483 | static int |
394 | estimate_difference (struct InvertibleBloomFilter** strata1, | 484 | estimate_difference (struct InvertibleBloomFilter** strata1, |
395 | struct InvertibleBloomFilter** strata2) | 485 | struct InvertibleBloomFilter** strata2) |
@@ -415,6 +505,7 @@ estimate_difference (struct InvertibleBloomFilter** strata1, | |||
415 | } | 505 | } |
416 | if (GNUNET_SYSERR == more) | 506 | if (GNUNET_SYSERR == more) |
417 | { | 507 | { |
508 | ibf_destroy (diff); | ||
418 | return count * (1 << (i + 1)); | 509 | return count * (1 << (i + 1)); |
419 | } | 510 | } |
420 | ibf_count++; | 511 | ibf_count++; |
@@ -425,10 +516,9 @@ estimate_difference (struct InvertibleBloomFilter** strata1, | |||
425 | } | 516 | } |
426 | 517 | ||
427 | 518 | ||
428 | |||
429 | /** | 519 | /** |
430 | * Functions of this signature are called whenever data is available from the | 520 | * Called when receiving data from a peer that is member of |
431 | * stream. | 521 | * an inhabited consensus session. |
432 | * | 522 | * |
433 | * @param cls the closure from GNUNET_STREAM_read | 523 | * @param cls the closure from GNUNET_STREAM_read |
434 | * @param status the status of the stream at the time this function is called | 524 | * @param status the status of the stream at the time this function is called |
@@ -468,8 +558,8 @@ session_stream_data_processor (void *cls, | |||
468 | } | 558 | } |
469 | 559 | ||
470 | /** | 560 | /** |
471 | * Functions of this signature are called whenever data is available from the | 561 | * Called when we receive data from a peer that is not member of |
472 | * stream. | 562 | * a session yet, or the session is not yet inhabited. |
473 | * | 563 | * |
474 | * @param cls the closure from GNUNET_STREAM_read | 564 | * @param cls the closure from GNUNET_STREAM_read |
475 | * @param status the status of the stream at the time this function is called | 565 | * @param status the status of the stream at the time this function is called |
@@ -508,7 +598,7 @@ incoming_stream_data_processor (void *cls, | |||
508 | 598 | ||
509 | 599 | ||
510 | /** | 600 | /** |
511 | * Iterator over hash map entries. | 601 | * Iterator to insert values into an ibf. |
512 | * | 602 | * |
513 | * @param cls closure | 603 | * @param cls closure |
514 | * @param key current key code | 604 | * @param key current key code |
@@ -524,28 +614,34 @@ ibf_values_iterator (void *cls, | |||
524 | { | 614 | { |
525 | struct ConsensusPeerInformation *cpi; | 615 | struct ConsensusPeerInformation *cpi; |
526 | cpi = cls; | 616 | cpi = cls; |
527 | ibf_insert (cpi->session->ibfs[cpi->ibf_order], key); | 617 | ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key_from_hashcode (key)); |
528 | return GNUNET_YES; | 618 | return GNUNET_YES; |
529 | } | 619 | } |
530 | 620 | ||
531 | |||
532 | static void | 621 | static void |
533 | create_outgoing_ibf (struct ConsensusPeerInformation *cpi) | 622 | prepare_ibf (struct ConsensusPeerInformation *cpi) |
534 | { | 623 | { |
535 | if (NULL == cpi->session->ibfs[cpi->ibf_order]) | 624 | if (NULL == cpi->session->ibfs[cpi->ibf_order]) |
536 | { | 625 | { |
537 | cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); | 626 | cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); |
538 | GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi); | 627 | GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi); |
539 | } | 628 | } |
540 | cpi->outgoing_ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); | ||
541 | } | 629 | } |
542 | 630 | ||
631 | |||
632 | /** | ||
633 | * Called when a peer sends us its strata estimator. | ||
634 | * In response, we sent out IBF of appropriate size back. | ||
635 | * | ||
636 | * @param cpi session | ||
637 | * @param strata_msg message | ||
638 | */ | ||
543 | static int | 639 | static int |
544 | handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg) | 640 | handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg) |
545 | { | 641 | { |
546 | int i; | 642 | int i; |
547 | int num_strata; | 643 | uint64_t *key_src; |
548 | struct GNUNET_HashCode *hash_src; | 644 | uint32_t *hash_src; |
549 | uint8_t *count_src; | 645 | uint8_t *count_src; |
550 | 646 | ||
551 | GNUNET_assert (GNUNET_NO == cpi->is_outgoing); | 647 | GNUNET_assert (GNUNET_NO == cpi->is_outgoing); |
@@ -557,49 +653,46 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess | |||
557 | cpi->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); | 653 | cpi->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); |
558 | } | 654 | } |
559 | 655 | ||
560 | num_strata = ntohs (strata_msg->num_strata); | ||
561 | |||
562 | /* for correct message alignment, copy bucket types seperately */ | 656 | /* for correct message alignment, copy bucket types seperately */ |
563 | hash_src = (struct GNUNET_HashCode *) &strata_msg[1]; | 657 | key_src = (uint64_t *) &strata_msg[1]; |
564 | 658 | ||
565 | for (i = 0; i < num_strata; i++) | 659 | for (i = 0; i < STRATA_COUNT; i++) |
566 | { | 660 | { |
567 | memcpy (cpi->strata[cpi->strata_counter+i]->hash_sum, hash_src, STRATA_IBF_BUCKETS * sizeof *hash_src); | 661 | memcpy (cpi->strata[i]->id_sum, key_src, STRATA_IBF_BUCKETS * sizeof *key_src); |
568 | hash_src += STRATA_IBF_BUCKETS; | 662 | key_src += STRATA_IBF_BUCKETS; |
569 | } | 663 | } |
570 | 664 | ||
571 | for (i = 0; i < num_strata; i++) | 665 | hash_src = (uint32_t *) key_src; |
666 | |||
667 | for (i = 0; i < STRATA_COUNT; i++) | ||
572 | { | 668 | { |
573 | memcpy (cpi->strata[cpi->strata_counter+i]->id_sum, hash_src, STRATA_IBF_BUCKETS * sizeof *hash_src); | 669 | memcpy (cpi->strata[i]->hash_sum, hash_src, STRATA_IBF_BUCKETS * sizeof *hash_src); |
574 | hash_src += STRATA_IBF_BUCKETS; | 670 | hash_src += STRATA_IBF_BUCKETS; |
575 | } | 671 | } |
576 | 672 | ||
577 | count_src = (uint8_t *) hash_src; | 673 | count_src = (uint8_t *) hash_src; |
578 | 674 | ||
579 | for (i = 0; i < num_strata; i++) | 675 | for (i = 0; i < STRATA_COUNT; i++) |
580 | { | 676 | { |
581 | memcpy (cpi->strata[cpi->strata_counter+i]->count, count_src, STRATA_IBF_BUCKETS); | 677 | memcpy (cpi->strata[i]->count, count_src, STRATA_IBF_BUCKETS); |
582 | count_src += STRATA_IBF_BUCKETS; | 678 | count_src += STRATA_IBF_BUCKETS; |
583 | } | 679 | } |
584 | 680 | ||
585 | GNUNET_assert (count_src == (((uint8_t *) &strata_msg[1]) + STRATA_IBF_BUCKETS * num_strata * IBF_BUCKET_SIZE)); | 681 | cpi->diff = estimate_difference (cpi->session->strata, cpi->strata); |
586 | 682 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", cpi->diff); | |
587 | cpi->strata_counter += num_strata; | 683 | |
588 | 684 | /* send IBF of the right size */ | |
589 | if (STRATA_COUNT == cpi->strata_counter) | 685 | cpi->ibf_order = 0; |
590 | { | 686 | while ((1 << cpi->ibf_order) < cpi->diff) |
591 | 687 | cpi->ibf_order++; | |
592 | cpi->diff = estimate_difference (cpi->session->strata, cpi->strata); | 688 | if (cpi->ibf_order > MAX_IBF_ORDER) |
593 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", cpi->diff); | 689 | cpi->ibf_order = MAX_IBF_ORDER; |
594 | cpi->ibf_order = 0; | 690 | cpi->ibf_order += 2; |
595 | while ((1 << cpi->ibf_order) < cpi->diff) | 691 | /* create ibf if not already pre-computed */ |
596 | cpi->ibf_order++; | 692 | prepare_ibf (cpi); |
597 | if (cpi->ibf_order > MAX_IBF_ORDER) | 693 | cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); |
598 | cpi->ibf_order = MAX_IBF_ORDER; | 694 | cpi->ibf_state = IBF_STATE_TRANSMITTING; |
599 | cpi->ibf_order += 2; | 695 | write_ibf (cpi, GNUNET_STREAM_OK, 0); |
600 | create_outgoing_ibf (cpi); | ||
601 | write_ibf (cpi, GNUNET_STREAM_OK, 0); | ||
602 | } | ||
603 | 696 | ||
604 | return GNUNET_YES; | 697 | return GNUNET_YES; |
605 | } | 698 | } |
@@ -608,59 +701,57 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess | |||
608 | static int | 701 | static int |
609 | handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest) | 702 | handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest) |
610 | { | 703 | { |
611 | struct GNUNET_HashCode *hash_src; | ||
612 | int num_buckets; | 704 | int num_buckets; |
705 | uint64_t *key_src; | ||
706 | uint32_t *hash_src; | ||
613 | uint8_t *count_src; | 707 | uint8_t *count_src; |
614 | 708 | ||
615 | num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE; | 709 | num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE; |
616 | 710 | ||
617 | if (cpi->is_outgoing == GNUNET_YES) | 711 | if (IBF_STATE_NONE == cpi->ibf_state) |
618 | { | 712 | { |
619 | /* we receive the ibf as an initiator, thus we're interested in the order */ | 713 | cpi->ibf_state = IBF_STATE_RECEIVING; |
620 | cpi->ibf_order = digest->order; | 714 | cpi->ibf_order = digest->order; |
621 | if ((0 == cpi->outgoing_bucket_counter) && (NULL == cpi->wh)) | 715 | cpi->ibf_bucket_counter = 0; |
622 | { | ||
623 | create_outgoing_ibf (cpi); | ||
624 | write_ibf (cpi, GNUNET_STREAM_OK, 0); | ||
625 | } | ||
626 | /* FIXME: ensure that orders do not differ each time */ | ||
627 | } | 716 | } |
628 | else | 717 | |
718 | if ( (IBF_STATE_RECEIVING != cpi->ibf_state) || | ||
719 | (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order)) ) | ||
629 | { | 720 | { |
630 | /* FIXME: handle correctly */ | 721 | mark_peer_bad (cpi); |
631 | GNUNET_assert (cpi->ibf_order == digest->order); | 722 | return GNUNET_NO; |
632 | } | 723 | } |
633 | 724 | ||
634 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets, cpi->incoming_bucket_counter, (1 << cpi->ibf_order)); | ||
635 | 725 | ||
636 | if (cpi->incoming_bucket_counter + num_buckets > (1 << cpi->ibf_order)) | 726 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets, cpi->ibf_bucket_counter, (1 << cpi->ibf_order)); |
637 | { | ||
638 | /* TODO: handle this */ | ||
639 | GNUNET_assert (0); | ||
640 | } | ||
641 | 727 | ||
642 | if (NULL == cpi->incoming_ibf) | 728 | if (NULL == cpi->ibf) |
643 | cpi->incoming_ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); | 729 | cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); |
644 | 730 | ||
645 | hash_src = (struct GNUNET_HashCode *) &digest[1]; | 731 | key_src = (uint64_t *) &digest[1]; |
646 | 732 | ||
647 | memcpy (cpi->incoming_ibf->hash_sum, hash_src, num_buckets * sizeof *hash_src); | 733 | memcpy (cpi->ibf->hash_sum, key_src, num_buckets * sizeof *key_src); |
648 | hash_src += num_buckets; | 734 | hash_src += num_buckets; |
649 | 735 | ||
650 | memcpy (cpi->incoming_ibf->id_sum, hash_src, num_buckets * sizeof *hash_src); | 736 | hash_src = (uint32_t *) key_src; |
737 | |||
738 | memcpy (cpi->ibf->id_sum, hash_src, num_buckets * sizeof *hash_src); | ||
651 | hash_src += num_buckets; | 739 | hash_src += num_buckets; |
652 | 740 | ||
653 | count_src = (uint8_t *) hash_src; | 741 | count_src = (uint8_t *) hash_src; |
654 | 742 | ||
655 | memcpy (cpi->incoming_ibf->count, count_src, num_buckets * sizeof *count_src); | 743 | memcpy (cpi->ibf->count, count_src, num_buckets * sizeof *count_src); |
656 | 744 | ||
657 | cpi->incoming_bucket_counter += num_buckets; | 745 | cpi->ibf_bucket_counter += num_buckets; |
658 | 746 | ||
659 | if (cpi->incoming_bucket_counter == (1 << cpi->ibf_order)) | 747 | if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) |
660 | { | 748 | { |
661 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full ibf\n"); | 749 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full ibf\n"); |
662 | if ((NULL == cpi->wh) && (cpi->outgoing_bucket_counter == (1 << cpi->ibf_order))) | 750 | GNUNET_assert (NULL != cpi->wh); |
663 | write_values (cpi, GNUNET_STREAM_OK, 0); | 751 | cpi->ibf_state = IBF_STATE_DECODING; |
752 | prepare_ibf (cpi); | ||
753 | ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]); | ||
754 | write_requests_and_elements (cpi, GNUNET_STREAM_OK, 0); | ||
664 | } | 755 | } |
665 | return GNUNET_YES; | 756 | return GNUNET_YES; |
666 | } | 757 | } |
@@ -702,10 +793,28 @@ handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_Me | |||
702 | } | 793 | } |
703 | 794 | ||
704 | 795 | ||
796 | /** | ||
797 | * Handle a request for elements. | ||
798 | * Only allowed in exchange-rounds. | ||
799 | * | ||
800 | * FIXME: implement | ||
801 | */ | ||
802 | static int | ||
803 | handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg) | ||
804 | { | ||
805 | /* FIXME: implement */ | ||
806 | return GNUNET_YES; | ||
807 | } | ||
808 | |||
809 | |||
810 | /** | ||
811 | * Handle a HELLO-message, send when another peer wants to join a session where | ||
812 | * our peer is a member. The session may or may not be inhabited yet. | ||
813 | */ | ||
705 | static int | 814 | static int |
706 | handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello) | 815 | handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello) |
707 | { | 816 | { |
708 | /* FIXME: session might not exist yet */ | 817 | /* FIXME: session might not exist yet. create an uninhabited session and wait for a client */ |
709 | struct ConsensusSession *session; | 818 | struct ConsensusSession *session; |
710 | session = sessions_head; | 819 | session = sessions_head; |
711 | while (NULL != session) | 820 | while (NULL != session) |
@@ -726,7 +835,8 @@ handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello | |||
726 | } | 835 | } |
727 | session = session->next; | 836 | session = session->next; |
728 | } | 837 | } |
729 | GNUNET_assert (0); | 838 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer tried to HELLO uninhabited session\n"); |
839 | GNUNET_break (0); | ||
730 | return GNUNET_NO; | 840 | return GNUNET_NO; |
731 | } | 841 | } |
732 | 842 | ||
@@ -756,6 +866,8 @@ mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader | |||
756 | return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message); | 866 | return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message); |
757 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS: | 867 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS: |
758 | return handle_p2p_element (cpi, message); | 868 | return handle_p2p_element (cpi, message); |
869 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST: | ||
870 | return handle_p2p_element_request (cpi, (struct ElementRequest *) message); | ||
759 | default: | 871 | default: |
760 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "unexpected message type from peer: %u\n", ntohs (message->type)); | 872 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "unexpected message type from peer: %u\n", ntohs (message->type)); |
761 | /* FIXME: handle correctly */ | 873 | /* FIXME: handle correctly */ |
@@ -1007,6 +1119,9 @@ get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSess | |||
1007 | 1119 | ||
1008 | 1120 | ||
1009 | 1121 | ||
1122 | /** | ||
1123 | * Called when stream has finishes writing the hello message | ||
1124 | */ | ||
1010 | static void | 1125 | static void |
1011 | hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size) | 1126 | hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size) |
1012 | { | 1127 | { |
@@ -1067,6 +1182,8 @@ initialize_session_info (struct ConsensusSession *session) | |||
1067 | session->info[i].session = session; | 1182 | session->info[i].session = session; |
1068 | } | 1183 | } |
1069 | 1184 | ||
1185 | session->current_round = CONSENSUS_ROUND_A2A_EXCHANGE; | ||
1186 | |||
1070 | last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; | 1187 | last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; |
1071 | i = (session->local_peer_idx + 1) % session->num_peers; | 1188 | i = (session->local_peer_idx + 1) % session->num_peers; |
1072 | while (i != last) | 1189 | while (i != last) |
@@ -1143,8 +1260,9 @@ strata_insert (struct InvertibleBloomFilter **strata, struct GNUNET_HashCode *ke | |||
1143 | int i; | 1260 | int i; |
1144 | v = key->bits[0]; | 1261 | v = key->bits[0]; |
1145 | /* count trailing '1'-bits of v */ | 1262 | /* count trailing '1'-bits of v */ |
1146 | for (i = 0; v & 1; v>>=1, i++); | 1263 | for (i = 0; v & 1; v>>=1, i++) |
1147 | ibf_insert (strata[i], key); | 1264 | /* empty */; |
1265 | ibf_insert (strata[i], ibf_key_from_hashcode (key)); | ||
1148 | } | 1266 | } |
1149 | 1267 | ||
1150 | 1268 | ||
@@ -1309,6 +1427,29 @@ client_insert (void *cls, | |||
1309 | 1427 | ||
1310 | 1428 | ||
1311 | 1429 | ||
1430 | |||
1431 | /** | ||
1432 | * Functions of this signature are called whenever writing operations | ||
1433 | * on a stream are executed | ||
1434 | * | ||
1435 | * @param cls the closure from GNUNET_STREAM_write | ||
1436 | * @param status the status of the stream at the time this function is called; | ||
1437 | * GNUNET_STREAM_OK if writing to stream was completed successfully; | ||
1438 | * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully | ||
1439 | * (this doesn't mean that the data is never sent, the receiver may | ||
1440 | * have read the data but its ACKs may have been lost); | ||
1441 | * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the | ||
1442 | * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot | ||
1443 | * be processed. | ||
1444 | * @param size the number of bytes written | ||
1445 | */ | ||
1446 | static void | ||
1447 | write_strata_done (void *cls, enum GNUNET_STREAM_Status status, size_t size) | ||
1448 | { | ||
1449 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
1450 | /* just wait for the ibf */ | ||
1451 | } | ||
1452 | |||
1312 | /** | 1453 | /** |
1313 | * Functions of this signature are called whenever writing operations | 1454 | * Functions of this signature are called whenever writing operations |
1314 | * on a stream are executed | 1455 | * on a stream are executed |
@@ -1331,65 +1472,53 @@ write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size) | |||
1331 | struct StrataMessage *strata_msg; | 1472 | struct StrataMessage *strata_msg; |
1332 | size_t msize; | 1473 | size_t msize; |
1333 | int i; | 1474 | int i; |
1334 | struct GNUNET_HashCode *hash_dst; | 1475 | uint64_t *key_dst; |
1476 | uint32_t *hash_dst; | ||
1335 | uint8_t *count_dst; | 1477 | uint8_t *count_dst; |
1336 | int num_strata; | ||
1337 | 1478 | ||
1338 | cpi = cls; | 1479 | cpi = cls; |
1339 | cpi->wh = NULL; | 1480 | cpi->wh = NULL; |
1340 | 1481 | ||
1482 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
1483 | |||
1341 | GNUNET_assert (GNUNET_YES == cpi->is_outgoing); | 1484 | GNUNET_assert (GNUNET_YES == cpi->is_outgoing); |
1342 | 1485 | ||
1343 | /* FIXME: handle this */ | 1486 | /* FIXME: handle this */ |
1344 | GNUNET_assert (GNUNET_STREAM_OK == status); | 1487 | GNUNET_assert (GNUNET_STREAM_OK == status); |
1345 | 1488 | ||
1346 | if (STRATA_COUNT == cpi->strata_counter) | 1489 | msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS); |
1347 | { | ||
1348 | /* strata have been written, wait for other side's IBF */ | ||
1349 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata written\n"); | ||
1350 | return; | ||
1351 | } | ||
1352 | |||
1353 | if ((STRATA_COUNT - cpi->strata_counter) < STRATA_PER_MESSAGE) | ||
1354 | num_strata = (STRATA_COUNT - cpi->strata_counter); | ||
1355 | else | ||
1356 | num_strata = STRATA_PER_MESSAGE; | ||
1357 | |||
1358 | |||
1359 | msize = (sizeof *strata_msg) + (num_strata * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS); | ||
1360 | 1490 | ||
1361 | strata_msg = GNUNET_malloc (msize); | 1491 | strata_msg = GNUNET_malloc (msize); |
1362 | strata_msg->header.size = htons (msize); | 1492 | strata_msg->header.size = htons (msize); |
1363 | strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE); | 1493 | strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE); |
1364 | strata_msg->num_strata = htons (num_strata); | ||
1365 | 1494 | ||
1366 | /* for correct message alignment, copy bucket types seperately */ | 1495 | /* for correct message alignment, copy bucket types seperately */ |
1367 | hash_dst = (struct GNUNET_HashCode *) &strata_msg[1]; | 1496 | key_dst = (uint64_t *) &strata_msg[1]; |
1368 | 1497 | ||
1369 | for (i = 0; i < num_strata; i++) | 1498 | for (i = 0; i < STRATA_COUNT; i++) |
1370 | { | 1499 | { |
1371 | memcpy (hash_dst, cpi->session->strata[cpi->strata_counter+i]->hash_sum, STRATA_IBF_BUCKETS * sizeof *hash_dst); | 1500 | memcpy (key_dst, cpi->session->strata[i]->id_sum, STRATA_IBF_BUCKETS * sizeof *key_dst); |
1372 | hash_dst += STRATA_IBF_BUCKETS; | 1501 | key_dst += STRATA_IBF_BUCKETS; |
1373 | } | 1502 | } |
1374 | 1503 | ||
1375 | for (i = 0; i < num_strata; i++) | 1504 | hash_dst = (uint32_t *) key_dst; |
1505 | |||
1506 | for (i = 0; i < STRATA_COUNT; i++) | ||
1376 | { | 1507 | { |
1377 | memcpy (hash_dst, cpi->session->strata[cpi->strata_counter+i]->id_sum, STRATA_IBF_BUCKETS * sizeof *hash_dst); | 1508 | memcpy (hash_dst, cpi->session->strata[i]->hash_sum, STRATA_IBF_BUCKETS * sizeof *hash_dst); |
1378 | hash_dst += STRATA_IBF_BUCKETS; | 1509 | hash_dst += STRATA_IBF_BUCKETS; |
1379 | } | 1510 | } |
1380 | 1511 | ||
1381 | count_dst = (uint8_t *) hash_dst; | 1512 | count_dst = (uint8_t *) hash_dst; |
1382 | 1513 | ||
1383 | for (i = 0; i < num_strata; i++) | 1514 | for (i = 0; i < STRATA_COUNT; i++) |
1384 | { | 1515 | { |
1385 | memcpy (count_dst, cpi->session->strata[cpi->strata_counter+i]->count, STRATA_IBF_BUCKETS); | 1516 | memcpy (count_dst, cpi->session->strata[i]->count, STRATA_IBF_BUCKETS); |
1386 | count_dst += STRATA_IBF_BUCKETS; | 1517 | count_dst += STRATA_IBF_BUCKETS; |
1387 | } | 1518 | } |
1388 | 1519 | ||
1389 | cpi->strata_counter += num_strata; | ||
1390 | |||
1391 | cpi->wh = GNUNET_STREAM_write (cpi->socket, strata_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, | 1520 | cpi->wh = GNUNET_STREAM_write (cpi->socket, strata_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, |
1392 | write_strata, cpi); | 1521 | write_strata_done, cpi); |
1393 | 1522 | ||
1394 | GNUNET_assert (NULL != cpi->wh); | 1523 | GNUNET_assert (NULL != cpi->wh); |
1395 | } | 1524 | } |
@@ -1416,29 +1545,33 @@ write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size) | |||
1416 | struct ConsensusPeerInformation *cpi; | 1545 | struct ConsensusPeerInformation *cpi; |
1417 | struct DifferenceDigest *digest; | 1546 | struct DifferenceDigest *digest; |
1418 | int msize; | 1547 | int msize; |
1419 | struct GNUNET_HashCode *hash_dst; | 1548 | uint64_t *key_dst; |
1549 | uint32_t *hash_dst; | ||
1420 | uint8_t *count_dst; | 1550 | uint8_t *count_dst; |
1421 | int num_buckets; | 1551 | int num_buckets; |
1422 | 1552 | ||
1423 | cpi = cls; | 1553 | cpi = cls; |
1424 | cpi->wh = NULL; | 1554 | cpi->wh = NULL; |
1425 | 1555 | ||
1426 | if (cpi->outgoing_bucket_counter == (1 << cpi->ibf_order)) | 1556 | GNUNET_assert (GNUNET_STREAM_OK == status); |
1557 | |||
1558 | GNUNET_assert (IBF_STATE_TRANSMITTING == cpi->ibf_state); | ||
1559 | |||
1560 | if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) | ||
1427 | { | 1561 | { |
1428 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ibf completely written\n"); | 1562 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ibf completely written\n"); |
1429 | if (cpi->incoming_bucket_counter == (1 << cpi->ibf_order)) | 1563 | /* we now wait for values / requests / another IBF because peer could not decode with our IBF */ |
1430 | write_values (cpi, GNUNET_STREAM_OK, 0); | ||
1431 | return; | 1564 | return; |
1432 | } | 1565 | } |
1433 | 1566 | ||
1434 | /* remaining buckets */ | 1567 | /* remaining buckets */ |
1435 | num_buckets = (1 << cpi->ibf_order) - cpi->outgoing_bucket_counter; | 1568 | num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter; |
1436 | 1569 | ||
1437 | /* limit to maximum */ | 1570 | /* limit to maximum */ |
1438 | if (num_buckets > BUCKETS_PER_MESSAGE) | 1571 | if (num_buckets > BUCKETS_PER_MESSAGE) |
1439 | num_buckets = BUCKETS_PER_MESSAGE; | 1572 | num_buckets = BUCKETS_PER_MESSAGE; |
1440 | 1573 | ||
1441 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "writing ibf buckets at %d/%d\n", cpi->outgoing_bucket_counter, (1<<cpi->ibf_order)); | 1574 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "writing ibf buckets at %d/%d\n", cpi->ibf_bucket_counter, (1<<cpi->ibf_order)); |
1442 | 1575 | ||
1443 | msize = (sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE); | 1576 | msize = (sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE); |
1444 | 1577 | ||
@@ -1447,19 +1580,21 @@ write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size) | |||
1447 | digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST); | 1580 | digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST); |
1448 | digest->order = cpi->ibf_order; | 1581 | digest->order = cpi->ibf_order; |
1449 | 1582 | ||
1450 | hash_dst = (struct GNUNET_HashCode *) &digest[1]; | 1583 | key_dst = (uint64_t *) &digest[1]; |
1451 | 1584 | ||
1452 | memcpy (hash_dst, cpi->outgoing_ibf->hash_sum, num_buckets * sizeof *hash_dst); | 1585 | memcpy (key_dst, cpi->ibf->id_sum, num_buckets * sizeof *key_dst); |
1453 | hash_dst += num_buckets; | 1586 | key_dst += num_buckets; |
1587 | |||
1588 | hash_dst = (uint32_t *) key_dst; | ||
1454 | 1589 | ||
1455 | memcpy (hash_dst, cpi->outgoing_ibf->id_sum, num_buckets * sizeof *hash_dst); | 1590 | memcpy (hash_dst, cpi->ibf->id_sum, num_buckets * sizeof *hash_dst); |
1456 | hash_dst += num_buckets; | 1591 | hash_dst += num_buckets; |
1457 | 1592 | ||
1458 | count_dst = (uint8_t *) hash_dst; | 1593 | count_dst = (uint8_t *) hash_dst; |
1459 | 1594 | ||
1460 | memcpy (count_dst, cpi->outgoing_ibf->count, num_buckets * sizeof *count_dst); | 1595 | memcpy (count_dst, cpi->ibf->count, num_buckets * sizeof *count_dst); |
1461 | 1596 | ||
1462 | cpi->outgoing_bucket_counter += num_buckets; | 1597 | cpi->ibf_bucket_counter += num_buckets; |
1463 | 1598 | ||
1464 | cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL, | 1599 | cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL, |
1465 | write_ibf, cpi); | 1600 | write_ibf, cpi); |
@@ -1484,37 +1619,34 @@ write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size) | |||
1484 | * @param size the number of bytes written | 1619 | * @param size the number of bytes written |
1485 | */ | 1620 | */ |
1486 | static void | 1621 | static void |
1487 | write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size) | 1622 | write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size) |
1488 | { | 1623 | { |
1489 | struct ConsensusPeerInformation *cpi; | 1624 | struct ConsensusPeerInformation *cpi; |
1490 | struct GNUNET_HashCode key; | 1625 | uint64_t key; |
1491 | struct GNUNET_CONSENSUS_Element *element; | 1626 | struct GNUNET_HashCode hashcode; |
1492 | struct GNUNET_MessageHeader *element_msg; | ||
1493 | int side; | 1627 | int side; |
1494 | int msize; | 1628 | int msize; |
1495 | 1629 | ||
1630 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
1631 | |||
1496 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitting value\n"); | 1632 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitting value\n"); |
1497 | 1633 | ||
1498 | cpi = cls; | 1634 | cpi = cls; |
1499 | cpi->wh = NULL; | 1635 | cpi->wh = NULL; |
1500 | 1636 | ||
1501 | if (NULL == cpi->diff_ibf) | 1637 | GNUNET_assert (IBF_STATE_DECODING == cpi->ibf_state); |
1502 | { | ||
1503 | GNUNET_assert (NULL != cpi->incoming_ibf); | ||
1504 | GNUNET_assert (NULL != cpi->outgoing_ibf); | ||
1505 | GNUNET_assert (cpi->outgoing_ibf->size == cpi->incoming_ibf->size); | ||
1506 | cpi->diff_ibf = ibf_dup (cpi->incoming_ibf); | ||
1507 | ibf_subtract (cpi->diff_ibf, cpi->outgoing_ibf); | ||
1508 | } | ||
1509 | 1638 | ||
1510 | for (;;) | 1639 | for (;;) |
1511 | { | 1640 | { |
1512 | int res; | 1641 | int res; |
1513 | res = ibf_decode (cpi->diff_ibf, &side, &key); | 1642 | res = ibf_decode (cpi->ibf, &side, &key); |
1514 | if (GNUNET_SYSERR == res) | 1643 | if (GNUNET_SYSERR == res) |
1515 | { | 1644 | { |
1516 | /* TODO: handle this correctly, request new ibf */ | 1645 | cpi->ibf_order++; |
1517 | GNUNET_break (0); | 1646 | prepare_ibf (cpi); |
1647 | cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); | ||
1648 | cpi->ibf_state = IBF_STATE_TRANSMITTING; | ||
1649 | write_ibf (cls, status, size); | ||
1518 | return; | 1650 | return; |
1519 | } | 1651 | } |
1520 | if (GNUNET_NO == res) | 1652 | if (GNUNET_NO == res) |
@@ -1523,40 +1655,59 @@ write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size) | |||
1523 | return; | 1655 | return; |
1524 | } | 1656 | } |
1525 | if (-1 == side) | 1657 | if (-1 == side) |
1526 | break; | 1658 | { |
1527 | } | 1659 | struct GNUNET_CONSENSUS_Element *element; |
1528 | 1660 | struct GNUNET_MessageHeader *element_msg; | |
1529 | element = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &key); | 1661 | ibf_hashcode_from_key (key, &hashcode); |
1530 | 1662 | /* FIXME: this only transmits one element stored with the key */ | |
1531 | if (NULL == element) | 1663 | element = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode); |
1532 | { | 1664 | if (NULL == element) |
1533 | /* FIXME: handle correctly */ | 1665 | continue; |
1534 | GNUNET_break (0); | 1666 | msize = sizeof (struct GNUNET_MessageHeader) + element->size; |
1535 | return; | 1667 | element_msg = GNUNET_malloc (msize); |
1668 | element_msg->size = htons (msize); | ||
1669 | element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); | ||
1670 | GNUNET_assert (NULL != element->data); | ||
1671 | memcpy (&element_msg[1], element->data, element->size); | ||
1672 | cpi->wh = GNUNET_STREAM_write (cpi->socket, element_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, | ||
1673 | write_requests_and_elements, cpi); | ||
1674 | GNUNET_free (element_msg); | ||
1675 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted value\n"); | ||
1676 | |||
1677 | GNUNET_assert (NULL != cpi->wh); | ||
1678 | return; | ||
1679 | } | ||
1680 | else | ||
1681 | { | ||
1682 | struct ElementRequest *msg; | ||
1683 | size_t msize; | ||
1684 | uint64_t *p; | ||
1685 | |||
1686 | msize = (sizeof *msg) + sizeof (uint64_t); | ||
1687 | msg = GNUNET_malloc (msize); | ||
1688 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST); | ||
1689 | msg->header.size = htons (msize); | ||
1690 | p = (uint64_t *) &msg[1]; | ||
1691 | *p = key; | ||
1692 | |||
1693 | cpi->wh = GNUNET_STREAM_write (cpi->socket, msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, | ||
1694 | write_requests_and_elements, cpi); | ||
1695 | GNUNET_assert (NULL != cpi->wh); | ||
1696 | GNUNET_free (msg); | ||
1697 | return; | ||
1698 | } | ||
1536 | } | 1699 | } |
1537 | 1700 | ||
1538 | msize = sizeof (struct GNUNET_MessageHeader) + element->size; | 1701 | } |
1539 | |||
1540 | element_msg = GNUNET_malloc (msize); | ||
1541 | element_msg->size = htons (msize); | ||
1542 | element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); | ||
1543 | |||
1544 | |||
1545 | GNUNET_assert (NULL != element->data); | ||
1546 | |||
1547 | |||
1548 | memcpy (&element_msg[1], element->data, element->size); | ||
1549 | |||
1550 | cpi->wh = GNUNET_STREAM_write (cpi->socket, element_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, | ||
1551 | write_values, cpi); | ||
1552 | |||
1553 | GNUNET_free (element_msg); | ||
1554 | 1702 | ||
1555 | 1703 | ||
1556 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted value\n"); | ||
1557 | 1704 | ||
1558 | GNUNET_assert (NULL != cpi->wh); | 1705 | /* |
1706 | static void | ||
1707 | select_best_group (struct ConsensusSession *session) | ||
1708 | { | ||
1559 | } | 1709 | } |
1710 | */ | ||
1560 | 1711 | ||
1561 | 1712 | ||
1562 | /** | 1713 | /** |
@@ -1566,7 +1717,7 @@ write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size) | |||
1566 | * @param client client handle | 1717 | * @param client client handle |
1567 | * @param message message sent by the client | 1718 | * @param message message sent by the client |
1568 | */ | 1719 | */ |
1569 | void | 1720 | static void |
1570 | client_conclude (void *cls, | 1721 | client_conclude (void *cls, |
1571 | struct GNUNET_SERVER_Client *client, | 1722 | struct GNUNET_SERVER_Client *client, |
1572 | const struct GNUNET_MessageHeader *message) | 1723 | const struct GNUNET_MessageHeader *message) |
@@ -1597,8 +1748,6 @@ client_conclude (void *cls, | |||
1597 | 1748 | ||
1598 | session->conclude_requested = GNUNET_YES; | 1749 | session->conclude_requested = GNUNET_YES; |
1599 | 1750 | ||
1600 | /* FIXME: write to already connected sockets */ | ||
1601 | |||
1602 | for (i = 0; i < session->num_peers; i++) | 1751 | for (i = 0; i < session->num_peers; i++) |
1603 | { | 1752 | { |
1604 | if ( (GNUNET_YES == session->info[i].is_outgoing) && | 1753 | if ( (GNUNET_YES == session->info[i].is_outgoing) && |
@@ -1654,19 +1803,14 @@ client_ack (void *cls, | |||
1654 | 1803 | ||
1655 | if (msg->keep) | 1804 | if (msg->keep) |
1656 | { | 1805 | { |
1657 | |||
1658 | element = pending->element; | 1806 | element = pending->element; |
1659 | |||
1660 | GNUNET_CRYPTO_hash (element, element->size, &key); | 1807 | GNUNET_CRYPTO_hash (element, element->size, &key); |
1661 | 1808 | ||
1662 | GNUNET_CONTAINER_multihashmap_put (session->values, &key, element, | 1809 | GNUNET_CONTAINER_multihashmap_put (session->values, &key, element, |
1663 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 1810 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); |
1664 | |||
1665 | strata_insert (session->strata, &key); | 1811 | strata_insert (session->strata, &key); |
1666 | } | 1812 | } |
1667 | 1813 | ||
1668 | /* FIXME: also remove element from strata */ | ||
1669 | |||
1670 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 1814 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
1671 | } | 1815 | } |
1672 | 1816 | ||
@@ -1783,6 +1927,7 @@ shutdown_task (void *cls, | |||
1783 | static void | 1927 | static void |
1784 | run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) | 1928 | run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) |
1785 | { | 1929 | { |
1930 | /* core is only used to retrieve the peer identity */ | ||
1786 | static const struct GNUNET_CORE_MessageHandler core_handlers[] = { | 1931 | static const struct GNUNET_CORE_MessageHandler core_handlers[] = { |
1787 | {NULL, 0, 0} | 1932 | {NULL, 0, 0} |
1788 | }; | 1933 | }; |
@@ -1803,7 +1948,6 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU | |||
1803 | 1948 | ||
1804 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); | 1949 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); |
1805 | 1950 | ||
1806 | |||
1807 | listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS, | 1951 | listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS, |
1808 | listen_cb, NULL, | 1952 | listen_cb, NULL, |
1809 | GNUNET_STREAM_OPTION_END); | 1953 | GNUNET_STREAM_OPTION_END); |
diff --git a/src/consensus/ibf.c b/src/consensus/ibf.c index 629fde3fc..fb3bbe7cd 100644 --- a/src/consensus/ibf.c +++ b/src/consensus/ibf.c | |||
@@ -30,6 +30,37 @@ | |||
30 | 30 | ||
31 | 31 | ||
32 | /** | 32 | /** |
33 | * Create a key from a hashcode. | ||
34 | * | ||
35 | * @param hash the hashcode | ||
36 | * @return a key | ||
37 | */ | ||
38 | uint64_t | ||
39 | ibf_key_from_hashcode (const struct GNUNET_HashCode *hash) | ||
40 | { | ||
41 | return GNUNET_ntohll (*(uint64_t *) hash); | ||
42 | } | ||
43 | |||
44 | |||
45 | /** | ||
46 | * Create a hashcode from a key, by replicating the key | ||
47 | * until the hascode is filled | ||
48 | * | ||
49 | * @param key the key | ||
50 | * @param dst hashcode to store the result in | ||
51 | */ | ||
52 | void | ||
53 | ibf_hashcode_from_key (uint64_t key, struct GNUNET_HashCode *dst) | ||
54 | { | ||
55 | uint64_t *p; | ||
56 | int i; | ||
57 | p = (uint64_t *) dst; | ||
58 | for (i = 0; i < 8; i++) | ||
59 | *p++ = key; | ||
60 | } | ||
61 | |||
62 | |||
63 | /** | ||
33 | * Create an invertible bloom filter. | 64 | * Create an invertible bloom filter. |
34 | * | 65 | * |
35 | * @param size number of IBF buckets | 66 | * @param size number of IBF buckets |
@@ -39,7 +70,7 @@ | |||
39 | * @return the newly created invertible bloom filter | 70 | * @return the newly created invertible bloom filter |
40 | */ | 71 | */ |
41 | struct InvertibleBloomFilter * | 72 | struct InvertibleBloomFilter * |
42 | ibf_create (unsigned int size, unsigned int hash_num, uint32_t salt) | 73 | ibf_create (uint32_t size, unsigned int hash_num, uint32_t salt) |
43 | { | 74 | { |
44 | struct InvertibleBloomFilter *ibf; | 75 | struct InvertibleBloomFilter *ibf; |
45 | 76 | ||
@@ -53,72 +84,53 @@ ibf_create (unsigned int size, unsigned int hash_num, uint32_t salt) | |||
53 | return ibf; | 84 | return ibf; |
54 | } | 85 | } |
55 | 86 | ||
56 | |||
57 | |||
58 | /** | 87 | /** |
59 | * Insert an element into an IBF, with either positive or negative sign. | 88 | * Store unique bucket indices for the specified key in dst. |
60 | * | ||
61 | * @param ibf the IBF | ||
62 | * @param id the element's hash code | ||
63 | * @param side the sign of side determines the sign of the | ||
64 | * inserted element. | ||
65 | */ | 89 | */ |
66 | void | 90 | static inline void |
67 | ibf_insert_on_side (struct InvertibleBloomFilter *ibf, | 91 | ibf_get_indices (const struct InvertibleBloomFilter *ibf, |
68 | const struct GNUNET_HashCode *key, | 92 | uint64_t key, int *dst) |
69 | int side) | ||
70 | { | 93 | { |
71 | struct GNUNET_HashCode bucket_indices; | 94 | struct GNUNET_HashCode bucket_indices; |
72 | struct GNUNET_HashCode key_copy; | 95 | unsigned int filled = 0; |
73 | struct GNUNET_HashCode key_hash; | 96 | int i; |
74 | unsigned int i; | 97 | GNUNET_CRYPTO_hash (&key, sizeof key, &bucket_indices); |
98 | for (i = 0; filled < ibf->hash_num; i++) | ||
99 | { | ||
100 | unsigned int bucket; | ||
101 | unsigned int j; | ||
102 | if ( (0 != i) && (0 == (i % 16)) ) | ||
103 | GNUNET_CRYPTO_hash (&bucket_indices, sizeof (struct GNUNET_HashCode), &bucket_indices); | ||
104 | bucket = bucket_indices.bits[i] % ibf->size; | ||
105 | for (j = 0; j < filled; j++) | ||
106 | if (dst[j] == bucket) | ||
107 | goto try_next;; | ||
108 | dst[filled++] = bucket; | ||
109 | try_next: ; | ||
110 | } | ||
111 | } | ||
75 | 112 | ||
76 | 113 | ||
77 | GNUNET_assert ((1 == side) || (-1 == side)); | 114 | static void |
78 | GNUNET_assert (NULL != ibf); | 115 | ibf_insert_into (struct InvertibleBloomFilter *ibf, |
79 | 116 | uint64_t key, | |
117 | const int *buckets, int side) | ||
118 | { | ||
119 | int i; | ||
120 | struct GNUNET_HashCode key_hash_sha; | ||
121 | uint32_t key_hash; | ||
122 | GNUNET_CRYPTO_hash (&key, sizeof key, &key_hash_sha); | ||
123 | key_hash = key_hash_sha.bits[0]; | ||
124 | for (i = 0; i < ibf->hash_num; i++) | ||
80 | { | 125 | { |
81 | int used_buckets[ibf->hash_num]; | 126 | const int bucket = buckets[i]; |
82 | 127 | ibf->count[bucket] += side; | |
83 | /* copy the key, if key and an entry in the IBF alias */ | 128 | ibf->id_sum[bucket] ^= key; |
84 | key_copy = *key; | 129 | ibf->hash_sum[bucket] ^= key_hash; |
85 | |||
86 | bucket_indices = key_copy; | ||
87 | GNUNET_CRYPTO_hash (key, sizeof (struct GNUNET_HashCode), &key_hash); | ||
88 | |||
89 | for (i = 0; i < ibf->hash_num; i++) | ||
90 | { | ||
91 | unsigned int bucket; | ||
92 | unsigned int j; | ||
93 | int collided; | ||
94 | |||
95 | if ( (0 != i) && | ||
96 | (0 == (i % 16)) ) | ||
97 | GNUNET_CRYPTO_hash (&bucket_indices, sizeof (struct GNUNET_HashCode), | ||
98 | &bucket_indices); | ||
99 | |||
100 | bucket = bucket_indices.bits[i%16] % ibf->size; | ||
101 | collided = GNUNET_NO; | ||
102 | for (j = 0; j < i; j++) | ||
103 | if (used_buckets[j] == bucket) | ||
104 | collided = GNUNET_YES; | ||
105 | if (GNUNET_YES == collided) | ||
106 | { | ||
107 | used_buckets[i] = -1; | ||
108 | continue; | ||
109 | } | ||
110 | used_buckets[i] = bucket; | ||
111 | |||
112 | ibf->count[bucket] += side; | ||
113 | |||
114 | GNUNET_CRYPTO_hash_xor (&key_copy, &ibf->id_sum[bucket], | ||
115 | &ibf->id_sum[bucket]); | ||
116 | GNUNET_CRYPTO_hash_xor (&key_hash, &ibf->hash_sum[bucket], | ||
117 | &ibf->hash_sum[bucket]); | ||
118 | } | ||
119 | } | 130 | } |
120 | } | 131 | } |
121 | 132 | ||
133 | |||
122 | /** | 134 | /** |
123 | * Insert an element into an IBF. | 135 | * Insert an element into an IBF. |
124 | * | 136 | * |
@@ -126,27 +138,28 @@ ibf_insert_on_side (struct InvertibleBloomFilter *ibf, | |||
126 | * @param id the element's hash code | 138 | * @param id the element's hash code |
127 | */ | 139 | */ |
128 | void | 140 | void |
129 | ibf_insert (struct InvertibleBloomFilter *ibf, const struct GNUNET_HashCode *key) | 141 | ibf_insert (struct InvertibleBloomFilter *ibf, uint64_t key) |
130 | { | 142 | { |
131 | ibf_insert_on_side (ibf, key, 1); | 143 | int buckets[ibf->hash_num]; |
144 | ibf_get_indices (ibf, key, buckets); | ||
145 | ibf_insert_into (ibf, key, buckets, 1); | ||
132 | } | 146 | } |
133 | 147 | ||
148 | /** | ||
149 | * Test is the IBF is empty, i.e. all counts, keys and key hashes are zero. | ||
150 | */ | ||
134 | static int | 151 | static int |
135 | ibf_is_empty (struct InvertibleBloomFilter *ibf) | 152 | ibf_is_empty (struct InvertibleBloomFilter *ibf) |
136 | { | 153 | { |
137 | int i; | 154 | int i; |
138 | for (i = 0; i < ibf->size; i++) | 155 | for (i = 0; i < ibf->size; i++) |
139 | { | 156 | { |
140 | int j; | ||
141 | if (0 != ibf->count[i]) | 157 | if (0 != ibf->count[i]) |
142 | return GNUNET_NO; | 158 | return GNUNET_NO; |
143 | for (j = 0; j < 16; ++j) | 159 | if (0 != ibf->hash_sum[i]) |
144 | { | 160 | return GNUNET_NO; |
145 | if (0 != ibf->hash_sum[i].bits[j]) | 161 | if (0 != ibf->id_sum[i]) |
146 | return GNUNET_NO; | 162 | return GNUNET_NO; |
147 | if (0 != ibf->id_sum[i].bits[j]) | ||
148 | return GNUNET_NO; | ||
149 | } | ||
150 | } | 163 | } |
151 | return GNUNET_YES; | 164 | return GNUNET_YES; |
152 | } | 165 | } |
@@ -166,21 +179,40 @@ ibf_is_empty (struct InvertibleBloomFilter *ibf) | |||
166 | */ | 179 | */ |
167 | int | 180 | int |
168 | ibf_decode (struct InvertibleBloomFilter *ibf, | 181 | ibf_decode (struct InvertibleBloomFilter *ibf, |
169 | int *ret_side, struct GNUNET_HashCode *ret_id) | 182 | int *ret_side, uint64_t *ret_id) |
170 | { | 183 | { |
171 | struct GNUNET_HashCode hash; | 184 | uint32_t hash; |
172 | int i; | 185 | int i; |
186 | struct GNUNET_HashCode key_hash_sha; | ||
187 | int buckets[ibf->hash_num]; | ||
173 | 188 | ||
174 | GNUNET_assert (NULL != ibf); | 189 | GNUNET_assert (NULL != ibf); |
175 | 190 | ||
176 | for (i = 0; i < ibf->size; i++) | 191 | for (i = 0; i < ibf->size; i++) |
177 | { | 192 | { |
193 | int j; | ||
194 | int hit; | ||
195 | |||
196 | /* we can only decode from pure buckets */ | ||
178 | if ((1 != ibf->count[i]) && (-1 != ibf->count[i])) | 197 | if ((1 != ibf->count[i]) && (-1 != ibf->count[i])) |
179 | continue; | 198 | continue; |
180 | 199 | ||
181 | GNUNET_CRYPTO_hash (&ibf->id_sum[i], sizeof (struct GNUNET_HashCode), &hash); | 200 | GNUNET_CRYPTO_hash (&ibf->id_sum[i], sizeof (uint64_t), &key_hash_sha); |
201 | hash = key_hash_sha.bits[0]; | ||
182 | 202 | ||
183 | if (0 != memcmp (&hash, &ibf->hash_sum[i], sizeof (struct GNUNET_HashCode))) | 203 | /* test if the hash matches the key */ |
204 | if (hash != ibf->hash_sum[i]) | ||
205 | continue; | ||
206 | |||
207 | /* test if key in bucket hits its own location, | ||
208 | * if not, the key hash was subject to collision */ | ||
209 | hit = GNUNET_NO; | ||
210 | ibf_get_indices (ibf, ibf->id_sum[i], buckets); | ||
211 | for (j = 0; j < ibf->hash_num; j++) | ||
212 | if (buckets[j] == i) | ||
213 | hit = GNUNET_YES; | ||
214 | |||
215 | if (GNUNET_NO == hit) | ||
184 | continue; | 216 | continue; |
185 | 217 | ||
186 | if (NULL != ret_side) | 218 | if (NULL != ret_side) |
@@ -189,7 +221,7 @@ ibf_decode (struct InvertibleBloomFilter *ibf, | |||
189 | *ret_id = ibf->id_sum[i]; | 221 | *ret_id = ibf->id_sum[i]; |
190 | 222 | ||
191 | /* insert on the opposite side, effectively removing the element */ | 223 | /* insert on the opposite side, effectively removing the element */ |
192 | ibf_insert_on_side (ibf, &ibf->id_sum[i], -ibf->count[i]); | 224 | ibf_insert_into (ibf, ibf->id_sum[i], buckets, -ibf->count[i]); |
193 | 225 | ||
194 | return GNUNET_YES; | 226 | return GNUNET_YES; |
195 | } | 227 | } |
@@ -200,7 +232,6 @@ ibf_decode (struct InvertibleBloomFilter *ibf, | |||
200 | } | 232 | } |
201 | 233 | ||
202 | 234 | ||
203 | |||
204 | /** | 235 | /** |
205 | * Subtract ibf2 from ibf1, storing the result in ibf1. | 236 | * Subtract ibf2 from ibf1, storing the result in ibf1. |
206 | * The two IBF's must have the same parameters size and hash_num. | 237 | * The two IBF's must have the same parameters size and hash_num. |
@@ -209,7 +240,7 @@ ibf_decode (struct InvertibleBloomFilter *ibf, | |||
209 | * @param ibf2 IBF that will be subtracted from ibf1 | 240 | * @param ibf2 IBF that will be subtracted from ibf1 |
210 | */ | 241 | */ |
211 | void | 242 | void |
212 | ibf_subtract (struct InvertibleBloomFilter *ibf1, struct InvertibleBloomFilter *ibf2) | 243 | ibf_subtract (struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFilter *ibf2) |
213 | { | 244 | { |
214 | int i; | 245 | int i; |
215 | 246 | ||
@@ -220,10 +251,8 @@ ibf_subtract (struct InvertibleBloomFilter *ibf1, struct InvertibleBloomFilter * | |||
220 | for (i = 0; i < ibf1->size; i++) | 251 | for (i = 0; i < ibf1->size; i++) |
221 | { | 252 | { |
222 | ibf1->count[i] -= ibf2->count[i]; | 253 | ibf1->count[i] -= ibf2->count[i]; |
223 | GNUNET_CRYPTO_hash_xor (&ibf1->id_sum[i], &ibf2->id_sum[i], | 254 | ibf1->hash_sum[i] ^= ibf2->hash_sum[i]; |
224 | &ibf1->id_sum[i]); | 255 | ibf1->id_sum[i] ^= ibf2->id_sum[i]; |
225 | GNUNET_CRYPTO_hash_xor (&ibf1->hash_sum[i], &ibf2->hash_sum[i], | ||
226 | &ibf1->hash_sum[i]); | ||
227 | } | 256 | } |
228 | } | 257 | } |
229 | 258 | ||
diff --git a/src/consensus/ibf.h b/src/consensus/ibf.h index 26f101905..72345d3e1 100644 --- a/src/consensus/ibf.h +++ b/src/consensus/ibf.h | |||
@@ -42,7 +42,7 @@ extern "C" | |||
42 | /** | 42 | /** |
43 | * Size of one ibf bucket in bytes | 43 | * Size of one ibf bucket in bytes |
44 | */ | 44 | */ |
45 | #define IBF_BUCKET_SIZE (64+64+1) | 45 | #define IBF_BUCKET_SIZE (8+4+1) |
46 | 46 | ||
47 | 47 | ||
48 | /** | 48 | /** |
@@ -56,7 +56,7 @@ struct InvertibleBloomFilter | |||
56 | /** | 56 | /** |
57 | * How many cells does this IBF have? | 57 | * How many cells does this IBF have? |
58 | */ | 58 | */ |
59 | unsigned int size; | 59 | uint32_t size; |
60 | 60 | ||
61 | /** | 61 | /** |
62 | * In how many cells do we hash one element? | 62 | * In how many cells do we hash one element? |
@@ -72,12 +72,12 @@ struct InvertibleBloomFilter | |||
72 | /** | 72 | /** |
73 | * xor sums of the elements' hash codes, used to identify the elements. | 73 | * xor sums of the elements' hash codes, used to identify the elements. |
74 | */ | 74 | */ |
75 | struct GNUNET_HashCode *id_sum; | 75 | uint64_t *id_sum; |
76 | 76 | ||
77 | /** | 77 | /** |
78 | * xor sums of the "hash of the hash". | 78 | * xor sums of the "hash of the hash". |
79 | */ | 79 | */ |
80 | struct GNUNET_HashCode *hash_sum; | 80 | uint32_t *hash_sum; |
81 | 81 | ||
82 | /** | 82 | /** |
83 | * How many times has a bucket been hit? | 83 | * How many times has a bucket been hit? |
@@ -88,16 +88,37 @@ struct InvertibleBloomFilter | |||
88 | 88 | ||
89 | 89 | ||
90 | /** | 90 | /** |
91 | * Create a key from a hashcode. | ||
92 | * | ||
93 | * @param hash the hashcode | ||
94 | * @return a key | ||
95 | */ | ||
96 | uint64_t | ||
97 | ibf_key_from_hashcode (const struct GNUNET_HashCode *hash); | ||
98 | |||
99 | |||
100 | /** | ||
101 | * Create a hashcode from a key, by replicating the key | ||
102 | * until the hascode is filled | ||
103 | * | ||
104 | * @param key the key | ||
105 | * @param dst hashcode to store the result in | ||
106 | */ | ||
107 | void | ||
108 | ibf_hashcode_from_key (uint64_t key, struct GNUNET_HashCode *dst); | ||
109 | |||
110 | |||
111 | /** | ||
91 | * Create an invertible bloom filter. | 112 | * Create an invertible bloom filter. |
92 | * | 113 | * |
93 | * @param size number of IBF buckets | 114 | * @param size number of IBF buckets |
94 | * @param hash_num number of buckets one element is hashed in | 115 | * @param hash_num number of buckets one element is hashed in, usually 3 or 4 |
95 | * @param salt salt for mingling hashes, different salt may | 116 | * @param salt salt for mingling hashes, different salt may |
96 | * result in less (or more) collisions | 117 | * result in less (or more) collisions |
97 | * @return the newly created invertible bloom filter | 118 | * @return the newly created invertible bloom filter |
98 | */ | 119 | */ |
99 | struct InvertibleBloomFilter * | 120 | struct InvertibleBloomFilter * |
100 | ibf_create(unsigned int size, unsigned int hash_num, uint32_t salt); | 121 | ibf_create(uint32_t size, unsigned int hash_num, uint32_t salt); |
101 | 122 | ||
102 | 123 | ||
103 | /** | 124 | /** |
@@ -107,7 +128,7 @@ ibf_create(unsigned int size, unsigned int hash_num, uint32_t salt); | |||
107 | * @param id the element's hash code | 128 | * @param id the element's hash code |
108 | */ | 129 | */ |
109 | void | 130 | void |
110 | ibf_insert (struct InvertibleBloomFilter *ibf, const struct GNUNET_HashCode *id); | 131 | ibf_insert (struct InvertibleBloomFilter *ibf, uint64_t id); |
111 | 132 | ||
112 | 133 | ||
113 | /** | 134 | /** |
@@ -118,7 +139,7 @@ ibf_insert (struct InvertibleBloomFilter *ibf, const struct GNUNET_HashCode *id) | |||
118 | * @param ibf2 IBF that will be subtracted from ibf1 | 139 | * @param ibf2 IBF that will be subtracted from ibf1 |
119 | */ | 140 | */ |
120 | void | 141 | void |
121 | ibf_subtract (struct InvertibleBloomFilter *ibf1, struct InvertibleBloomFilter *ibf2); | 142 | ibf_subtract (struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFilter *ibf2); |
122 | 143 | ||
123 | 144 | ||
124 | /** | 145 | /** |
@@ -133,7 +154,7 @@ ibf_subtract (struct InvertibleBloomFilter *ibf1, struct InvertibleBloomFilter * | |||
133 | * GNUNET_SYSERR if the decoding has faile | 154 | * GNUNET_SYSERR if the decoding has faile |
134 | */ | 155 | */ |
135 | int | 156 | int |
136 | ibf_decode (struct InvertibleBloomFilter *ibf, int *side, struct GNUNET_HashCode *ret_id); | 157 | ibf_decode (struct InvertibleBloomFilter *ibf, int *side, uint64_t *ret_id); |
137 | 158 | ||
138 | 159 | ||
139 | /** | 160 | /** |