diff options
author | Florian Dold <florian.dold@gmail.com> | 2012-12-05 21:41:09 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2012-12-05 21:41:09 +0000 |
commit | aac85d938153d2f181d4bfd08eb734be980bab43 (patch) | |
tree | 806ee375f14540e08e3b4b71582a13a731d7192a /src/consensus/gnunet-service-consensus.c | |
parent | 612f87ce7ff13706d291c441de26eaf15ded5199 (diff) | |
download | gnunet-aac85d938153d2f181d4bfd08eb734be980bab43.tar.gz gnunet-aac85d938153d2f181d4bfd08eb734be980bab43.zip |
consensus api, consensus service (local), peer driver and ibf sketch
Diffstat (limited to 'src/consensus/gnunet-service-consensus.c')
-rw-r--r-- | src/consensus/gnunet-service-consensus.c | 322 |
1 files changed, 124 insertions, 198 deletions
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index b733a0aec..195efc681 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c | |||
@@ -20,19 +20,19 @@ | |||
20 | 20 | ||
21 | 21 | ||
22 | #include "platform.h" | 22 | #include "platform.h" |
23 | #include "gnunet_protocols.h" | ||
24 | #include "gnunet_common.h" | 23 | #include "gnunet_common.h" |
25 | #include "gnunet_service_lib.h" | 24 | #include "gnunet_protocols.h" |
25 | #include "gnunet_util_lib.h" | ||
26 | #include "gnunet_consensus_service.h" | 26 | #include "gnunet_consensus_service.h" |
27 | #include "gnunet_core_service.h" | 27 | #include "gnunet_core_service.h" |
28 | #include "gnunet_container_lib.h" | 28 | #include "gnunet_mesh_service.h" |
29 | #include "consensus.h" | 29 | #include "consensus.h" |
30 | 30 | ||
31 | 31 | ||
32 | struct ConsensusClient; | 32 | struct ConsensusSession; |
33 | 33 | ||
34 | static void | 34 | static void |
35 | send_next (struct ConsensusClient *cli); | 35 | send_next (struct ConsensusSession *session); |
36 | 36 | ||
37 | 37 | ||
38 | /** | 38 | /** |
@@ -58,8 +58,7 @@ struct PendingElement | |||
58 | 58 | ||
59 | 59 | ||
60 | /** | 60 | /** |
61 | * A consensus session consists of one or more local clients, | 61 | * A consensus session consists of one local client and the remote authorities. |
62 | * as well as zero or more remote authorities. | ||
63 | */ | 62 | */ |
64 | struct ConsensusSession | 63 | struct ConsensusSession |
65 | { | 64 | { |
@@ -74,18 +73,8 @@ struct ConsensusSession | |||
74 | struct ConsensusSession *prev; | 73 | struct ConsensusSession *prev; |
75 | 74 | ||
76 | /** | 75 | /** |
77 | * Consensus clients are kept in a DLL. | 76 | * Local consensus identification, chosen by clients. |
78 | */ | ||
79 | struct ConsensusClient *clients_head; | ||
80 | |||
81 | /** | ||
82 | * Consensus clients are kept in a DLL. | ||
83 | */ | 77 | */ |
84 | struct ConsensusClient *clients_tail; | ||
85 | |||
86 | /** | ||
87 | * Local consensus identification, chosen by clients. | ||
88 | */ | ||
89 | struct GNUNET_HashCode *local_id; | 78 | struct GNUNET_HashCode *local_id; |
90 | 79 | ||
91 | /** | 80 | /** |
@@ -95,24 +84,6 @@ struct ConsensusSession | |||
95 | struct GNUNET_HashCode *global_id; | 84 | struct GNUNET_HashCode *global_id; |
96 | 85 | ||
97 | /** | 86 | /** |
98 | * Values in the consensus set of this session. | ||
99 | */ | ||
100 | struct GNUNET_CONTAINER_MultiHashMap *values; | ||
101 | }; | ||
102 | |||
103 | |||
104 | struct ConsensusClient | ||
105 | { | ||
106 | /** | ||
107 | * Consensus clients are kept in a DLL. | ||
108 | */ | ||
109 | struct ConsensusClient *next; | ||
110 | /** | ||
111 | * Consensus clients are kept in a DLL. | ||
112 | */ | ||
113 | struct ConsensusClient *prev; | ||
114 | |||
115 | /** | ||
116 | * Corresponding server handle. | 87 | * Corresponding server handle. |
117 | */ | 88 | */ |
118 | struct GNUNET_SERVER_Client *client; | 89 | struct GNUNET_SERVER_Client *client; |
@@ -123,24 +94,30 @@ struct ConsensusClient | |||
123 | int begin; | 94 | int begin; |
124 | 95 | ||
125 | /** | 96 | /** |
126 | * Session this client belongs to | 97 | * Values in the consensus set of this session, |
98 | * all of them either have been sent or approved by the client. | ||
127 | */ | 99 | */ |
128 | struct ConsensusSession *session; | 100 | struct GNUNET_CONTAINER_MultiHashMap *values; |
129 | 101 | ||
130 | /** | 102 | /** |
131 | * Values in the consensus set of this client. | 103 | * Elements that have not been sent to the client yet. |
132 | * Includes pending elements. | ||
133 | */ | 104 | */ |
134 | struct GNUNET_CONTAINER_MultiHashMap *values; | 105 | struct PendingElement *transmit_pending_head; |
135 | 106 | ||
136 | /** | 107 | /** |
137 | * Elements that have not been set to the client yet. | 108 | * Elements that have not been sent to the client yet. |
138 | */ | 109 | */ |
139 | struct PendingElement *pending_head; | 110 | struct PendingElement *transmit_pending_tail; |
111 | |||
140 | /** | 112 | /** |
141 | * Elements that have not been set to the client yet. | 113 | * Elements that have not been sent to the client yet. |
142 | */ | 114 | */ |
143 | struct PendingElement *pending_tail; | 115 | struct PendingElement *approval_pending_head; |
116 | |||
117 | /** | ||
118 | * Elements that have not been sent to the client yet. | ||
119 | */ | ||
120 | struct PendingElement *approval_pending_tail; | ||
144 | 121 | ||
145 | /** | 122 | /** |
146 | * Currently active transmit handle for sending to the client | 123 | * Currently active transmit handle for sending to the client |
@@ -157,6 +134,11 @@ struct ConsensusClient | |||
157 | * Client has been informed about the conclusion. | 134 | * Client has been informed about the conclusion. |
158 | */ | 135 | */ |
159 | int conclude_sent; | 136 | int conclude_sent; |
137 | |||
138 | /** | ||
139 | * Number of other peers in the consensus | ||
140 | */ | ||
141 | int num_peers; | ||
160 | }; | 142 | }; |
161 | 143 | ||
162 | 144 | ||
@@ -185,30 +167,6 @@ static struct GNUNET_SERVER_Handle *srv; | |||
185 | */ | 167 | */ |
186 | static struct GNUNET_PeerIdentity *my_peer; | 168 | static struct GNUNET_PeerIdentity *my_peer; |
187 | 169 | ||
188 | |||
189 | struct ConsensusClient * | ||
190 | find_client (const struct GNUNET_SERVER_Client *srv_client) | ||
191 | { | ||
192 | struct ConsensusSession *session; | ||
193 | struct ConsensusClient *client; | ||
194 | |||
195 | session = sessions_head; | ||
196 | while (NULL != session) | ||
197 | { | ||
198 | client = session->clients_head; | ||
199 | while (NULL != client) | ||
200 | { | ||
201 | if (client->client == srv_client) | ||
202 | { | ||
203 | return client; | ||
204 | } | ||
205 | client = client->next; | ||
206 | } | ||
207 | session = session->next; | ||
208 | } | ||
209 | return NULL; | ||
210 | } | ||
211 | |||
212 | static void | 170 | static void |
213 | disconnect_client (struct GNUNET_SERVER_Client *client) | 171 | disconnect_client (struct GNUNET_SERVER_Client *client) |
214 | { | 172 | { |
@@ -221,73 +179,44 @@ compute_global_id (struct GNUNET_HashCode *dst, | |||
221 | const struct GNUNET_PeerIdentity *peers, | 179 | const struct GNUNET_PeerIdentity *peers, |
222 | int num_peers) | 180 | int num_peers) |
223 | { | 181 | { |
224 | *dst = *local_id; | 182 | int i; |
183 | struct GNUNET_HashCode tmp; | ||
225 | 184 | ||
226 | /* FIXME: hash other peers into global id */ | 185 | *dst = *local_id; |
227 | } | 186 | for (i = 0; i < num_peers; ++i) |
228 | |||
229 | |||
230 | |||
231 | /** | ||
232 | * Iterator over hash map entries. | ||
233 | * | ||
234 | * @param cls closure, the client | ||
235 | * @param key current key code | ||
236 | * @param value value in the hash map | ||
237 | * @return GNUNET_YES if we should continue to | ||
238 | * iterate, | ||
239 | * GNUNET_NO if not. | ||
240 | */ | ||
241 | int | ||
242 | update_pending (void *cls, | ||
243 | const struct GNUNET_HashCode *key, | ||
244 | void *value) | ||
245 | { | ||
246 | struct ConsensusClient *cli; | ||
247 | struct GNUNET_CONSENSUS_Element *element; | ||
248 | struct PendingElement *pending_element; | ||
249 | |||
250 | cli = (struct ConsensusClient *) cls; | ||
251 | element = (struct GNUNET_CONSENSUS_Element *) value; | ||
252 | pending_element = GNUNET_malloc (sizeof (struct PendingElement)); | ||
253 | pending_element->element = element; | ||
254 | |||
255 | if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_contains (cli->values, key)) | ||
256 | { | 187 | { |
257 | GNUNET_CONTAINER_DLL_insert_tail (cli->pending_head, cli->pending_tail, pending_element); | 188 | /* FIXME: maybe hash_xor/hash allow aliased source/target, and we can get by without tmp */ |
258 | GNUNET_CONTAINER_multihashmap_put (cli->values, key, element, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | 189 | GNUNET_CRYPTO_hash_xor (dst, &peers[0].hashPubKey, &tmp); |
190 | *dst = tmp; | ||
191 | GNUNET_CRYPTO_hash (dst, sizeof (struct GNUNET_PeerIdentity), &tmp); | ||
192 | *dst = tmp; | ||
259 | } | 193 | } |
260 | |||
261 | return GNUNET_YES; | ||
262 | } | 194 | } |
263 | 195 | ||
264 | 196 | ||
265 | |||
266 | |||
267 | static size_t | 197 | static size_t |
268 | transmit_pending (void *cls, size_t size, void *buf) | 198 | transmit_pending (void *cls, size_t size, void *buf) |
269 | { | 199 | { |
270 | struct GNUNET_CONSENSUS_Element *element; | 200 | struct GNUNET_CONSENSUS_Element *element; |
271 | struct GNUNET_CONSENSUS_ElementMessage *msg; | 201 | struct GNUNET_CONSENSUS_ElementMessage *msg; |
272 | struct ConsensusClient *cli; | 202 | struct ConsensusSession *session; |
273 | 203 | ||
274 | cli = (struct ConsensusClient *) cls; | 204 | session = (struct ConsensusSession *) cls; |
275 | msg = (struct GNUNET_CONSENSUS_ElementMessage *) buf; | 205 | msg = (struct GNUNET_CONSENSUS_ElementMessage *) buf; |
276 | element = cli->pending_head->element; | 206 | element = session->transmit_pending_head->element; |
277 | 207 | ||
278 | GNUNET_assert (NULL != element); | 208 | GNUNET_assert (NULL != element); |
279 | 209 | ||
280 | cli->th = NULL; | 210 | session->th = NULL; |
281 | 211 | ||
282 | msg->element_type = element->type; | 212 | msg->element_type = element->type; |
283 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT); | 213 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT); |
284 | msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ElementMessage) + element->size); | 214 | msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ElementMessage) + element->size); |
285 | memcpy (&msg[1], element->data, element->size); | 215 | memcpy (&msg[1], element->data, element->size); |
286 | 216 | ||
217 | session->transmit_pending_head = session->transmit_pending_head->next; | ||
287 | 218 | ||
288 | cli->pending_head = cli->pending_head->next; | 219 | send_next (session); |
289 | |||
290 | send_next (cli); | ||
291 | 220 | ||
292 | return sizeof (struct GNUNET_CONSENSUS_ElementMessage) + element->size; | 221 | return sizeof (struct GNUNET_CONSENSUS_ElementMessage) + element->size; |
293 | } | 222 | } |
@@ -299,7 +228,7 @@ transmit_conclude_done (void *cls, size_t size, void *buf) | |||
299 | struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg; | 228 | struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg; |
300 | 229 | ||
301 | msg = (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) buf; | 230 | msg = (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) buf; |
302 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); | 231 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE); |
303 | msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeDoneMessage)); | 232 | msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeDoneMessage)); |
304 | msg->num_peers = htons (0); | 233 | msg->num_peers = htons (0); |
305 | 234 | ||
@@ -313,38 +242,43 @@ transmit_conclude_done (void *cls, size_t size, void *buf) | |||
313 | * @param cli the client to send the next message to | 242 | * @param cli the client to send the next message to |
314 | */ | 243 | */ |
315 | static void | 244 | static void |
316 | send_next (struct ConsensusClient *cli) | 245 | send_next (struct ConsensusSession *session) |
317 | { | 246 | { |
318 | int msize; | 247 | int msize; |
319 | 248 | ||
320 | GNUNET_assert (NULL != cli); | 249 | GNUNET_assert (NULL != session); |
321 | 250 | ||
322 | if (NULL != cli->th) | 251 | if (NULL != session->th) |
323 | { | 252 | { |
324 | return; | 253 | return; |
325 | } | 254 | } |
326 | 255 | ||
327 | if ((cli->conclude_requested == GNUNET_YES) && (cli->conclude_sent == GNUNET_NO)) | 256 | if ((session->conclude_requested == GNUNET_YES) && (session->conclude_sent == GNUNET_NO)) |
328 | { | 257 | { |
329 | /* just the conclude message with no other authorities in the dummy */ | 258 | /* just the conclude message with no other authorities in the dummy */ |
330 | msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage); | 259 | msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage); |
331 | cli->th = | 260 | session->th = |
332 | GNUNET_SERVER_notify_transmit_ready (cli->client, msize, | 261 | GNUNET_SERVER_notify_transmit_ready (session->client, msize, |
333 | GNUNET_TIME_UNIT_FOREVER_REL, &transmit_conclude_done, cli); | 262 | GNUNET_TIME_UNIT_FOREVER_REL, &transmit_conclude_done, session); |
334 | cli->conclude_sent = GNUNET_YES; | 263 | session->conclude_sent = GNUNET_YES; |
335 | } | 264 | } |
336 | else if (NULL != cli->pending_head) | 265 | else if (NULL != session->transmit_pending_head) |
337 | { | 266 | { |
338 | msize = cli->pending_head->element->size + sizeof (struct GNUNET_CONSENSUS_ElementMessage); | 267 | msize = session->transmit_pending_head->element->size + sizeof (struct GNUNET_CONSENSUS_ElementMessage); |
339 | cli->th = | 268 | session->th = |
340 | GNUNET_SERVER_notify_transmit_ready (cli->client, msize, | 269 | GNUNET_SERVER_notify_transmit_ready (session->client, msize, |
341 | GNUNET_TIME_UNIT_FOREVER_REL, &transmit_pending, cli); | 270 | GNUNET_TIME_UNIT_FOREVER_REL, &transmit_pending, session); |
271 | /* TODO: insert into ack pending */ | ||
342 | } | 272 | } |
343 | } | 273 | } |
344 | 274 | ||
345 | 275 | ||
346 | /** | 276 | /** |
347 | * Called when a client wants to join a consensus session. | 277 | * Called when a client wants to join a consensus session. |
278 | * | ||
279 | * @param cls unused | ||
280 | * @param client client that sent the message | ||
281 | * @param m message sent by the client | ||
348 | */ | 282 | */ |
349 | static void | 283 | static void |
350 | client_join (void *cls, | 284 | client_join (void *cls, |
@@ -354,58 +288,42 @@ client_join (void *cls, | |||
354 | struct GNUNET_HashCode global_id; | 288 | struct GNUNET_HashCode global_id; |
355 | const struct GNUNET_CONSENSUS_JoinMessage *msg; | 289 | const struct GNUNET_CONSENSUS_JoinMessage *msg; |
356 | struct ConsensusSession *session; | 290 | struct ConsensusSession *session; |
357 | struct ConsensusClient *consensus_client; | ||
358 | 291 | ||
359 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join\n"); | 292 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client joining\n"); |
360 | |||
361 | fprintf(stderr, "foobar\n"); | ||
362 | |||
363 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client joined\n"); | ||
364 | 293 | ||
365 | msg = (struct GNUNET_CONSENSUS_JoinMessage *) m; | 294 | msg = (struct GNUNET_CONSENSUS_JoinMessage *) m; |
366 | |||
367 | /* kill the client if it already is in a session */ | ||
368 | if (NULL != find_client (client)) | ||
369 | { | ||
370 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to join twice\n"); | ||
371 | disconnect_client (client); | ||
372 | return; | ||
373 | } | ||
374 | |||
375 | consensus_client = GNUNET_malloc (sizeof (struct ConsensusClient)); | ||
376 | consensus_client->client = client; | ||
377 | consensus_client->begin = GNUNET_NO; | ||
378 | consensus_client->values = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO); | ||
379 | |||
380 | GNUNET_SERVER_client_keep (client); | ||
381 | |||
382 | GNUNET_assert (NULL != consensus_client->values); | ||
383 | 295 | ||
384 | compute_global_id (&global_id, &msg->session_id, (struct GNUNET_PeerIdentity *) &m[1], msg->num_peers); | 296 | compute_global_id (&global_id, &msg->session_id, (struct GNUNET_PeerIdentity *) &m[1], msg->num_peers); |
385 | 297 | ||
386 | /* look if we already have a session for this local id */ | ||
387 | session = sessions_head; | 298 | session = sessions_head; |
388 | while (NULL != session) | 299 | while (NULL != session) |
389 | { | 300 | { |
390 | if (0 == memcmp(&global_id, session->global_id, sizeof (struct GNUNET_HashCode))) | 301 | if (client == session->client) |
391 | { | 302 | { |
392 | GNUNET_CONTAINER_DLL_insert (session->clients_head, session->clients_tail, consensus_client); | 303 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client already in session\n"); |
393 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 304 | disconnect_client (client); |
305 | return; | ||
306 | } | ||
307 | if (0 == memcmp (session->global_id, &global_id, sizeof (struct GNUNET_HashCode))) | ||
308 | { | ||
309 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "session already owned by another client\n"); | ||
310 | disconnect_client (client); | ||
394 | return; | 311 | return; |
395 | } | 312 | } |
396 | session = session->next; | ||
397 | } | 313 | } |
398 | 314 | ||
315 | GNUNET_SERVER_client_keep (client); | ||
316 | |||
399 | /* session does not exist yet, create it */ | 317 | /* session does not exist yet, create it */ |
400 | session = GNUNET_malloc (sizeof (struct ConsensusSession)); | 318 | session = GNUNET_malloc (sizeof (struct ConsensusSession)); |
401 | session->local_id = GNUNET_memdup (&msg->session_id, sizeof (struct GNUNET_HashCode)); | 319 | session->local_id = GNUNET_memdup (&msg->session_id, sizeof (struct GNUNET_HashCode)); |
402 | session->global_id = GNUNET_memdup (&global_id, sizeof (struct GNUNET_HashCode)); | 320 | session->global_id = GNUNET_memdup (&global_id, sizeof (struct GNUNET_HashCode)); |
403 | session->values = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO); | 321 | session->values = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO); |
322 | session->client = client; | ||
404 | 323 | ||
405 | GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); | 324 | GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); |
406 | GNUNET_CONTAINER_DLL_insert (session->clients_head, session->clients_tail, consensus_client); | ||
407 | 325 | ||
408 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "created new session"); | 326 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "created new session\n"); |
409 | 327 | ||
410 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 328 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
411 | } | 329 | } |
@@ -419,18 +337,22 @@ client_insert (void *cls, | |||
419 | struct GNUNET_SERVER_Client *client, | 337 | struct GNUNET_SERVER_Client *client, |
420 | const struct GNUNET_MessageHeader *m) | 338 | const struct GNUNET_MessageHeader *m) |
421 | { | 339 | { |
422 | struct ConsensusClient *consensus_client; | 340 | struct ConsensusSession *session; |
423 | struct GNUNET_CONSENSUS_ElementMessage *msg; | 341 | struct GNUNET_CONSENSUS_ElementMessage *msg; |
424 | struct GNUNET_CONSENSUS_Element *element; | 342 | struct GNUNET_CONSENSUS_Element *element; |
425 | struct PendingElement *pending_element; | ||
426 | struct GNUNET_HashCode key; | 343 | struct GNUNET_HashCode key; |
427 | int element_size; | 344 | int element_size; |
428 | 345 | ||
429 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "insert\n"); | 346 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "insert\n"); |
430 | 347 | ||
431 | consensus_client = find_client (client); | 348 | session = sessions_head; |
349 | while (NULL != session) | ||
350 | { | ||
351 | if (session->client == client) | ||
352 | break; | ||
353 | } | ||
432 | 354 | ||
433 | if (NULL == consensus_client) | 355 | if (NULL == session) |
434 | { | 356 | { |
435 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n"); | 357 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n"); |
436 | GNUNET_SERVER_client_disconnect (client); | 358 | GNUNET_SERVER_client_disconnect (client); |
@@ -449,28 +371,12 @@ client_insert (void *cls, | |||
449 | 371 | ||
450 | GNUNET_CRYPTO_hash (element, element_size, &key); | 372 | GNUNET_CRYPTO_hash (element, element_size, &key); |
451 | 373 | ||
452 | GNUNET_CONTAINER_multihashmap_put (consensus_client->session->values, &key, element, | 374 | GNUNET_CONTAINER_multihashmap_put (session->values, &key, element, |
453 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); | 375 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); |
454 | GNUNET_CONTAINER_multihashmap_put (consensus_client->values, &key, element, | ||
455 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); | ||
456 | |||
457 | /* send the new value to all clients that don't have it */ | ||
458 | |||
459 | consensus_client = consensus_client->session->clients_head; | ||
460 | while (NULL != consensus_client) | ||
461 | { | ||
462 | if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_contains (consensus_client->values, &key)) | ||
463 | { | ||
464 | pending_element = GNUNET_malloc (sizeof (struct PendingElement)); | ||
465 | pending_element->element = element; | ||
466 | GNUNET_CONTAINER_DLL_insert_tail (consensus_client->pending_head, consensus_client->pending_tail, pending_element); | ||
467 | GNUNET_CONTAINER_multihashmap_put (consensus_client->values, &key, element, | ||
468 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | ||
469 | send_next (consensus_client); | ||
470 | } | ||
471 | } | ||
472 | 376 | ||
473 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 377 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
378 | |||
379 | send_next (session); | ||
474 | } | 380 | } |
475 | 381 | ||
476 | 382 | ||
@@ -482,20 +388,27 @@ client_begin (void *cls, | |||
482 | struct GNUNET_SERVER_Client *client, | 388 | struct GNUNET_SERVER_Client *client, |
483 | const struct GNUNET_MessageHeader *message) | 389 | const struct GNUNET_MessageHeader *message) |
484 | { | 390 | { |
485 | struct ConsensusClient *consensus_client; | 391 | struct ConsensusSession *session; |
486 | 392 | ||
487 | consensus_client = find_client (client); | 393 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client requested begin\n"); |
394 | |||
395 | session = sessions_head; | ||
396 | while (NULL != session) | ||
397 | { | ||
398 | if (session->client == client) | ||
399 | break; | ||
400 | } | ||
488 | 401 | ||
489 | if (NULL == consensus_client) | 402 | if (NULL == session) |
490 | { | 403 | { |
404 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to 'begin', but client is not in any session\n"); | ||
491 | GNUNET_SERVER_client_disconnect (client); | 405 | GNUNET_SERVER_client_disconnect (client); |
492 | return; | 406 | return; |
493 | } | 407 | } |
494 | 408 | ||
495 | consensus_client->begin = GNUNET_YES; | 409 | session->begin = GNUNET_YES; |
496 | 410 | ||
497 | GNUNET_CONTAINER_multihashmap_iterate (consensus_client->session->values, &update_pending, NULL); | 411 | send_next (session); |
498 | send_next (consensus_client); | ||
499 | 412 | ||
500 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 413 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
501 | } | 414 | } |
@@ -510,20 +423,35 @@ client_conclude (void *cls, | |||
510 | struct GNUNET_SERVER_Client *client, | 423 | struct GNUNET_SERVER_Client *client, |
511 | const struct GNUNET_MessageHeader *message) | 424 | const struct GNUNET_MessageHeader *message) |
512 | { | 425 | { |
513 | struct ConsensusClient *consensus_client; | 426 | struct ConsensusSession *session; |
514 | 427 | ||
515 | consensus_client = find_client (client); | 428 | session = sessions_head; |
516 | if (NULL == consensus_client) | 429 | while ((session != NULL) && (session->client != client)) |
430 | { | ||
431 | session = session->next; | ||
432 | } | ||
433 | if (NULL == session) | ||
517 | { | 434 | { |
518 | GNUNET_SERVER_client_disconnect (client); | 435 | GNUNET_SERVER_client_disconnect (client); |
519 | return; | 436 | return; |
520 | } | 437 | } |
521 | consensus_client->conclude_requested = GNUNET_YES; | 438 | session->conclude_requested = GNUNET_YES; |
522 | send_next (consensus_client); | 439 | send_next (session); |
523 | |||
524 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 440 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
525 | } | 441 | } |
526 | 442 | ||
443 | |||
444 | /** | ||
445 | * Called when a client sends an ack | ||
446 | */ | ||
447 | void | ||
448 | client_ack (void *cls, | ||
449 | struct GNUNET_SERVER_Client *client, | ||
450 | const struct GNUNET_MessageHeader *message) | ||
451 | { | ||
452 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client ack received\n"); | ||
453 | } | ||
454 | |||
527 | /** | 455 | /** |
528 | * Task that disconnects from core. | 456 | * Task that disconnects from core. |
529 | * | 457 | * |
@@ -538,7 +466,7 @@ disconnect_core (void *cls, | |||
538 | core = (struct GNUNET_CORE_Handle *) cls; | 466 | core = (struct GNUNET_CORE_Handle *) cls; |
539 | GNUNET_CORE_disconnect (core); | 467 | GNUNET_CORE_disconnect (core); |
540 | 468 | ||
541 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "disconnected from core\n"); | 469 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n"); |
542 | } | 470 | } |
543 | 471 | ||
544 | 472 | ||
@@ -554,16 +482,14 @@ core_startup (void *cls, | |||
554 | sizeof (struct GNUNET_MessageHeader)}, | 482 | sizeof (struct GNUNET_MessageHeader)}, |
555 | {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, | 483 | {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, |
556 | sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)}, | 484 | sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)}, |
485 | {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK, | ||
486 | sizeof (struct GNUNET_CONSENSUS_AckMessage)}, | ||
557 | {NULL, NULL, 0, 0} | 487 | {NULL, NULL, 0, 0} |
558 | }; | 488 | }; |
559 | 489 | ||
560 | |||
561 | GNUNET_SERVER_add_handlers (srv, handlers); | 490 | GNUNET_SERVER_add_handlers (srv, handlers); |
562 | |||
563 | my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity)); | 491 | my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity)); |
564 | |||
565 | GNUNET_SCHEDULER_add_now (&disconnect_core, core); | 492 | GNUNET_SCHEDULER_add_now (&disconnect_core, core); |
566 | |||
567 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n"); | 493 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n"); |
568 | } | 494 | } |
569 | 495 | ||
@@ -583,7 +509,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU | |||
583 | {NULL, 0, 0} | 509 | {NULL, 0, 0} |
584 | }; | 510 | }; |
585 | 511 | ||
586 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "run\n"); | 512 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n"); |
587 | 513 | ||
588 | cfg = c; | 514 | cfg = c; |
589 | srv = server; | 515 | srv = server; |