aboutsummaryrefslogtreecommitdiff
path: root/src/consensus/gnunet-service-consensus.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/consensus/gnunet-service-consensus.c')
-rw-r--r--src/consensus/gnunet-service-consensus.c608
1 files changed, 608 insertions, 0 deletions
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
new file mode 100644
index 000000000..6cf3c0653
--- /dev/null
+++ b/src/consensus/gnunet-service-consensus.c
@@ -0,0 +1,608 @@
1/*
2 This file is part of GNUnet
3 (C) 2012 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19 */
20
21
22#include "platform.h"
23#include "gnunet_protocols.h"
24#include "gnunet_common.h"
25#include "gnunet_service_lib.h"
26#include "gnunet_consensus_service.h"
27#include "gnunet_core_service.h"
28#include "gnunet_container_lib.h"
29#include "consensus.h"
30
31
32struct ConsensusClient;
33
34static void
35send_next (struct ConsensusClient *cli);
36
37
38/**
39 * An element that is waiting to be transmitted to a client.
40 */
41struct PendingElement
42{
43 /**
44 * Pending elements are kept in a DLL.
45 */
46 struct PendingElement *next;
47
48 /**
49 * Pending elements are kept in a DLL.
50 */
51 struct PendingElement *prev;
52
53 /**
54 * The actual element
55 */
56 struct GNUNET_CONSENSUS_Element *element;
57};
58
59
60/**
61 * A consensus session consists of one or more local clients,
62 * as well as zero or more remote authorities.
63 */
64struct ConsensusSession
65{
66 /**
67 * Consensus sessions are kept in a DLL.
68 */
69 struct ConsensusSession *next;
70
71 /**
72 * Consensus sessions are kept in a DLL.
73 */
74 struct ConsensusSession *prev;
75
76 /**
77 * Consensus clients are kept in a DLL.
78 */
79 struct ConsensusClient *clients_head;
80
81 /**
82 * Consensus clients are kept in a DLL.
83 */
84 struct ConsensusClient *clients_tail;
85
86 /**
87 * Local consensus identification, chosen by clients.
88 */
89 struct GNUNET_HashCode *local_id;
90
91 /**
92 * Global consensus identification, computed
93 * from the local id and participating authorities.
94 */
95 struct GNUNET_HashCode *global_id;
96
97 /**
98 * Values in the consensus set of this session.
99 */
100 struct GNUNET_CONTAINER_MultiHashMap *values;
101};
102
103
104struct ConsensusClient
105{
106 /**
107 * Consensus clients are kept in a DLL.
108 */
109 struct ConsensusClient *next;
110 /**
111 * Consensus clients are kept in a DLL.
112 */
113 struct ConsensusClient *prev;
114
115 /**
116 * Corresponding server handle.
117 */
118 struct GNUNET_SERVER_Client *client;
119
120 /**
121 * Client wants to receive and send updates.
122 */
123 int begin;
124
125 /**
126 * Session this client belongs to
127 */
128 struct ConsensusSession *session;
129
130 /**
131 * Values in the consensus set of this client.
132 * Includes pending elements.
133 */
134 struct GNUNET_CONTAINER_MultiHashMap *values;
135
136 /**
137 * Elements that have not been set to the client yet.
138 */
139 struct PendingElement *pending_head;
140 /**
141 * Elements that have not been set to the client yet.
142 */
143 struct PendingElement *pending_tail;
144
145 /**
146 * Currently active transmit handle for sending to the client
147 */
148 struct GNUNET_SERVER_TransmitHandle *th;
149
150 /**
151 * Once conclude_requested is GNUNET_YES, the client may not
152 * insert any more values.
153 */
154 int conclude_requested;
155
156 /**
157 * Client has been informed about the conclusion.
158 */
159 int conclude_sent;
160};
161
162
163/**
164 * Linked list of sesstions this peer participates in.
165 */
166static struct ConsensusSession *sessions_head;
167
168/**
169 * Linked list of sesstions this peer participates in.
170 */
171static struct ConsensusSession *sessions_tail;
172
173/**
174 * Configuration of the consensus service.
175 */
176static const struct GNUNET_CONFIGURATION_Handle *cfg;
177
178/**
179 * Handle to the server for this service.
180 */
181static struct GNUNET_SERVER_Handle *srv;
182
183/**
184 * Peer that runs this service
185 */
186static struct GNUNET_PeerIdentity *my_peer;
187
188
189struct ConsensusClient *
190find_client (const struct GNUNET_SERVER_Client *srv_client)
191{
192 struct ConsensusSession *session;
193 struct ConsensusClient *client;
194
195 session = sessions_head;
196 while (NULL != session)
197 {
198 client = session->clients_head;
199 while (NULL != client)
200 {
201 if (client->client == srv_client)
202 {
203 return client;
204 }
205 client = client->next;
206 }
207 session = session->next;
208 }
209 return NULL;
210}
211
212static void
213disconnect_client (struct GNUNET_SERVER_Client *client)
214{
215 /* FIXME */
216}
217
218static void
219compute_global_id (struct GNUNET_HashCode *dst,
220 const struct GNUNET_HashCode *local_id,
221 const struct GNUNET_PeerIdentity *peers,
222 int num_peers)
223{
224 *dst = *local_id;
225
226 /* FIXME: hash other peers into global id */
227}
228
229
230
231/**
232 * Iterator over hash map entries.
233 *
234 * @param cls closure, the client
235 * @param key current key code
236 * @param value value in the hash map
237 * @return GNUNET_YES if we should continue to
238 * iterate,
239 * GNUNET_NO if not.
240 */
241int
242update_pending (void *cls,
243 const struct GNUNET_HashCode *key,
244 void *value)
245{
246 struct ConsensusClient *cli;
247 struct GNUNET_CONSENSUS_Element *element;
248 struct PendingElement *pending_element;
249
250 cli = (struct ConsensusClient *) cls;
251 element = (struct GNUNET_CONSENSUS_Element *) value;
252 pending_element = GNUNET_malloc (sizeof (struct PendingElement));
253 pending_element->element = element;
254
255 if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_contains (cli->values, key))
256 {
257 GNUNET_CONTAINER_DLL_insert_tail (cli->pending_head, cli->pending_tail, pending_element);
258 GNUNET_CONTAINER_multihashmap_put (cli->values, key, element, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
259 }
260
261 return GNUNET_YES;
262}
263
264
265
266
267static size_t
268transmit_pending (void *cls, size_t size, void *buf)
269{
270 struct GNUNET_CONSENSUS_Element *element;
271 struct GNUNET_CONSENSUS_ElementMessage *msg;
272 struct ConsensusClient *cli;
273
274 cli = (struct ConsensusClient *) cls;
275 msg = (struct GNUNET_CONSENSUS_ElementMessage *) buf;
276 element = cli->pending_head->element;
277
278 GNUNET_assert (NULL != element);
279
280 cli->th = NULL;
281
282 msg->element_type = element->type;
283 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
284 msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ElementMessage) + element->size);
285 memcpy (&msg[1], &element[1], element->size);
286
287
288 cli->pending_head = cli->pending_head->next;
289
290 send_next (cli);
291
292 return sizeof (struct GNUNET_CONSENSUS_ElementMessage) + element->size;
293}
294
295
296static size_t
297transmit_conclude_done (void *cls, size_t size, void *buf)
298{
299 struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg;
300
301 msg = (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) buf;
302 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
303 msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeDoneMessage));
304 msg->num_peers = htons (0);
305
306 return sizeof (struct GNUNET_CONSENSUS_ConcludeDoneMessage);
307}
308
309
310/**
311 * Schedule sending the next message (if there is any) to a client.
312 *
313 * @param cli the client to send the next message to
314 */
315static void
316send_next (struct ConsensusClient *cli)
317{
318 int msize;
319
320 GNUNET_assert (NULL != cli);
321
322 if (NULL != cli->th)
323 {
324 return;
325 }
326
327 if ((cli->conclude_requested == GNUNET_YES) && (cli->conclude_sent == GNUNET_NO))
328 {
329 /* just the conclude message with no other authorities in the dummy */
330 msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
331 cli->th =
332 GNUNET_SERVER_notify_transmit_ready (cli->client, msize,
333 GNUNET_TIME_UNIT_FOREVER_REL, &transmit_conclude_done, cli);
334 cli->conclude_sent = GNUNET_YES;
335 }
336 else if (NULL != cli->pending_head)
337 {
338 msize = cli->pending_head->element->size + sizeof (struct GNUNET_CONSENSUS_ElementMessage);
339 cli->th =
340 GNUNET_SERVER_notify_transmit_ready (cli->client, msize,
341 GNUNET_TIME_UNIT_FOREVER_REL, &transmit_pending, cli);
342 }
343}
344
345
346/**
347 * Called when a client wants to join a consensus session.
348 */
349static void
350client_join (void *cls,
351 struct GNUNET_SERVER_Client *client,
352 const struct GNUNET_MessageHeader *m)
353{
354 struct GNUNET_HashCode global_id;
355 const struct GNUNET_CONSENSUS_JoinMessage *msg;
356 struct ConsensusSession *session;
357 struct ConsensusClient *consensus_client;
358
359 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join\n");
360
361 fprintf(stderr, "foobar\n");
362
363 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client joined\n");
364
365 msg = (struct GNUNET_CONSENSUS_JoinMessage *) m;
366
367 /* kill the client if it already is in a session */
368 if (NULL != find_client (client))
369 {
370 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to join twice\n");
371 disconnect_client (client);
372 return;
373 }
374
375 consensus_client = GNUNET_malloc (sizeof (struct ConsensusClient));
376 consensus_client->client = client;
377 consensus_client->begin = GNUNET_NO;
378 consensus_client->values = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO);
379
380 GNUNET_SERVER_client_keep (client);
381
382 GNUNET_assert (NULL != consensus_client->values);
383
384 compute_global_id (&global_id, &msg->session_id, (struct GNUNET_PeerIdentity *) &m[1], msg->num_peers);
385
386 /* look if we already have a session for this local id */
387 session = sessions_head;
388 while (NULL != session)
389 {
390 if (0 == memcmp(&global_id, session->global_id, sizeof (struct GNUNET_HashCode)))
391 {
392 GNUNET_CONTAINER_DLL_insert (session->clients_head, session->clients_tail, consensus_client);
393 GNUNET_SERVER_receive_done (client, GNUNET_OK);
394 return;
395 }
396 session = session->next;
397 }
398
399 /* session does not exist yet, create it */
400 session = GNUNET_malloc (sizeof (struct ConsensusSession));
401 session->local_id = GNUNET_memdup (&msg->session_id, sizeof (struct GNUNET_HashCode));
402 session->global_id = GNUNET_memdup (&global_id, sizeof (struct GNUNET_HashCode));
403 session->values = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO);
404
405 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
406 GNUNET_CONTAINER_DLL_insert (session->clients_head, session->clients_tail, consensus_client);
407
408 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "created new session");
409
410 GNUNET_SERVER_receive_done (client, GNUNET_OK);
411}
412
413
414/**
415 * Called when a client performs an insert operation.
416 */
417void
418client_insert (void *cls,
419 struct GNUNET_SERVER_Client *client,
420 const struct GNUNET_MessageHeader *m)
421{
422 struct ConsensusClient *consensus_client;
423 struct GNUNET_CONSENSUS_ElementMessage *msg;
424 struct GNUNET_CONSENSUS_Element *element;
425 struct PendingElement *pending_element;
426 struct GNUNET_HashCode key;
427 int element_size;
428
429 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "insert\n");
430
431 consensus_client = find_client (client);
432
433 if (NULL == consensus_client)
434 {
435 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n");
436 GNUNET_SERVER_client_disconnect (client);
437 return;
438 }
439
440 msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
441 element_size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
442
443 element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size);
444
445 element->type = msg->element_type;
446 element->size = element_size;
447 memcpy (&element[1], &msg[1], element_size);
448 element->data = &element[1];
449
450 GNUNET_CRYPTO_hash (element, element_size, &key);
451
452 GNUNET_CONTAINER_multihashmap_put (consensus_client->session->values, &key, element,
453 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
454 GNUNET_CONTAINER_multihashmap_put (consensus_client->values, &key, element,
455 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
456
457 /* send the new value to all clients that don't have it */
458
459 consensus_client = consensus_client->session->clients_head;
460 while (NULL != consensus_client)
461 {
462 if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_contains (consensus_client->values, &key))
463 {
464 pending_element = GNUNET_malloc (sizeof (struct PendingElement));
465 pending_element->element = element;
466 GNUNET_CONTAINER_DLL_insert_tail (consensus_client->pending_head, consensus_client->pending_tail, pending_element);
467 GNUNET_CONTAINER_multihashmap_put (consensus_client->values, &key, element,
468 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
469 send_next (consensus_client);
470 }
471 }
472
473 GNUNET_SERVER_receive_done (client, GNUNET_OK);
474}
475
476
477/**
478 * Called when a client wants to begin
479 */
480void
481client_begin (void *cls,
482 struct GNUNET_SERVER_Client *client,
483 const struct GNUNET_MessageHeader *message)
484{
485 struct ConsensusClient *consensus_client;
486
487 consensus_client = find_client (client);
488
489 if (NULL == consensus_client)
490 {
491 GNUNET_SERVER_client_disconnect (client);
492 return;
493 }
494
495 consensus_client->begin = GNUNET_YES;
496
497 GNUNET_CONTAINER_multihashmap_iterate (consensus_client->session->values, &update_pending, NULL);
498 send_next (consensus_client);
499
500 GNUNET_SERVER_receive_done (client, GNUNET_OK);
501}
502
503
504
505/**
506 * Called when a client performs the conclude operation.
507 */
508void
509client_conclude (void *cls,
510 struct GNUNET_SERVER_Client *client,
511 const struct GNUNET_MessageHeader *message)
512{
513 struct ConsensusClient *consensus_client;
514
515 consensus_client = find_client (client);
516 if (NULL == consensus_client)
517 {
518 GNUNET_SERVER_client_disconnect (client);
519 return;
520 }
521 consensus_client->conclude_requested = GNUNET_YES;
522 send_next (consensus_client);
523
524 GNUNET_SERVER_receive_done (client, GNUNET_OK);
525}
526
527/**
528 * Task that disconnects from core.
529 *
530 * @param cls core handle
531 * @param tc context information (why was this task triggered now)
532 */
533static void
534disconnect_core (void *cls,
535 const struct GNUNET_SCHEDULER_TaskContext *tc)
536{
537 struct GNUNET_CORE_Handle *core;
538 core = (struct GNUNET_CORE_Handle *) cls;
539 GNUNET_CORE_disconnect (core);
540
541 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "disconnected from core\n");
542}
543
544
545static void
546core_startup (void *cls,
547 struct GNUNET_CORE_Handle *core,
548 const struct GNUNET_PeerIdentity *peer)
549{
550 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
551 {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
552 {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
553 {&client_begin, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN,
554 sizeof (struct GNUNET_MessageHeader)},
555 {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
556 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
557 {NULL, NULL, 0, 0}
558 };
559
560
561 GNUNET_SERVER_add_handlers (srv, handlers);
562
563 my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity));
564
565 GNUNET_SCHEDULER_add_now (&disconnect_core, core);
566
567 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n");
568}
569
570
571/**
572 * Process consensus requests.
573 *
574 * @param cls closure
575 * @param server the initialized server
576 * @param c configuration to use
577 */
578static void
579run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c)
580{
581 struct GNUNET_CORE_Handle *my_core;
582 static const struct GNUNET_CORE_MessageHandler handlers[] = {
583 {NULL, 0, 0}
584 };
585
586 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "run\n");
587
588 cfg = c;
589 srv = server;
590 my_core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, handlers);
591 GNUNET_assert (NULL != my_core);
592}
593
594
595/**
596 * The main function for the statistics service.
597 *
598 * @param argc number of arguments from the command line
599 * @param argv command line arguments
600 * @return 0 ok, 1 on error
601 */
602int
603main (int argc, char *const *argv)
604{
605 return (GNUNET_OK ==
606 GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL)) ? 0 : 1;
607}
608