aboutsummaryrefslogtreecommitdiff
path: root/src/consensus
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2012-12-05 21:41:09 +0000
committerFlorian Dold <florian.dold@gmail.com>2012-12-05 21:41:09 +0000
commitaac85d938153d2f181d4bfd08eb734be980bab43 (patch)
tree806ee375f14540e08e3b4b71582a13a731d7192a /src/consensus
parent612f87ce7ff13706d291c441de26eaf15ded5199 (diff)
downloadgnunet-aac85d938153d2f181d4bfd08eb734be980bab43.tar.gz
gnunet-aac85d938153d2f181d4bfd08eb734be980bab43.zip
consensus api, consensus service (local), peer driver and ibf sketch
Diffstat (limited to 'src/consensus')
-rw-r--r--src/consensus/Makefile.am11
-rw-r--r--src/consensus/consensus.h14
-rw-r--r--src/consensus/consensus_api.c238
-rw-r--r--src/consensus/gnunet-consensus-start-peers.c172
-rw-r--r--src/consensus/gnunet-consensus.c184
-rw-r--r--src/consensus/gnunet-service-consensus.c322
-rw-r--r--src/consensus/ibf.c244
-rw-r--r--src/consensus/ibf.h98
-rw-r--r--src/consensus/test_consensus.conf8
9 files changed, 1043 insertions, 248 deletions
diff --git a/src/consensus/Makefile.am b/src/consensus/Makefile.am
index 10b22cc87..29c466901 100644
--- a/src/consensus/Makefile.am
+++ b/src/consensus/Makefile.am
@@ -16,7 +16,8 @@ if USE_COVERAGE
16endif 16endif
17 17
18bin_PROGRAMS = \ 18bin_PROGRAMS = \
19 gnunet-consensus 19 gnunet-consensus \
20 gnunet-consensus-start-peers
20 21
21libexec_PROGRAMS = \ 22libexec_PROGRAMS = \
22 gnunet-service-consensus 23 gnunet-service-consensus
@@ -31,6 +32,14 @@ gnunet_consensus_LDADD = \
31 $(top_builddir)/src/consensus/libgnunetconsensus.la \ 32 $(top_builddir)/src/consensus/libgnunetconsensus.la \
32 $(GN_LIBINTL) 33 $(GN_LIBINTL)
33 34
35gnunet_consensus_start_peers_SOURCES = \
36 gnunet-consensus-start-peers.c
37gnunet_consensus_start_peers_LDADD = \
38 $(top_builddir)/src/util/libgnunetutil.la \
39 $(top_builddir)/src/testbed/libgnunettestbed.la \
40 $(top_builddir)/src/consensus/libgnunetconsensus.la \
41 $(GN_LIBINTL)
42
34gnunet_service_consensus_SOURCES = \ 43gnunet_service_consensus_SOURCES = \
35 gnunet-service-consensus.c 44 gnunet-service-consensus.c
36gnunet_service_consensus_LDADD = \ 45gnunet_service_consensus_LDADD = \
diff --git a/src/consensus/consensus.h b/src/consensus/consensus.h
index 2762e8ff4..d76c6b769 100644
--- a/src/consensus/consensus.h
+++ b/src/consensus/consensus.h
@@ -90,6 +90,20 @@ struct GNUNET_CONSENSUS_ElementMessage
90 /* rest: element data */ 90 /* rest: element data */
91}; 91};
92 92
93struct GNUNET_CONSENSUS_AckMessage
94{
95 /**
96 * Type: GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK
97 */
98 struct GNUNET_MessageHeader header;
99
100 /**
101 * Do we want to keep and propagate the element?
102 */
103 uint8_t keep;
104
105};
106
93GNUNET_NETWORK_STRUCT_END 107GNUNET_NETWORK_STRUCT_END
94 108
95#endif 109#endif
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c
index 90b0fdf16..2479c019c 100644
--- a/src/consensus/consensus_api.c
+++ b/src/consensus/consensus_api.c
@@ -24,6 +24,7 @@
24 * @author Florian Dold 24 * @author Florian Dold
25 */ 25 */
26#include "platform.h" 26#include "platform.h"
27#include "gnunet_util_lib.h"
27#include "gnunet_protocols.h" 28#include "gnunet_protocols.h"
28#include "gnunet_client_lib.h" 29#include "gnunet_client_lib.h"
29#include "gnunet_consensus_service.h" 30#include "gnunet_consensus_service.h"
@@ -32,6 +33,13 @@
32 33
33#define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__) 34#define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
34 35
36struct ElementAck
37{
38 struct ElementAck *next;
39 struct ElementAck *prev;
40 int keep;
41 struct GNUNET_CONSENSUS_Element *element;
42};
35 43
36/** 44/**
37 * Handle for the service. 45 * Handle for the service.
@@ -113,20 +121,138 @@ struct GNUNET_CONSENSUS_Handle
113 * Deadline for the conclude operation. 121 * Deadline for the conclude operation.
114 */ 122 */
115 struct GNUNET_TIME_Absolute conclude_deadline; 123 struct GNUNET_TIME_Absolute conclude_deadline;
124
125 struct ElementAck *ack_head;
126 struct ElementAck *ack_tail;
127
128 /**
129 * Set to GNUNET_YES if the begin message has been transmitted to the service
130 */
131 int begin_sent;
132
133 /**
134 * Set to GNUNET_YES it the begin message should be transmitted to the service
135 */
136 int begin_requested;
116}; 137};
117 138
118 139
140static size_t
141transmit_ack (void *cls, size_t size, void *buf);
142
143static size_t
144transmit_insert (void *cls, size_t size, void *buf);
145
146static size_t
147transmit_conclude (void *cls, size_t size, void *buf);
148
149static size_t
150transmit_begin (void *cls, size_t size, void *buf);
151
152
153/**
154 * Call notify_transmit_ready for ack if necessary and possible.
155 */
156static void
157ntr_ack (struct GNUNET_CONSENSUS_Handle *consensus)
158{
159 if ((NULL == consensus->th) && (NULL != consensus->ack_head))
160 {
161 consensus->th =
162 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
163 sizeof (struct GNUNET_CONSENSUS_AckMessage),
164 GNUNET_TIME_UNIT_FOREVER_REL,
165 GNUNET_NO, &transmit_ack, consensus);
166 }
167}
168
169
170/**
171 * Call notify_transmit_ready for ack if necessary and possible.
172 */
173static void
174ntr_insert (struct GNUNET_CONSENSUS_Handle *consensus)
175{
176 if ((NULL == consensus->th) && (NULL != consensus->insert_element))
177 {
178 consensus->th =
179 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
180 sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
181 consensus->insert_element->size,
182 GNUNET_TIME_UNIT_FOREVER_REL,
183 GNUNET_NO, &transmit_insert, consensus);
184 }
185}
186
187
188/**
189 * Call notify_transmit_ready for ack if necessary and possible.
190 */
191static void
192ntr_conclude (struct GNUNET_CONSENSUS_Handle *consensus)
193{
194 if ((NULL == consensus->th) && (NULL != consensus->conclude_cb))
195 {
196 consensus->th =
197 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
198 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage),
199 GNUNET_TIME_absolute_get_remaining (consensus->conclude_deadline),
200 GNUNET_NO, &transmit_conclude, consensus);
201 }
202}
203
204
205/**
206 * Call notify_transmit_ready for ack if necessary and possible.
207 */
208static void
209ntr_begin (struct GNUNET_CONSENSUS_Handle *consensus)
210{
211 if ((NULL == consensus->th) && (GNUNET_YES == consensus->begin_requested) &&
212 (GNUNET_NO == consensus->begin_sent))
213 {
214 consensus->th =
215 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
216 sizeof (struct GNUNET_MessageHeader),
217 GNUNET_TIME_UNIT_FOREVER_REL,
218 GNUNET_NO, &transmit_begin, consensus);
219 }
220}
221
222/**
223 * Called when the server has sent is a new element
224 *
225 * @param consensus consensus handle
226 * @param msg element message
227 */
119static void 228static void
120handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus, 229handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus,
121 struct GNUNET_CONSENSUS_ElementMessage *msg) 230 struct GNUNET_CONSENSUS_ElementMessage *msg)
122{ 231{
123 struct GNUNET_CONSENSUS_Element element; 232 struct GNUNET_CONSENSUS_Element element;
233 struct ElementAck *ack;
234 int ret;
235
124 element.type = msg->element_type; 236 element.type = msg->element_type;
125 element.size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage); 237 element.size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
126 element.data = &msg[1]; 238 element.data = &msg[1];
127 consensus->new_element_cb (consensus->new_element_cls, &element); 239
240 ret = consensus->new_element_cb (consensus->new_element_cls, &element);
241 ack = GNUNET_malloc (sizeof (struct ElementAck));
242 ack->keep = ret;
243 GNUNET_CONTAINER_DLL_insert_tail (consensus->ack_head, consensus->ack_tail,ack);
244
245 ntr_ack (consensus);
128} 246}
129 247
248
249/**
250 * Called when the server has announced
251 * that the conclusion is over.
252 *
253 * @param consensus consensus handle
254 * @param msg conclude done message
255 */
130static void 256static void
131handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus, 257handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus,
132 struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg) 258 struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg)
@@ -170,7 +296,7 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
170 return; 296 return;
171 } 297 }
172 298
173 switch (ntohs(msg->type)) 299 switch (ntohs (msg->type))
174 { 300 {
175 case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT: 301 case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT:
176 handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg); 302 handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg);
@@ -200,6 +326,43 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
200 * @return number of bytes written to buf 326 * @return number of bytes written to buf
201 */ 327 */
202static size_t 328static size_t
329transmit_ack (void *cls, size_t size, void *buf)
330{
331 struct GNUNET_CONSENSUS_AckMessage *msg;
332 struct GNUNET_CONSENSUS_Handle *consensus;
333
334 consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
335
336 GNUNET_assert (NULL != consensus->ack_head);
337
338 msg = (struct GNUNET_CONSENSUS_AckMessage *) buf;
339 msg->keep = consensus->ack_head->keep;
340 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK);
341 msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_AckMessage));
342
343 consensus->ack_head = consensus->ack_head->next;
344
345 consensus->th = NULL;
346
347 ntr_insert (consensus);
348 ntr_ack (consensus);
349 ntr_conclude (consensus);
350
351 return sizeof (struct GNUNET_CONSENSUS_AckMessage);
352}
353
354/**
355 * Function called to notify a client about the connection
356 * begin ready to queue more data. "buf" will be
357 * NULL and "size" zero if the connection was closed for
358 * writing in the meantime.
359 *
360 * @param cls closure
361 * @param size number of bytes available in buf
362 * @param buf where the callee should write the message
363 * @return number of bytes written to buf
364 */
365static size_t
203transmit_insert (void *cls, size_t size, void *buf) 366transmit_insert (void *cls, size_t size, void *buf)
204{ 367{
205 struct GNUNET_CONSENSUS_ElementMessage *msg; 368 struct GNUNET_CONSENSUS_ElementMessage *msg;
@@ -227,6 +390,7 @@ transmit_insert (void *cls, size_t size, void *buf)
227 consensus->insert_element->data, 390 consensus->insert_element->data,
228 consensus->insert_element->size); 391 consensus->insert_element->size);
229 392
393 consensus->insert_element = NULL;
230 394
231 idc = consensus->idc; 395 idc = consensus->idc;
232 consensus->idc = NULL; 396 consensus->idc = NULL;
@@ -234,6 +398,11 @@ transmit_insert (void *cls, size_t size, void *buf)
234 consensus->idc_cls = NULL; 398 consensus->idc_cls = NULL;
235 idc (idc_cls, GNUNET_YES); 399 idc (idc_cls, GNUNET_YES);
236 400
401
402 ntr_ack (consensus);
403 ntr_insert (consensus);
404 ntr_conclude (consensus);
405
237 return msize; 406 return msize;
238} 407}
239 408
@@ -273,18 +442,14 @@ transmit_join (void *cls, size_t size, void *buf)
273 msg->header.size = htons (msize); 442 msg->header.size = htons (msize);
274 msg->session_id = consensus->session_id; 443 msg->session_id = consensus->session_id;
275 msg->num_peers = htons (consensus->num_peers); 444 msg->num_peers = htons (consensus->num_peers);
276 memcpy(&msg[1], 445 if (0 != msg->num_peers)
277 consensus->peers, 446 memcpy(&msg[1],
278 consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); 447 consensus->peers,
448 consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
279 449
280 if (consensus->insert_element != NULL) 450 ntr_insert (consensus);
281 { 451 ntr_begin (consensus);
282 consensus->th = 452 ntr_conclude (consensus);
283 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
284 msize,
285 GNUNET_TIME_UNIT_FOREVER_REL,
286 GNUNET_NO, &transmit_insert, consensus);
287 }
288 453
289 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, 454 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
290 GNUNET_TIME_UNIT_FOREVER_REL); 455 GNUNET_TIME_UNIT_FOREVER_REL);
@@ -325,6 +490,8 @@ transmit_conclude (void *cls, size_t size, void *buf)
325 msg->timeout = 490 msg->timeout =
326 GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline)); 491 GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline));
327 492
493 ntr_ack (consensus);
494
328 return msize; 495 return msize;
329} 496}
330 497
@@ -359,6 +526,10 @@ transmit_begin (void *cls, size_t size, void *buf)
359 msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN); 526 msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN);
360 msg->size = htons (msize); 527 msg->size = htons (msize);
361 528
529 ntr_ack (consensus);
530 ntr_insert (consensus);
531 ntr_conclude (consensus);
532
362 return msize; 533 return msize;
363} 534}
364 535
@@ -421,8 +592,8 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
421 GNUNET_TIME_UNIT_FOREVER_REL, 592 GNUNET_TIME_UNIT_FOREVER_REL,
422 GNUNET_NO, &transmit_join, consensus); 593 GNUNET_NO, &transmit_join, consensus);
423 594
424 GNUNET_assert (consensus->th != NULL);
425 595
596 GNUNET_assert (consensus->th != NULL);
426 return consensus; 597 return consensus;
427} 598}
428 599
@@ -444,9 +615,9 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
444 GNUNET_CONSENSUS_InsertDoneCallback idc, 615 GNUNET_CONSENSUS_InsertDoneCallback idc,
445 void *idc_cls) 616 void *idc_cls)
446{ 617{
447
448 GNUNET_assert (NULL == consensus->idc); 618 GNUNET_assert (NULL == consensus->idc);
449 GNUNET_assert (NULL == consensus->insert_element); 619 GNUNET_assert (NULL == consensus->insert_element);
620 GNUNET_assert (NULL == consensus->conclude_cb);
450 621
451 consensus->idc = idc; 622 consensus->idc = idc;
452 consensus->idc_cls = idc_cls; 623 consensus->idc_cls = idc_cls;
@@ -454,17 +625,10 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
454 625
455 if (consensus->joined == 0) 626 if (consensus->joined == 0)
456 { 627 {
457 GNUNET_assert (NULL != consensus->th);
458 return; 628 return;
459 } 629 }
460 630
461 GNUNET_assert (NULL == consensus->th); 631 ntr_insert (consensus);
462
463 consensus->th =
464 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
465 element->size + sizeof (struct GNUNET_CONSENSUS_ElementMessage),
466 GNUNET_TIME_UNIT_FOREVER_REL,
467 GNUNET_NO, &transmit_insert, consensus);
468} 632}
469 633
470 634
@@ -478,12 +642,12 @@ GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus)
478{ 642{
479 GNUNET_assert (NULL == consensus->idc); 643 GNUNET_assert (NULL == consensus->idc);
480 GNUNET_assert (NULL == consensus->insert_element); 644 GNUNET_assert (NULL == consensus->insert_element);
645 GNUNET_assert (GNUNET_NO == consensus->begin_requested);
646 GNUNET_assert (GNUNET_NO == consensus->begin_sent);
481 647
482 consensus->th = 648 consensus->begin_requested = GNUNET_YES;
483 GNUNET_CLIENT_notify_transmit_ready (consensus->client, 649
484 sizeof (struct GNUNET_MessageHeader), 650 ntr_begin (consensus);
485 GNUNET_TIME_UNIT_FOREVER_REL,
486 GNUNET_NO, &transmit_begin, consensus);
487} 651}
488 652
489 653
@@ -503,22 +667,17 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
503 GNUNET_CONSENSUS_ConcludeCallback conclude, 667 GNUNET_CONSENSUS_ConcludeCallback conclude,
504 void *conclude_cls) 668 void *conclude_cls)
505{ 669{
506 GNUNET_assert (NULL == consensus->th); 670 GNUNET_assert (NULL != conclude);
507 GNUNET_assert (NULL == consensus->conclude_cb); 671 GNUNET_assert (NULL == consensus->conclude_cb);
508 672
509 consensus->conclude_cls = conclude_cls; 673 consensus->conclude_cls = conclude_cls;
510 consensus->conclude_cb = conclude; 674 consensus->conclude_cb = conclude;
511 consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout); 675 consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout);
512 676
513 consensus->th = 677
514 GNUNET_CLIENT_notify_transmit_ready (consensus->client, 678 /* if transmitting the conclude message is not possible right now, transmit_join
515 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage), 679 * or transmit_ack will handle it */
516 timeout, 680 ntr_conclude (consensus);
517 GNUNET_NO, &transmit_conclude, consensus);
518 if (NULL == consensus->th)
519 {
520 conclude(conclude_cls, 0, NULL);
521 }
522} 681}
523 682
524 683
@@ -536,7 +695,8 @@ GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus)
536 GNUNET_CLIENT_disconnect (consensus->client); 695 GNUNET_CLIENT_disconnect (consensus->client);
537 consensus->client = NULL; 696 consensus->client = NULL;
538 } 697 }
539 GNUNET_free (consensus->peers); 698 if (NULL != consensus->peers)
699 GNUNET_free (consensus->peers);
540 GNUNET_free (consensus); 700 GNUNET_free (consensus);
541} 701}
542 702
diff --git a/src/consensus/gnunet-consensus-start-peers.c b/src/consensus/gnunet-consensus-start-peers.c
new file mode 100644
index 000000000..19eec7744
--- /dev/null
+++ b/src/consensus/gnunet-consensus-start-peers.c
@@ -0,0 +1,172 @@
1
2/*
3 This file is part of GNUnet
4 (C) 2012 Christian Grothoff (and other contributing authors)
5
6 GNUnet is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published
8 by the Free Software Foundation; either version 2, or (at your
9 option) any later version.
10
11 GNUnet is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with GNUnet; see the file COPYING. If not, write to the
18 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 Boston, MA 02111-1307, USA.
20 */
21
22/**
23 * @file consensus/gnunet-consensus-start-peers.c
24 * @brief Starts peers with testebed on localhost,
25 * prints their configuration files and waits for ^C.
26 * @author Florian Dold
27 */
28#include "platform.h"
29#include "gnunet_util_lib.h"
30#include "gnunet_testbed_service.h"
31
32
33static char *config_template_file;
34static unsigned int num_peers_requested = 2;
35static struct GNUNET_TESTBED_Peer **peers;
36
37
38/**
39 * Callback to be called when the requested peer information is available
40 *
41 * @param cb_cls the closure from GNUNET_TETSBED_peer_get_information()
42 * @param op the operation this callback corresponds to
43 * @param pinfo the result; will be NULL if the operation has failed
44 * @param emsg error message if the operation has failed; will be NULL if the
45 * operation is successfull
46 */
47static void
48peer_info_cb (void *cb_cls,
49 struct GNUNET_TESTBED_Operation
50 *op,
51 const struct
52 GNUNET_TESTBED_PeerInformation
53 *pinfo,
54 const char *emsg)
55{
56 GNUNET_assert (NULL == emsg);
57 if (pinfo->pit == GNUNET_TESTBED_PIT_IDENTITY)
58 {
59 struct GNUNET_CRYPTO_HashAsciiEncoded enc;
60 GNUNET_CRYPTO_hash_to_enc (&pinfo->result.id->hashPubKey, &enc);
61 printf("peer %td identity:\n", ((struct GNUNET_TESTBED_Peer **) cb_cls) - &peers[0]);
62 printf("%s\n", (char *)&enc);
63 }
64 else if (pinfo->pit == GNUNET_TESTBED_PIT_CONFIGURATION)
65 {
66 char *tmpfilename;
67 if (NULL == (tmpfilename = GNUNET_DISK_mktemp ("gnunet-consensus")))
68 {
69 GNUNET_break (0);
70 GNUNET_SCHEDULER_shutdown ();
71 return;
72 }
73 if (GNUNET_SYSERR ==
74 GNUNET_CONFIGURATION_write (pinfo->result.cfg,
75 tmpfilename))
76 {
77 GNUNET_break (0);
78 return;
79 }
80 printf("peer %td config file:\n", ((struct GNUNET_TESTBED_Peer **) cb_cls) - &peers[0]);
81 printf("%s\n", tmpfilename);
82 }
83 else
84 {
85 GNUNET_assert (0);
86 }
87}
88
89
90
91/**
92 * Signature of the event handler function called by the
93 * respective event controller.
94 *
95 * @param cls closure
96 * @param event information about the event
97 */
98static void
99controller_cb(void *cls,
100 const struct GNUNET_TESTBED_EventInformation *event)
101{
102 GNUNET_assert (0);
103}
104
105
106
107
108static void
109test_master (void *cls,
110 unsigned int num_peers,
111 struct GNUNET_TESTBED_Peer **started_peers)
112{
113 int i;
114
115 printf("started %d peers\n", num_peers);
116 peers = started_peers;
117
118 for (i = 0; i < num_peers; i++)
119 {
120 GNUNET_TESTBED_peer_get_information (peers[i],
121 GNUNET_TESTBED_PIT_IDENTITY,
122 peer_info_cb,
123 &peers[i]);
124 GNUNET_TESTBED_peer_get_information (peers[i],
125 GNUNET_TESTBED_PIT_CONFIGURATION,
126 peer_info_cb,
127 &peers[i]);
128 }
129}
130
131
132static void
133run (void *cls, char *const *args, const char *cfgfile,
134 const struct GNUNET_CONFIGURATION_Handle *config)
135{
136 if (NULL == config_template_file)
137 {
138 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "no template file specified\n");
139 return;
140 }
141
142 GNUNET_TESTBED_test_run ("gnunet-consensus-start-peers",
143 config_template_file,
144 num_peers_requested,
145 0,
146 controller_cb,
147 NULL,
148 test_master,
149 NULL);
150}
151
152
153int
154main (int argc, char **argv)
155{
156 static const struct GNUNET_GETOPT_CommandLineOption options[] = {
157 { 't', "config-template", "TEMPLATE",
158 gettext_noop ("start peers with the given template configuration"),
159 GNUNET_YES, &GNUNET_GETOPT_set_string, &config_template_file },
160 { 'n', "num-peers", "NUM",
161 gettext_noop ("number of peers to start"),
162 GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_peers_requested },
163 GNUNET_GETOPT_OPTION_END
164 };
165
166 /* run without scheduler, as test_run already does this */
167 GNUNET_PROGRAM_run2 (argc, argv, "gnunet-consensus-start-peers",
168 "help",
169 options, &run, NULL, GNUNET_YES);
170 return 0;
171}
172
diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c
index ef067ea34..12d0965e9 100644
--- a/src/consensus/gnunet-consensus.c
+++ b/src/consensus/gnunet-consensus.c
@@ -29,6 +29,112 @@
29 29
30 30
31 31
32/**
33 * Handle to the consensus service
34 */
35static struct GNUNET_CONSENSUS_Handle *consensus;
36/**
37 * Session id
38 */
39static char *session_id_str;
40
41/**
42 * File handle to STDIN
43 */
44static struct GNUNET_DISK_FileHandle *stdin_fh;
45
46/**
47 * Task for reading from stdin
48 */
49static GNUNET_SCHEDULER_TaskIdentifier stdin_tid = GNUNET_SCHEDULER_NO_TASK;
50
51/**
52 * Element currently being sent to the service
53 */
54static struct GNUNET_CONSENSUS_Element *element;
55
56
57
58static void
59stdin_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
60
61
62/**
63 * Called when a conclusion was successful.
64 *
65 * @param cls
66 * @param num_peers_in_consensus
67 * @param peers_in_consensus
68 */
69static void
70conclude_cb (void *cls,
71 unsigned int num_peers_in_consensus,
72 const struct GNUNET_PeerIdentity *peers_in_consensus)
73{
74 printf("reached conclusion with %d peers\n", num_peers_in_consensus);
75 GNUNET_SCHEDULER_shutdown ();
76}
77
78
79
80static void
81insert_done_cb (void *cls,
82 int success)
83{
84 if (GNUNET_YES != success)
85 {
86 printf ("insert failed\n");
87 GNUNET_SCHEDULER_shutdown ();
88 }
89
90 GNUNET_free (element);
91
92 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == stdin_tid);
93
94 stdin_tid = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL, stdin_fh,
95 &stdin_cb, NULL);
96}
97
98
99/**
100 * Called whenever we can read stdin non-blocking
101 *
102 * @param cls unused
103 * @param tc scheduler context
104 */
105static void
106stdin_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
107{
108 char buf[1024];
109 char *ret;
110 ret = fgets (buf, 1024, stdin);
111
112 stdin_tid = GNUNET_SCHEDULER_NO_TASK;
113
114 if (NULL == ret)
115 {
116 if (feof (stdin))
117 {
118 printf ("concluding ...\n");
119 GNUNET_CONSENSUS_conclude (consensus, GNUNET_TIME_UNIT_FOREVER_REL, conclude_cb, NULL);
120 }
121 else
122 {
123 GNUNET_SCHEDULER_shutdown ();
124 }
125 return;
126 }
127
128 printf("read: %s", buf);
129
130 element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + strlen(buf) + 1);
131 element->type = 0;
132 element->size = strlen(buf) + 1;
133 element->data = &element[1];
134 strcpy((char *) &element[1], buf);
135
136 GNUNET_CONSENSUS_insert (consensus, element, insert_done_cb, NULL);
137}
32 138
33/** 139/**
34 * Called when a new element was received from another peer, or an error occured. 140 * Called when a new element was received from another peer, or an error occured.
@@ -47,23 +153,82 @@ static int
47cb (void *cls, 153cb (void *cls,
48 struct GNUNET_CONSENSUS_Element *element) 154 struct GNUNET_CONSENSUS_Element *element)
49{ 155{
50 return 0; 156 printf("got element\n");
157 return GNUNET_YES;
51} 158}
52 159
160/**
161 * Function run on shutdown to clean up.
162 *
163 * @param cls the statistics handle
164 * @param tc scheduler context
165 */
166static void
167shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
168{
169
170 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "shutting down\n");
171 if (NULL == consensus)
172 {
173 return;
174 }
175
176 GNUNET_CONSENSUS_destroy (consensus);
177}
53 178
54 179
55static void 180static void
56run (void *cls, char *const *args, const char *cfgfile, 181run (void *cls, char *const *args, const char *cfgfile,
57 const struct GNUNET_CONFIGURATION_Handle *cfg) 182 const struct GNUNET_CONFIGURATION_Handle *cfg)
58{ 183{
59 static struct GNUNET_PeerIdentity pid; 184 struct GNUNET_HashCode sid;
60 static struct GNUNET_HashCode sid; 185 struct GNUNET_PeerIdentity *pids;
61 186 int count;
62 GNUNET_CONSENSUS_create (cfg, 187 int i;
63 1, &pid, 188
64 &sid, 189 if (NULL == session_id_str)
65 &cb, NULL); 190 {
191 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "no session id given\n");
192 return;
193 }
194
195 for (count = 0; NULL != args[count]; count++);
196
197 if (0 != count)
198 {
199 pids = GNUNET_malloc (count * sizeof (struct GNUNET_PeerIdentity));
200 }
201 else
202 {
203 pids = NULL;
204 }
205
206 for (i = 0; i < count; i++)
207 {
208 int ret;
209 ret = GNUNET_CRYPTO_hash_from_string (args[i], &pids[i].hashPubKey);
210 if (GNUNET_OK != ret)
211 {
212 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "peer identity '%s' is malformed\n", args[i]);
213 return;
214 }
215 }
216
217 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
218 &shutdown_task, NULL);
66 219
220 consensus =
221 GNUNET_CONSENSUS_create (cfg,
222 count, pids,
223 &sid,
224 &cb, NULL);
225
226 GNUNET_CONSENSUS_begin (consensus);
227
228
229 stdin_fh = GNUNET_DISK_get_handle_from_native (stdin);
230 stdin_tid = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL, stdin_fh,
231 &stdin_cb, NULL);
67} 232}
68 233
69 234
@@ -71,6 +236,9 @@ int
71main (int argc, char **argv) 236main (int argc, char **argv)
72{ 237{
73 static const struct GNUNET_GETOPT_CommandLineOption options[] = { 238 static const struct GNUNET_GETOPT_CommandLineOption options[] = {
239 { 's', "session-id", "ID",
240 gettext_noop ("session identifier"),
241 GNUNET_YES, &GNUNET_GETOPT_set_string, &session_id_str },
74 GNUNET_GETOPT_OPTION_END 242 GNUNET_GETOPT_OPTION_END
75 }; 243 };
76 GNUNET_PROGRAM_run (argc, argv, "gnunet-consensus", 244 GNUNET_PROGRAM_run (argc, argv, "gnunet-consensus",
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
index b733a0aec..195efc681 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -20,19 +20,19 @@
20 20
21 21
22#include "platform.h" 22#include "platform.h"
23#include "gnunet_protocols.h"
24#include "gnunet_common.h" 23#include "gnunet_common.h"
25#include "gnunet_service_lib.h" 24#include "gnunet_protocols.h"
25#include "gnunet_util_lib.h"
26#include "gnunet_consensus_service.h" 26#include "gnunet_consensus_service.h"
27#include "gnunet_core_service.h" 27#include "gnunet_core_service.h"
28#include "gnunet_container_lib.h" 28#include "gnunet_mesh_service.h"
29#include "consensus.h" 29#include "consensus.h"
30 30
31 31
32struct ConsensusClient; 32struct ConsensusSession;
33 33
34static void 34static void
35send_next (struct ConsensusClient *cli); 35send_next (struct ConsensusSession *session);
36 36
37 37
38/** 38/**
@@ -58,8 +58,7 @@ struct PendingElement
58 58
59 59
60/** 60/**
61 * A consensus session consists of one or more local clients, 61 * A consensus session consists of one local client and the remote authorities.
62 * as well as zero or more remote authorities.
63 */ 62 */
64struct ConsensusSession 63struct ConsensusSession
65{ 64{
@@ -74,18 +73,8 @@ struct ConsensusSession
74 struct ConsensusSession *prev; 73 struct ConsensusSession *prev;
75 74
76 /** 75 /**
77 * Consensus clients are kept in a DLL. 76 * Local consensus identification, chosen by clients.
78 */
79 struct ConsensusClient *clients_head;
80
81 /**
82 * Consensus clients are kept in a DLL.
83 */ 77 */
84 struct ConsensusClient *clients_tail;
85
86 /**
87 * Local consensus identification, chosen by clients.
88 */
89 struct GNUNET_HashCode *local_id; 78 struct GNUNET_HashCode *local_id;
90 79
91 /** 80 /**
@@ -95,24 +84,6 @@ struct ConsensusSession
95 struct GNUNET_HashCode *global_id; 84 struct GNUNET_HashCode *global_id;
96 85
97 /** 86 /**
98 * Values in the consensus set of this session.
99 */
100 struct GNUNET_CONTAINER_MultiHashMap *values;
101};
102
103
104struct ConsensusClient
105{
106 /**
107 * Consensus clients are kept in a DLL.
108 */
109 struct ConsensusClient *next;
110 /**
111 * Consensus clients are kept in a DLL.
112 */
113 struct ConsensusClient *prev;
114
115 /**
116 * Corresponding server handle. 87 * Corresponding server handle.
117 */ 88 */
118 struct GNUNET_SERVER_Client *client; 89 struct GNUNET_SERVER_Client *client;
@@ -123,24 +94,30 @@ struct ConsensusClient
123 int begin; 94 int begin;
124 95
125 /** 96 /**
126 * Session this client belongs to 97 * Values in the consensus set of this session,
98 * all of them either have been sent or approved by the client.
127 */ 99 */
128 struct ConsensusSession *session; 100 struct GNUNET_CONTAINER_MultiHashMap *values;
129 101
130 /** 102 /**
131 * Values in the consensus set of this client. 103 * Elements that have not been sent to the client yet.
132 * Includes pending elements.
133 */ 104 */
134 struct GNUNET_CONTAINER_MultiHashMap *values; 105 struct PendingElement *transmit_pending_head;
135 106
136 /** 107 /**
137 * Elements that have not been set to the client yet. 108 * Elements that have not been sent to the client yet.
138 */ 109 */
139 struct PendingElement *pending_head; 110 struct PendingElement *transmit_pending_tail;
111
140 /** 112 /**
141 * Elements that have not been set to the client yet. 113 * Elements that have not been sent to the client yet.
142 */ 114 */
143 struct PendingElement *pending_tail; 115 struct PendingElement *approval_pending_head;
116
117 /**
118 * Elements that have not been sent to the client yet.
119 */
120 struct PendingElement *approval_pending_tail;
144 121
145 /** 122 /**
146 * Currently active transmit handle for sending to the client 123 * Currently active transmit handle for sending to the client
@@ -157,6 +134,11 @@ struct ConsensusClient
157 * Client has been informed about the conclusion. 134 * Client has been informed about the conclusion.
158 */ 135 */
159 int conclude_sent; 136 int conclude_sent;
137
138 /**
139 * Number of other peers in the consensus
140 */
141 int num_peers;
160}; 142};
161 143
162 144
@@ -185,30 +167,6 @@ static struct GNUNET_SERVER_Handle *srv;
185 */ 167 */
186static struct GNUNET_PeerIdentity *my_peer; 168static struct GNUNET_PeerIdentity *my_peer;
187 169
188
189struct ConsensusClient *
190find_client (const struct GNUNET_SERVER_Client *srv_client)
191{
192 struct ConsensusSession *session;
193 struct ConsensusClient *client;
194
195 session = sessions_head;
196 while (NULL != session)
197 {
198 client = session->clients_head;
199 while (NULL != client)
200 {
201 if (client->client == srv_client)
202 {
203 return client;
204 }
205 client = client->next;
206 }
207 session = session->next;
208 }
209 return NULL;
210}
211
212static void 170static void
213disconnect_client (struct GNUNET_SERVER_Client *client) 171disconnect_client (struct GNUNET_SERVER_Client *client)
214{ 172{
@@ -221,73 +179,44 @@ compute_global_id (struct GNUNET_HashCode *dst,
221 const struct GNUNET_PeerIdentity *peers, 179 const struct GNUNET_PeerIdentity *peers,
222 int num_peers) 180 int num_peers)
223{ 181{
224 *dst = *local_id; 182 int i;
183 struct GNUNET_HashCode tmp;
225 184
226 /* FIXME: hash other peers into global id */ 185 *dst = *local_id;
227} 186 for (i = 0; i < num_peers; ++i)
228
229
230
231/**
232 * Iterator over hash map entries.
233 *
234 * @param cls closure, the client
235 * @param key current key code
236 * @param value value in the hash map
237 * @return GNUNET_YES if we should continue to
238 * iterate,
239 * GNUNET_NO if not.
240 */
241int
242update_pending (void *cls,
243 const struct GNUNET_HashCode *key,
244 void *value)
245{
246 struct ConsensusClient *cli;
247 struct GNUNET_CONSENSUS_Element *element;
248 struct PendingElement *pending_element;
249
250 cli = (struct ConsensusClient *) cls;
251 element = (struct GNUNET_CONSENSUS_Element *) value;
252 pending_element = GNUNET_malloc (sizeof (struct PendingElement));
253 pending_element->element = element;
254
255 if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_contains (cli->values, key))
256 { 187 {
257 GNUNET_CONTAINER_DLL_insert_tail (cli->pending_head, cli->pending_tail, pending_element); 188 /* FIXME: maybe hash_xor/hash allow aliased source/target, and we can get by without tmp */
258 GNUNET_CONTAINER_multihashmap_put (cli->values, key, element, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 189 GNUNET_CRYPTO_hash_xor (dst, &peers[0].hashPubKey, &tmp);
190 *dst = tmp;
191 GNUNET_CRYPTO_hash (dst, sizeof (struct GNUNET_PeerIdentity), &tmp);
192 *dst = tmp;
259 } 193 }
260
261 return GNUNET_YES;
262} 194}
263 195
264 196
265
266
267static size_t 197static size_t
268transmit_pending (void *cls, size_t size, void *buf) 198transmit_pending (void *cls, size_t size, void *buf)
269{ 199{
270 struct GNUNET_CONSENSUS_Element *element; 200 struct GNUNET_CONSENSUS_Element *element;
271 struct GNUNET_CONSENSUS_ElementMessage *msg; 201 struct GNUNET_CONSENSUS_ElementMessage *msg;
272 struct ConsensusClient *cli; 202 struct ConsensusSession *session;
273 203
274 cli = (struct ConsensusClient *) cls; 204 session = (struct ConsensusSession *) cls;
275 msg = (struct GNUNET_CONSENSUS_ElementMessage *) buf; 205 msg = (struct GNUNET_CONSENSUS_ElementMessage *) buf;
276 element = cli->pending_head->element; 206 element = session->transmit_pending_head->element;
277 207
278 GNUNET_assert (NULL != element); 208 GNUNET_assert (NULL != element);
279 209
280 cli->th = NULL; 210 session->th = NULL;
281 211
282 msg->element_type = element->type; 212 msg->element_type = element->type;
283 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT); 213 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
284 msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ElementMessage) + element->size); 214 msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ElementMessage) + element->size);
285 memcpy (&msg[1], element->data, element->size); 215 memcpy (&msg[1], element->data, element->size);
286 216
217 session->transmit_pending_head = session->transmit_pending_head->next;
287 218
288 cli->pending_head = cli->pending_head->next; 219 send_next (session);
289
290 send_next (cli);
291 220
292 return sizeof (struct GNUNET_CONSENSUS_ElementMessage) + element->size; 221 return sizeof (struct GNUNET_CONSENSUS_ElementMessage) + element->size;
293} 222}
@@ -299,7 +228,7 @@ transmit_conclude_done (void *cls, size_t size, void *buf)
299 struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg; 228 struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg;
300 229
301 msg = (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) buf; 230 msg = (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) buf;
302 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); 231 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
303 msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeDoneMessage)); 232 msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeDoneMessage));
304 msg->num_peers = htons (0); 233 msg->num_peers = htons (0);
305 234
@@ -313,38 +242,43 @@ transmit_conclude_done (void *cls, size_t size, void *buf)
313 * @param cli the client to send the next message to 242 * @param cli the client to send the next message to
314 */ 243 */
315static void 244static void
316send_next (struct ConsensusClient *cli) 245send_next (struct ConsensusSession *session)
317{ 246{
318 int msize; 247 int msize;
319 248
320 GNUNET_assert (NULL != cli); 249 GNUNET_assert (NULL != session);
321 250
322 if (NULL != cli->th) 251 if (NULL != session->th)
323 { 252 {
324 return; 253 return;
325 } 254 }
326 255
327 if ((cli->conclude_requested == GNUNET_YES) && (cli->conclude_sent == GNUNET_NO)) 256 if ((session->conclude_requested == GNUNET_YES) && (session->conclude_sent == GNUNET_NO))
328 { 257 {
329 /* just the conclude message with no other authorities in the dummy */ 258 /* just the conclude message with no other authorities in the dummy */
330 msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage); 259 msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
331 cli->th = 260 session->th =
332 GNUNET_SERVER_notify_transmit_ready (cli->client, msize, 261 GNUNET_SERVER_notify_transmit_ready (session->client, msize,
333 GNUNET_TIME_UNIT_FOREVER_REL, &transmit_conclude_done, cli); 262 GNUNET_TIME_UNIT_FOREVER_REL, &transmit_conclude_done, session);
334 cli->conclude_sent = GNUNET_YES; 263 session->conclude_sent = GNUNET_YES;
335 } 264 }
336 else if (NULL != cli->pending_head) 265 else if (NULL != session->transmit_pending_head)
337 { 266 {
338 msize = cli->pending_head->element->size + sizeof (struct GNUNET_CONSENSUS_ElementMessage); 267 msize = session->transmit_pending_head->element->size + sizeof (struct GNUNET_CONSENSUS_ElementMessage);
339 cli->th = 268 session->th =
340 GNUNET_SERVER_notify_transmit_ready (cli->client, msize, 269 GNUNET_SERVER_notify_transmit_ready (session->client, msize,
341 GNUNET_TIME_UNIT_FOREVER_REL, &transmit_pending, cli); 270 GNUNET_TIME_UNIT_FOREVER_REL, &transmit_pending, session);
271 /* TODO: insert into ack pending */
342 } 272 }
343} 273}
344 274
345 275
346/** 276/**
347 * Called when a client wants to join a consensus session. 277 * Called when a client wants to join a consensus session.
278 *
279 * @param cls unused
280 * @param client client that sent the message
281 * @param m message sent by the client
348 */ 282 */
349static void 283static void
350client_join (void *cls, 284client_join (void *cls,
@@ -354,58 +288,42 @@ client_join (void *cls,
354 struct GNUNET_HashCode global_id; 288 struct GNUNET_HashCode global_id;
355 const struct GNUNET_CONSENSUS_JoinMessage *msg; 289 const struct GNUNET_CONSENSUS_JoinMessage *msg;
356 struct ConsensusSession *session; 290 struct ConsensusSession *session;
357 struct ConsensusClient *consensus_client;
358 291
359 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join\n"); 292 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client joining\n");
360
361 fprintf(stderr, "foobar\n");
362
363 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client joined\n");
364 293
365 msg = (struct GNUNET_CONSENSUS_JoinMessage *) m; 294 msg = (struct GNUNET_CONSENSUS_JoinMessage *) m;
366
367 /* kill the client if it already is in a session */
368 if (NULL != find_client (client))
369 {
370 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to join twice\n");
371 disconnect_client (client);
372 return;
373 }
374
375 consensus_client = GNUNET_malloc (sizeof (struct ConsensusClient));
376 consensus_client->client = client;
377 consensus_client->begin = GNUNET_NO;
378 consensus_client->values = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO);
379
380 GNUNET_SERVER_client_keep (client);
381
382 GNUNET_assert (NULL != consensus_client->values);
383 295
384 compute_global_id (&global_id, &msg->session_id, (struct GNUNET_PeerIdentity *) &m[1], msg->num_peers); 296 compute_global_id (&global_id, &msg->session_id, (struct GNUNET_PeerIdentity *) &m[1], msg->num_peers);
385 297
386 /* look if we already have a session for this local id */
387 session = sessions_head; 298 session = sessions_head;
388 while (NULL != session) 299 while (NULL != session)
389 { 300 {
390 if (0 == memcmp(&global_id, session->global_id, sizeof (struct GNUNET_HashCode))) 301 if (client == session->client)
391 { 302 {
392 GNUNET_CONTAINER_DLL_insert (session->clients_head, session->clients_tail, consensus_client); 303 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client already in session\n");
393 GNUNET_SERVER_receive_done (client, GNUNET_OK); 304 disconnect_client (client);
305 return;
306 }
307 if (0 == memcmp (session->global_id, &global_id, sizeof (struct GNUNET_HashCode)))
308 {
309 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "session already owned by another client\n");
310 disconnect_client (client);
394 return; 311 return;
395 } 312 }
396 session = session->next;
397 } 313 }
398 314
315 GNUNET_SERVER_client_keep (client);
316
399 /* session does not exist yet, create it */ 317 /* session does not exist yet, create it */
400 session = GNUNET_malloc (sizeof (struct ConsensusSession)); 318 session = GNUNET_malloc (sizeof (struct ConsensusSession));
401 session->local_id = GNUNET_memdup (&msg->session_id, sizeof (struct GNUNET_HashCode)); 319 session->local_id = GNUNET_memdup (&msg->session_id, sizeof (struct GNUNET_HashCode));
402 session->global_id = GNUNET_memdup (&global_id, sizeof (struct GNUNET_HashCode)); 320 session->global_id = GNUNET_memdup (&global_id, sizeof (struct GNUNET_HashCode));
403 session->values = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO); 321 session->values = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO);
322 session->client = client;
404 323
405 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); 324 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
406 GNUNET_CONTAINER_DLL_insert (session->clients_head, session->clients_tail, consensus_client);
407 325
408 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "created new session"); 326 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "created new session\n");
409 327
410 GNUNET_SERVER_receive_done (client, GNUNET_OK); 328 GNUNET_SERVER_receive_done (client, GNUNET_OK);
411} 329}
@@ -419,18 +337,22 @@ client_insert (void *cls,
419 struct GNUNET_SERVER_Client *client, 337 struct GNUNET_SERVER_Client *client,
420 const struct GNUNET_MessageHeader *m) 338 const struct GNUNET_MessageHeader *m)
421{ 339{
422 struct ConsensusClient *consensus_client; 340 struct ConsensusSession *session;
423 struct GNUNET_CONSENSUS_ElementMessage *msg; 341 struct GNUNET_CONSENSUS_ElementMessage *msg;
424 struct GNUNET_CONSENSUS_Element *element; 342 struct GNUNET_CONSENSUS_Element *element;
425 struct PendingElement *pending_element;
426 struct GNUNET_HashCode key; 343 struct GNUNET_HashCode key;
427 int element_size; 344 int element_size;
428 345
429 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "insert\n"); 346 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "insert\n");
430 347
431 consensus_client = find_client (client); 348 session = sessions_head;
349 while (NULL != session)
350 {
351 if (session->client == client)
352 break;
353 }
432 354
433 if (NULL == consensus_client) 355 if (NULL == session)
434 { 356 {
435 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n"); 357 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n");
436 GNUNET_SERVER_client_disconnect (client); 358 GNUNET_SERVER_client_disconnect (client);
@@ -449,28 +371,12 @@ client_insert (void *cls,
449 371
450 GNUNET_CRYPTO_hash (element, element_size, &key); 372 GNUNET_CRYPTO_hash (element, element_size, &key);
451 373
452 GNUNET_CONTAINER_multihashmap_put (consensus_client->session->values, &key, element, 374 GNUNET_CONTAINER_multihashmap_put (session->values, &key, element,
453 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); 375 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
454 GNUNET_CONTAINER_multihashmap_put (consensus_client->values, &key, element,
455 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
456
457 /* send the new value to all clients that don't have it */
458
459 consensus_client = consensus_client->session->clients_head;
460 while (NULL != consensus_client)
461 {
462 if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_contains (consensus_client->values, &key))
463 {
464 pending_element = GNUNET_malloc (sizeof (struct PendingElement));
465 pending_element->element = element;
466 GNUNET_CONTAINER_DLL_insert_tail (consensus_client->pending_head, consensus_client->pending_tail, pending_element);
467 GNUNET_CONTAINER_multihashmap_put (consensus_client->values, &key, element,
468 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
469 send_next (consensus_client);
470 }
471 }
472 376
473 GNUNET_SERVER_receive_done (client, GNUNET_OK); 377 GNUNET_SERVER_receive_done (client, GNUNET_OK);
378
379 send_next (session);
474} 380}
475 381
476 382
@@ -482,20 +388,27 @@ client_begin (void *cls,
482 struct GNUNET_SERVER_Client *client, 388 struct GNUNET_SERVER_Client *client,
483 const struct GNUNET_MessageHeader *message) 389 const struct GNUNET_MessageHeader *message)
484{ 390{
485 struct ConsensusClient *consensus_client; 391 struct ConsensusSession *session;
486 392
487 consensus_client = find_client (client); 393 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client requested begin\n");
394
395 session = sessions_head;
396 while (NULL != session)
397 {
398 if (session->client == client)
399 break;
400 }
488 401
489 if (NULL == consensus_client) 402 if (NULL == session)
490 { 403 {
404 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to 'begin', but client is not in any session\n");
491 GNUNET_SERVER_client_disconnect (client); 405 GNUNET_SERVER_client_disconnect (client);
492 return; 406 return;
493 } 407 }
494 408
495 consensus_client->begin = GNUNET_YES; 409 session->begin = GNUNET_YES;
496 410
497 GNUNET_CONTAINER_multihashmap_iterate (consensus_client->session->values, &update_pending, NULL); 411 send_next (session);
498 send_next (consensus_client);
499 412
500 GNUNET_SERVER_receive_done (client, GNUNET_OK); 413 GNUNET_SERVER_receive_done (client, GNUNET_OK);
501} 414}
@@ -510,20 +423,35 @@ client_conclude (void *cls,
510 struct GNUNET_SERVER_Client *client, 423 struct GNUNET_SERVER_Client *client,
511 const struct GNUNET_MessageHeader *message) 424 const struct GNUNET_MessageHeader *message)
512{ 425{
513 struct ConsensusClient *consensus_client; 426 struct ConsensusSession *session;
514 427
515 consensus_client = find_client (client); 428 session = sessions_head;
516 if (NULL == consensus_client) 429 while ((session != NULL) && (session->client != client))
430 {
431 session = session->next;
432 }
433 if (NULL == session)
517 { 434 {
518 GNUNET_SERVER_client_disconnect (client); 435 GNUNET_SERVER_client_disconnect (client);
519 return; 436 return;
520 } 437 }
521 consensus_client->conclude_requested = GNUNET_YES; 438 session->conclude_requested = GNUNET_YES;
522 send_next (consensus_client); 439 send_next (session);
523
524 GNUNET_SERVER_receive_done (client, GNUNET_OK); 440 GNUNET_SERVER_receive_done (client, GNUNET_OK);
525} 441}
526 442
443
444/**
445 * Called when a client sends an ack
446 */
447void
448client_ack (void *cls,
449 struct GNUNET_SERVER_Client *client,
450 const struct GNUNET_MessageHeader *message)
451{
452 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client ack received\n");
453}
454
527/** 455/**
528 * Task that disconnects from core. 456 * Task that disconnects from core.
529 * 457 *
@@ -538,7 +466,7 @@ disconnect_core (void *cls,
538 core = (struct GNUNET_CORE_Handle *) cls; 466 core = (struct GNUNET_CORE_Handle *) cls;
539 GNUNET_CORE_disconnect (core); 467 GNUNET_CORE_disconnect (core);
540 468
541 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "disconnected from core\n"); 469 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n");
542} 470}
543 471
544 472
@@ -554,16 +482,14 @@ core_startup (void *cls,
554 sizeof (struct GNUNET_MessageHeader)}, 482 sizeof (struct GNUNET_MessageHeader)},
555 {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, 483 {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
556 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)}, 484 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
485 {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK,
486 sizeof (struct GNUNET_CONSENSUS_AckMessage)},
557 {NULL, NULL, 0, 0} 487 {NULL, NULL, 0, 0}
558 }; 488 };
559 489
560
561 GNUNET_SERVER_add_handlers (srv, handlers); 490 GNUNET_SERVER_add_handlers (srv, handlers);
562
563 my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity)); 491 my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity));
564
565 GNUNET_SCHEDULER_add_now (&disconnect_core, core); 492 GNUNET_SCHEDULER_add_now (&disconnect_core, core);
566
567 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n"); 493 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n");
568} 494}
569 495
@@ -583,7 +509,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU
583 {NULL, 0, 0} 509 {NULL, 0, 0}
584 }; 510 };
585 511
586 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "run\n"); 512 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n");
587 513
588 cfg = c; 514 cfg = c;
589 srv = server; 515 srv = server;
diff --git a/src/consensus/ibf.c b/src/consensus/ibf.c
new file mode 100644
index 000000000..d0cb218cc
--- /dev/null
+++ b/src/consensus/ibf.c
@@ -0,0 +1,244 @@
1/*
2 This file is part of GNUnet
3 (C) 2012 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19 */
20
21
22/**
23 * @file consensus/ibf.c
24 * @brief implementation of the invertible bloom filter
25 * @author Florian Dold
26 */
27
28#include "platform.h"
29#include "gnunet_common.h"
30#include "ibf.h"
31
32
33struct PureCells
34{
35 int index;
36 struct PureCells *next;
37 struct PureCells *prev;
38};
39
40struct InvertibleBloomFilter
41{
42 /**
43 * How many cells does this IBF have?
44 */
45 int size;
46
47 /**
48 * In how many cells do we hash one element?
49 * Usually 4 or 3.
50 */
51 int hash_num;
52
53 /**
54 * Salt for mingling hashes
55 */
56 int salt;
57
58 /**
59 * How many times has a bucket been hit?
60 * Can be negative, as a result of IBF subtraction.
61 */
62 int8_t *count;
63
64 /**
65 * xor sums of the elements' hash codes, used to identify the elements.
66 */
67 GNUNET_HashCode *id_sum;
68
69 /**
70 * xor sums of the "hash of the hash".
71 */
72 GNUNET_HashCode *hash_sum;
73
74 struct PureCells *pure_head;
75 struct PureCells *pure_tail;
76
77 /**
78 * GNUNET_YES: fresh list is deprecated
79 * GNUNET_NO: fresh list is up to date
80 */
81 int pure_fresh;
82};
83
84
85/**
86 * Create an invertible bloom filter.
87 */
88struct InvertibleBloomFilter *
89ibf_create(int size, int hash_num)
90{
91 struct InvertibleBloomFilter *ibf;
92
93 ibf = GNUNET_malloc (sizeof (struct InvertibleBloomFilter));
94 ibf->count = GNUNET_malloc (size * sizeof uint8_t);
95 ibf->id_sum = GNUNET_malloc (size * sizeof (struct GNUNET_HashCode));
96 ibf->hash_sum = GNUNET_malloc (size * sizeof (struct GNUNET_HashCode));
97 ibf->size = size;
98 ibf->hash_num = hash_num;
99}
100
101
102/**
103 * Insert an element into an IBF.
104 */
105void
106ibf_insert (struct InvertibleBloomFilter *ibf, struct GNUNET_HashCode *id)
107{
108 struct GNUNET_HashCode key;
109 struct GNUNET_HashCode id_hash;
110 int i;
111
112 key = *id;
113 GNUNET_hash (id, sizeof (struct GNUNET_HashCode), &id_hash);
114
115 for (i = 0; i < ibf->hash_num; i++)
116 {
117 int bucket;
118 int j;
119 if ((i != 0) && (i % 16) == 0)
120 {
121 GNUNET_hash (&key, sizeof (struct GNUNET_HashCode), &key);
122 }
123 bucket = hash.bits[i%16] % ibf->size;
124
125 /* count<0 can happen after ibf subtraction, but then no insert should be done */
126 GNUNET_assert (ibf->count[bucket] >= 0);
127
128 ibf->count[bucket]++;
129
130 for (j=0; j < 16; j++)
131 {
132 ibf->id_sum.bits[j] ^= &id;
133 ibf->hash_sum.bits[j] ^= &id_hash;
134 }
135
136 }
137}
138
139
140/**
141 * Update the linked list of pure cells, if not fresh anymore
142 */
143void
144update_pure (struct InvertibleBloomFilter *ibf)
145{
146 if (GNUNET_YES == ibf->pure_fresh)
147 {
148 return;
149 }
150
151 ibf->pure_fresh = GNUNET_YES;
152}
153
154/**
155 * Decode and remove an element from the IBF, if possible.
156 *
157 * @param ibf the invertible bloom filter to decode
158 * @param ret_id the hash code of the decoded element, if successful
159 * @param side sign of the cell's count where the decoded element came from.
160 * A negative sign indicates that the element was recovered resides in an IBF
161 * that was previously subtracted from.
162 * @return GNUNET_YES if decoding an element was successful, GNUNET_NO if the IBF is empty,
163 * GNUNET_SYSERR if the decoding has faile
164 */
165int
166ibf_decode (struct InvertibleBloomFilter *ibf, int *ret_side, struct GNUNET_HashCode *ret_id)
167{
168 struct GNUNET_HashCode hash;
169 struct PureCells *pure;
170 int count;
171
172 GNUNET_assert (NULL != ibf);
173 GNUNET_assert (NULL != red_id);
174 GNUNET_assert (NULL != side);
175
176 update_pure (ibf);
177
178 pure = ibf->pure_head;
179 ibf->pure_head = pure->next;
180
181 if (NULL == pure)
182 {
183 int i;
184 for (i = 0; i < ibf->size; i++)
185 {
186 int j;
187 if (0 != ibf->count[i])
188 return GNUNET_SYSERR;
189 for (j = 0; j < 16; ++j)
190 if ((0 != ibf->hash_sum[i].bits[j]) || (0 != ibf->id_sum[i].bits[j]))
191 return GNUNET_SYSERR;
192 return GNUNET_NO;
193 }
194 }
195
196 GNUNET_CRYPTO_hash (ibf->id_sum[pure->idx], sizeof (struct GNUNET_HashCode), &hash);
197
198 if (0 == memcmp (&hash, ibf->hash_sum[pure->idx]))
199 {
200 struct GNUNET_HashCode key;
201 int i;
202
203 *ret_side = ibf->count[pure->index];
204 *ret_id = ibf->id_sum[pure->index];
205
206 key = *ibf->id_sum[pure->index];
207
208 /* delete the item from all buckets */
209 for (i = 0; i < ibf->hash_num; i++)
210 {
211 int bucket;
212 int j;
213 if ((i != 0) && (i % 16) == 0)
214 {
215 GNUNET_hash (&key, sizeof (struct GNUNET_HashCode), &key);
216 }
217 bucket = hash.bits[i%16] % ibf->size;
218
219 ibf->count[bucket] -= count;
220
221 for (j=0; j < 16; j++)
222 {
223 ibf->id_sum.bits[j] ^= &id;
224 ibf->hash_sum.bits[j] ^= &id_hash;
225 }
226 return GNUNET_YES;
227 }
228 return GNUNET_SYSERR;
229}
230
231
232
233/**
234 * Subtract ibf2 from ibf1, storing the result in ibf1.
235 * The two IBF's must have the same parameters size and hash_num.
236 *
237 * @return a newly allocated invertible bloom filter
238 */
239void
240ibf_subtract (struct InvertibleBloomFilter *ibf1, struct InvertibleBloomFilter *ibf2)
241{
242 /* FIXME */
243}
244
diff --git a/src/consensus/ibf.h b/src/consensus/ibf.h
new file mode 100644
index 000000000..2c9931e69
--- /dev/null
+++ b/src/consensus/ibf.h
@@ -0,0 +1,98 @@
1/*
2 This file is part of GNUnet
3 (C) 2012 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19 */
20
21
22/**
23 * @file consensus/ibf.h
24 * @brief invertible bloom filter
25 * @author Florian Dold
26 */
27
28
29/**
30 * Opaque handle to an invertible bloom filter (IBF).
31 *
32 * An IBF is a counting bloom filter that has the ability to restore
33 * the hashes of its stored elements with high probability.
34 */
35struct InvertibleBloomFilter
36
37/**
38 * Create an invertible bloom filter.
39 *
40 * @param size number of IBF buckets
41 * @param salt salt for mingling hashes, different salt may
42 * result in less (or more) collisions
43 * @param hash_num number of buckets one element is hashed in
44 * @return the newly created invertible bloom filter
45 */
46struct InvertibleBloomFilter *
47ibf_create(int size, int salt, int hash_num);
48
49/**
50 * Insert an element into an IBF.
51 *
52 * @param ibf the IBF
53 * @param id the element's hash code
54 */
55void
56ibf_insert (struct InvertibleBloomFilter *ibf, GNUNET_HashCode *id);
57
58/**
59 * Subtract ibf2 from ibf1, storing the result in ibf1.
60 * The two IBF's must have the same parameters size and hash_num.
61 */
62void
63ibf_subtract (struct InvertibleBloomFilter *ibf1, struct InvertibleBloomFilter *ibf2);
64
65/**
66 * Decode and remove an element from the IBF, if possible.
67 *
68 * @param ibf the invertible bloom filter
69 * @param the id of the element is written to this hash code
70 * @return GNUNET_YES if decoding an element was successful, GNUNET_NO if it failed to decode
71 */
72int
73ibf_decode (struct InvertibleBloomFilter *ibf, struct GNUNET_HashCode *ret_id);
74
75
76/**
77 * Create a copy of an IBF, the copy has to be destroyed properly.
78 *
79 * @param ibf the IBF to copy
80 */
81struct InvertibleBloomFilter *
82ibf_dup (struct InvertibleBloomFilter *ibf);
83
84/*
85ibf_hton();
86
87ibf_ntoh();
88*/
89
90/**
91 * Destroy all resources associated with the invertible bloom filter.
92 * No more ibf_*-functions may be called on ibf after calling destroy.
93 *
94 * @param ibf the intertible bloom filter to destroy
95 */
96void
97ibf_destroy (struct InvertibleBloomFilter *ibf);
98
diff --git a/src/consensus/test_consensus.conf b/src/consensus/test_consensus.conf
index 5bd57631b..86dfadb9b 100644
--- a/src/consensus/test_consensus.conf
+++ b/src/consensus/test_consensus.conf
@@ -10,7 +10,7 @@ ACCEPT_FROM6 = ::1;
10UNIXPATH = /tmp/gnunet-service-consensus.sock 10UNIXPATH = /tmp/gnunet-service-consensus.sock
11UNIX_MATCH_UID = YES 11UNIX_MATCH_UID = YES
12UNIX_MATCH_GID = YES 12UNIX_MATCH_GID = YES
13OPTIONS = -LDEBUG 13OPTIONS = -L INFO
14 14
15 15
16[transport] 16[transport]
@@ -18,4 +18,8 @@ OPTIONS = -LERROR
18 18
19 19
20[arm] 20[arm]
21DEFAULTSERVICES = core 21DEFAULTSERVICES = core consensus
22
23
24[testbed]
25OVERLAY_TOPOLOGY = CLIQUE