diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-01-17 00:53:11 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-01-17 00:53:11 +0000 |
commit | 21273cba1880b1081b4152ee45b2f4ad6768e639 (patch) | |
tree | 325c93f75c67456b6729e581b9b69cc0bc52df48 /src/consensus | |
parent | bdee53dd2cb760e9acd601e251ba59c42c98c02f (diff) | |
download | gnunet-21273cba1880b1081b4152ee45b2f4ad6768e639.tar.gz gnunet-21273cba1880b1081b4152ee45b2f4ad6768e639.zip |
- gnunet-consensus now profiling tool
- work on service implementation, not working yet
Diffstat (limited to 'src/consensus')
-rw-r--r-- | src/consensus/Makefile.am | 5 | ||||
-rw-r--r-- | src/consensus/consensus.h | 4 | ||||
-rw-r--r-- | src/consensus/consensus_api.c | 50 | ||||
-rw-r--r-- | src/consensus/consensus_protocol.h | 71 | ||||
-rw-r--r-- | src/consensus/gnunet-consensus-start-peers.c | 3 | ||||
-rw-r--r-- | src/consensus/gnunet-consensus.c | 360 | ||||
-rw-r--r-- | src/consensus/gnunet-service-consensus.c | 1264 | ||||
-rw-r--r-- | src/consensus/ibf.c | 87 | ||||
-rw-r--r-- | src/consensus/ibf.h | 51 | ||||
-rw-r--r-- | src/consensus/test_consensus.conf | 2 | ||||
-rw-r--r-- | src/consensus/test_consensus_api.c | 53 |
11 files changed, 1383 insertions, 567 deletions
diff --git a/src/consensus/Makefile.am b/src/consensus/Makefile.am index 1beaa0c62..f5a5c5cdc 100644 --- a/src/consensus/Makefile.am +++ b/src/consensus/Makefile.am | |||
@@ -31,6 +31,7 @@ gnunet_consensus_SOURCES = \ | |||
31 | gnunet_consensus_LDADD = \ | 31 | gnunet_consensus_LDADD = \ |
32 | $(top_builddir)/src/util/libgnunetutil.la \ | 32 | $(top_builddir)/src/util/libgnunetutil.la \ |
33 | $(top_builddir)/src/consensus/libgnunetconsensus.la \ | 33 | $(top_builddir)/src/consensus/libgnunetconsensus.la \ |
34 | $(top_builddir)/src/testbed/libgnunettestbed.la \ | ||
34 | $(GN_LIBINTL) | 35 | $(GN_LIBINTL) |
35 | gnunet_consensus_DEPENDENCIES = \ | 36 | gnunet_consensus_DEPENDENCIES = \ |
36 | libgnunetconsensus.la | 37 | libgnunetconsensus.la |
@@ -53,10 +54,12 @@ gnunet_consensus_ibf_LDADD = \ | |||
53 | $(GN_LIBINTL) | 54 | $(GN_LIBINTL) |
54 | 55 | ||
55 | gnunet_service_consensus_SOURCES = \ | 56 | gnunet_service_consensus_SOURCES = \ |
56 | gnunet-service-consensus.c | 57 | gnunet-service-consensus.c \ |
58 | ibf.c | ||
57 | gnunet_service_consensus_LDADD = \ | 59 | gnunet_service_consensus_LDADD = \ |
58 | $(top_builddir)/src/util/libgnunetutil.la \ | 60 | $(top_builddir)/src/util/libgnunetutil.la \ |
59 | $(top_builddir)/src/core/libgnunetcore.la \ | 61 | $(top_builddir)/src/core/libgnunetcore.la \ |
62 | $(top_builddir)/src/stream/libgnunetstream.la \ | ||
60 | $(top_builddir)/src/mesh/libgnunetmesh.la \ | 63 | $(top_builddir)/src/mesh/libgnunetmesh.la \ |
61 | $(GN_LIBINTL) | 64 | $(GN_LIBINTL) |
62 | 65 | ||
diff --git a/src/consensus/consensus.h b/src/consensus/consensus.h index 75b90b0f9..8436364b6 100644 --- a/src/consensus/consensus.h +++ b/src/consensus/consensus.h | |||
@@ -71,6 +71,10 @@ struct GNUNET_CONSENSUS_ConcludeDoneMessage | |||
71 | */ | 71 | */ |
72 | struct GNUNET_MessageHeader header; | 72 | struct GNUNET_MessageHeader header; |
73 | 73 | ||
74 | uint32_t group_id; | ||
75 | |||
76 | uint32_t num_elements; | ||
77 | |||
74 | uint16_t num_peers; | 78 | uint16_t num_peers; |
75 | 79 | ||
76 | /** PeerIdentity[num_peers] */ | 80 | /** PeerIdentity[num_peers] */ |
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index 25c76b358..5c0494254 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c | |||
@@ -54,11 +54,6 @@ struct QueuedMessage | |||
54 | struct GNUNET_MessageHeader *msg; | 54 | struct GNUNET_MessageHeader *msg; |
55 | 55 | ||
56 | /** | 56 | /** |
57 | * Size of the message in msg. | ||
58 | */ | ||
59 | size_t size; | ||
60 | |||
61 | /** | ||
62 | * Will be called after transmit, if not NULL | 57 | * Will be called after transmit, if not NULL |
63 | */ | 58 | */ |
64 | GNUNET_CONSENSUS_InsertDoneCallback idc; | 59 | GNUNET_CONSENSUS_InsertDoneCallback idc; |
@@ -154,7 +149,7 @@ struct GNUNET_CONSENSUS_Handle | |||
154 | * @param consensus consensus handle | 149 | * @param consensus consensus handle |
155 | */ | 150 | */ |
156 | static void | 151 | static void |
157 | schedule_transmit (struct GNUNET_CONSENSUS_Handle *consensus); | 152 | send_next (struct GNUNET_CONSENSUS_Handle *consensus); |
158 | 153 | ||
159 | 154 | ||
160 | /** | 155 | /** |
@@ -168,16 +163,17 @@ schedule_transmit (struct GNUNET_CONSENSUS_Handle *consensus); | |||
168 | * @param buf where the callee should write the message | 163 | * @param buf where the callee should write the message |
169 | * @return number of bytes written to buf | 164 | * @return number of bytes written to buf |
170 | */ | 165 | */ |
171 | static size_t transmit_queued (void *cls, size_t size, | 166 | static size_t |
172 | void *buf) | 167 | transmit_queued (void *cls, size_t size, |
168 | void *buf) | ||
173 | { | 169 | { |
174 | struct GNUNET_CONSENSUS_Handle *consensus; | 170 | struct GNUNET_CONSENSUS_Handle *consensus; |
175 | struct QueuedMessage *qmsg; | 171 | struct QueuedMessage *qmsg; |
176 | size_t ret_size; | 172 | size_t msg_size; |
177 | |||
178 | printf("transmitting queued\n"); | ||
179 | 173 | ||
180 | consensus = (struct GNUNET_CONSENSUS_Handle *) cls; | 174 | consensus = (struct GNUNET_CONSENSUS_Handle *) cls; |
175 | consensus->th = NULL; | ||
176 | |||
181 | qmsg = consensus->messages_head; | 177 | qmsg = consensus->messages_head; |
182 | GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg); | 178 | GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg); |
183 | GNUNET_assert (qmsg); | 179 | GNUNET_assert (qmsg); |
@@ -188,10 +184,14 @@ static size_t transmit_queued (void *cls, size_t size, | |||
188 | { | 184 | { |
189 | qmsg->idc (qmsg->idc_cls, GNUNET_YES); | 185 | qmsg->idc (qmsg->idc_cls, GNUNET_YES); |
190 | } | 186 | } |
187 | return 0; | ||
191 | } | 188 | } |
192 | 189 | ||
193 | memcpy (buf, qmsg->msg, qmsg->size); | 190 | msg_size = ntohs (qmsg->msg->size); |
194 | ret_size = qmsg->size; | 191 | |
192 | GNUNET_assert (size >= msg_size); | ||
193 | |||
194 | memcpy (buf, qmsg->msg, msg_size); | ||
195 | if (NULL != qmsg->idc) | 195 | if (NULL != qmsg->idc) |
196 | { | 196 | { |
197 | qmsg->idc (qmsg->idc_cls, GNUNET_YES); | 197 | qmsg->idc (qmsg->idc_cls, GNUNET_YES); |
@@ -199,9 +199,9 @@ static size_t transmit_queued (void *cls, size_t size, | |||
199 | GNUNET_free (qmsg->msg); | 199 | GNUNET_free (qmsg->msg); |
200 | GNUNET_free (qmsg); | 200 | GNUNET_free (qmsg); |
201 | 201 | ||
202 | schedule_transmit (consensus); | 202 | send_next (consensus); |
203 | 203 | ||
204 | return ret_size; | 204 | return msg_size; |
205 | } | 205 | } |
206 | 206 | ||
207 | 207 | ||
@@ -211,7 +211,7 @@ static size_t transmit_queued (void *cls, size_t size, | |||
211 | * @param consensus consensus handle | 211 | * @param consensus consensus handle |
212 | */ | 212 | */ |
213 | static void | 213 | static void |
214 | schedule_transmit (struct GNUNET_CONSENSUS_Handle *consensus) | 214 | send_next (struct GNUNET_CONSENSUS_Handle *consensus) |
215 | { | 215 | { |
216 | if (NULL != consensus->th) | 216 | if (NULL != consensus->th) |
217 | return; | 217 | return; |
@@ -219,9 +219,10 @@ schedule_transmit (struct GNUNET_CONSENSUS_Handle *consensus) | |||
219 | if (NULL != consensus->messages_head) | 219 | if (NULL != consensus->messages_head) |
220 | { | 220 | { |
221 | LOG (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n"); | 221 | LOG (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n"); |
222 | GNUNET_CLIENT_notify_transmit_ready (consensus->client, consensus->messages_head->size, | 222 | consensus->th = |
223 | GNUNET_TIME_UNIT_FOREVER_REL, | 223 | GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs (consensus->messages_head->msg->size), |
224 | GNUNET_NO, &transmit_queued, consensus); | 224 | GNUNET_TIME_UNIT_FOREVER_REL, |
225 | GNUNET_NO, &transmit_queued, consensus); | ||
225 | } | 226 | } |
226 | } | 227 | } |
227 | 228 | ||
@@ -270,8 +271,7 @@ handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus, | |||
270 | struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg) | 271 | struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg) |
271 | { | 272 | { |
272 | GNUNET_assert (NULL != consensus->conclude_cb); | 273 | GNUNET_assert (NULL != consensus->conclude_cb); |
273 | consensus->conclude_cb (consensus->conclude_cls, | 274 | consensus->conclude_cb (consensus->conclude_cls, NULL); |
274 | 0, NULL); | ||
275 | consensus->conclude_cb = NULL; | 275 | consensus->conclude_cb = NULL; |
276 | } | 276 | } |
277 | 277 | ||
@@ -356,7 +356,7 @@ transmit_join (void *cls, size_t size, void *buf) | |||
356 | consensus->peers, | 356 | consensus->peers, |
357 | consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); | 357 | consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); |
358 | 358 | ||
359 | schedule_transmit (consensus); | 359 | send_next (consensus); |
360 | 360 | ||
361 | GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, | 361 | GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, |
362 | GNUNET_TIME_UNIT_FOREVER_REL); | 362 | GNUNET_TIME_UNIT_FOREVER_REL); |
@@ -454,13 +454,12 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, | |||
454 | 454 | ||
455 | qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); | 455 | qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); |
456 | qmsg->msg = (struct GNUNET_MessageHeader *) element_msg; | 456 | qmsg->msg = (struct GNUNET_MessageHeader *) element_msg; |
457 | qmsg->size = element_msg_size; | ||
458 | qmsg->idc = idc; | 457 | qmsg->idc = idc; |
459 | qmsg->idc_cls = idc_cls; | 458 | qmsg->idc_cls = idc_cls; |
460 | 459 | ||
461 | GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg); | 460 | GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg); |
462 | 461 | ||
463 | schedule_transmit (consensus); | 462 | send_next (consensus); |
464 | } | 463 | } |
465 | 464 | ||
466 | 465 | ||
@@ -500,11 +499,10 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, | |||
500 | 499 | ||
501 | qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); | 500 | qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); |
502 | qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg; | 501 | qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg; |
503 | qmsg->size = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage); | ||
504 | 502 | ||
505 | GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg); | 503 | GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg); |
506 | 504 | ||
507 | schedule_transmit (consensus); | 505 | send_next (consensus); |
508 | } | 506 | } |
509 | 507 | ||
510 | 508 | ||
diff --git a/src/consensus/consensus_protocol.h b/src/consensus/consensus_protocol.h new file mode 100644 index 000000000..105708ee9 --- /dev/null +++ b/src/consensus/consensus_protocol.h | |||
@@ -0,0 +1,71 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet | ||
3 | (C) 2012 Christian Grothoff (and other contributing authors) | ||
4 | |||
5 | GNUnet is free software; you can redistribute it and/or modify | ||
6 | it under the terms of the GNU General Public License as published | ||
7 | by the Free Software Foundation; either version 2, or (at your | ||
8 | option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU General Public License | ||
16 | along with GNUnet; see the file COPYING. If not, write to the | ||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
18 | Boston, MA 02111-1307, USA. | ||
19 | */ | ||
20 | |||
21 | |||
22 | /** | ||
23 | * @file consensus/consensus_protocol.h | ||
24 | * @brief p2p message definitions for consensus | ||
25 | * @author Florian Dold | ||
26 | */ | ||
27 | |||
28 | #ifndef GNUNET_CONSENSUS_PROTOCOL_H | ||
29 | #define GNUNET_CONSENSUS_PROTOCOL_H | ||
30 | |||
31 | #include "platform.h" | ||
32 | #include "gnunet_common.h" | ||
33 | #include "gnunet_protocols.h" | ||
34 | |||
35 | |||
36 | GNUNET_NETWORK_STRUCT_BEGIN | ||
37 | |||
38 | struct StrataMessage | ||
39 | { | ||
40 | struct GNUNET_MessageHeader header; | ||
41 | /** | ||
42 | * Number of strata in this estimator. | ||
43 | */ | ||
44 | uint16_t num_strata; | ||
45 | /* struct GNUNET_HashCode hash_buckets[ibf_size*num_strata] */ | ||
46 | /* struct GNUNET_HashCode id_buckets[ibf_size*num_strata] */ | ||
47 | /* uint8_t count_buckets[ibf_size*num_strata] */ | ||
48 | }; | ||
49 | |||
50 | struct DifferenceDigest | ||
51 | { | ||
52 | |||
53 | struct GNUNET_MessageHeader header; | ||
54 | }; | ||
55 | |||
56 | struct Element | ||
57 | { | ||
58 | struct GNUNET_MessageHeader header; | ||
59 | }; | ||
60 | |||
61 | struct ConsensusHello | ||
62 | { | ||
63 | struct GNUNET_MessageHeader header; | ||
64 | struct GNUNET_HashCode global_id; | ||
65 | uint8_t round; | ||
66 | }; | ||
67 | |||
68 | |||
69 | GNUNET_NETWORK_STRUCT_END | ||
70 | |||
71 | #endif | ||
diff --git a/src/consensus/gnunet-consensus-start-peers.c b/src/consensus/gnunet-consensus-start-peers.c index 8b516393f..fb7f047ae 100644 --- a/src/consensus/gnunet-consensus-start-peers.c +++ b/src/consensus/gnunet-consensus-start-peers.c | |||
@@ -147,9 +147,6 @@ run (void *cls, char *const *args, const char *cfgfile, | |||
147 | NULL, | 147 | NULL, |
148 | test_master, | 148 | test_master, |
149 | NULL); | 149 | NULL); |
150 | |||
151 | |||
152 | printf("hello there!\n"); | ||
153 | } | 150 | } |
154 | 151 | ||
155 | 152 | ||
diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c index cd267f5ec..c8a5593f1 100644 --- a/src/consensus/gnunet-consensus.c +++ b/src/consensus/gnunet-consensus.c | |||
@@ -20,210 +20,283 @@ | |||
20 | 20 | ||
21 | /** | 21 | /** |
22 | * @file consensus/gnunet-consensus.c | 22 | * @file consensus/gnunet-consensus.c |
23 | * @brief | 23 | * @brief profiling tool for gnunet-consensus |
24 | * @author Florian Dold | 24 | * @author Florian Dold |
25 | */ | 25 | */ |
26 | #include "platform.h" | 26 | #include "platform.h" |
27 | #include "gnunet_common.h" | ||
27 | #include "gnunet_util_lib.h" | 28 | #include "gnunet_util_lib.h" |
28 | #include "gnunet_consensus_service.h" | 29 | #include "gnunet_consensus_service.h" |
30 | #include "gnunet_testbed_service.h" | ||
29 | 31 | ||
32 | static unsigned int num_peers = 2; | ||
30 | 33 | ||
34 | static unsigned int replication = 1; | ||
31 | 35 | ||
32 | /** | 36 | static unsigned int num_values = 5; |
33 | * Handle to the consensus service | ||
34 | */ | ||
35 | static struct GNUNET_CONSENSUS_Handle *consensus; | ||
36 | /** | ||
37 | * Session id | ||
38 | */ | ||
39 | static char *session_id_str; | ||
40 | 37 | ||
41 | /** | 38 | static struct GNUNET_TIME_Relative conclude_timeout; |
42 | * File handle to STDIN | ||
43 | */ | ||
44 | static struct GNUNET_DISK_FileHandle *stdin_fh; | ||
45 | 39 | ||
46 | /** | 40 | static struct GNUNET_CONSENSUS_Handle **consensus_handles; |
47 | * Task for reading from stdin | 41 | |
48 | */ | 42 | static unsigned int num_connected_handles; |
49 | static GNUNET_SCHEDULER_TaskIdentifier stdin_tid = GNUNET_SCHEDULER_NO_TASK; | 43 | |
44 | static struct GNUNET_TESTBED_Peer **peers; | ||
45 | |||
46 | static struct GNUNET_PeerIdentity *peer_ids; | ||
50 | 47 | ||
48 | static unsigned int num_retrieved_peer_ids; | ||
51 | 49 | ||
50 | static struct GNUNET_HashCode session_id; | ||
51 | |||
52 | |||
53 | /** | ||
54 | * Signature of the event handler function called by the | ||
55 | * respective event controller. | ||
56 | * | ||
57 | * @param cls closure | ||
58 | * @param event information about the event | ||
59 | */ | ||
52 | static void | 60 | static void |
53 | stdin_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); | 61 | controller_cb(void *cls, |
62 | const struct GNUNET_TESTBED_EventInformation *event) | ||
63 | { | ||
64 | GNUNET_assert (0); | ||
65 | } | ||
54 | 66 | ||
55 | 67 | ||
56 | /** | 68 | /** |
57 | * Called when a conclusion was successful. | 69 | * Called when a conclusion was successful. |
58 | * | 70 | * |
59 | * @param cls | 71 | * @param cls |
60 | * @param num_peers_in_consensus | 72 | * @param group |
61 | * @param peers_in_consensus | 73 | * @return GNUNET_YES if more consensus groups should be offered, GNUNET_NO if not |
62 | */ | 74 | */ |
75 | static int | ||
76 | conclude_cb (void *cls, const struct GNUNET_CONSENSUS_Group *group) | ||
77 | { | ||
78 | return GNUNET_NO; | ||
79 | } | ||
80 | |||
81 | |||
82 | |||
63 | static void | 83 | static void |
64 | conclude_cb (void *cls, | 84 | generate_indices (int *indices) |
65 | unsigned int consensus_group_count, | ||
66 | const struct GNUNET_CONSENSUS_Group *groups) | ||
67 | { | 85 | { |
68 | printf("reached conclusion\n"); | 86 | int j; |
69 | GNUNET_SCHEDULER_shutdown (); | 87 | j = 0; |
88 | while (j < replication) | ||
89 | { | ||
90 | int n; | ||
91 | int k; | ||
92 | int repeat; | ||
93 | n = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, num_peers); | ||
94 | repeat = GNUNET_NO; | ||
95 | for (k = 0; k < j; k++) | ||
96 | if (indices[k] == n) | ||
97 | { | ||
98 | repeat = GNUNET_YES; | ||
99 | break; | ||
100 | } | ||
101 | if (GNUNET_NO == repeat) | ||
102 | indices[j++] = n; | ||
103 | } | ||
70 | } | 104 | } |
71 | 105 | ||
72 | 106 | ||
73 | static void | 107 | static void |
74 | insert_done_cb (void *cls, | 108 | do_consensus () |
75 | int success) | ||
76 | { | 109 | { |
77 | struct GNUNET_CONSENSUS_Element *element = cls; | 110 | int unique_indices[replication]; |
111 | int i; | ||
78 | 112 | ||
79 | GNUNET_free (element); | 113 | for (i = 0; i < num_values; i++) |
80 | if (GNUNET_YES != success) | ||
81 | { | 114 | { |
82 | printf ("insert failed\n"); | 115 | int j; |
83 | GNUNET_SCHEDULER_shutdown (); | 116 | struct GNUNET_HashCode *val; |
84 | return; | 117 | struct GNUNET_CONSENSUS_Element *element; |
118 | generate_indices(unique_indices); | ||
119 | |||
120 | val = GNUNET_malloc (sizeof *val); | ||
121 | GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, val); | ||
122 | |||
123 | element = GNUNET_malloc (sizeof *element); | ||
124 | element->data = val; | ||
125 | element->size = sizeof *val; | ||
126 | |||
127 | for (j = 0; j < replication; j++) | ||
128 | { | ||
129 | int cid; | ||
130 | cid = unique_indices[j]; | ||
131 | GNUNET_CONSENSUS_insert (consensus_handles[cid], element, NULL, NULL); | ||
132 | } | ||
85 | } | 133 | } |
86 | GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == stdin_tid); | 134 | |
87 | stdin_tid = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL, stdin_fh, | 135 | for (i = 0; i < num_peers; i++) |
88 | &stdin_cb, NULL); | 136 | GNUNET_CONSENSUS_conclude (consensus_handles[i], conclude_timeout, 0, conclude_cb, consensus_handles[i]); |
89 | } | 137 | } |
90 | 138 | ||
91 | 139 | ||
92 | /** | 140 | /** |
93 | * Called whenever we can read stdin non-blocking | 141 | * Callback to be called when a service connect operation is completed |
94 | * | 142 | * |
95 | * @param cls unused | 143 | * @param cls the callback closure from functions generating an operation |
96 | * @param tc scheduler context | 144 | * @param op the operation that has been finished |
145 | * @param ca_result the service handle returned from GNUNET_TESTBED_ConnectAdapter() | ||
146 | * @param emsg error message in case the operation has failed; will be NULL if | ||
147 | * operation has executed successfully. | ||
97 | */ | 148 | */ |
98 | static void | 149 | static void |
99 | stdin_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | 150 | connect_complete (void *cls, |
151 | struct GNUNET_TESTBED_Operation *op, | ||
152 | void *ca_result, | ||
153 | const char *emsg) | ||
100 | { | 154 | { |
101 | char buf[1024]; | 155 | struct GNUNET_CONSENSUS_Handle **chp; |
102 | char *ret; | 156 | |
103 | struct GNUNET_CONSENSUS_Element *element; | 157 | if (NULL != emsg) |
104 | |||
105 | stdin_tid = GNUNET_SCHEDULER_NO_TASK; | ||
106 | if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) | ||
107 | return; /* we're done here */ | ||
108 | ret = fgets (buf, 1024, stdin); | ||
109 | if (NULL == ret) | ||
110 | { | 158 | { |
111 | if (feof (stdin)) | 159 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "testbed connect emsg: %s\n", emsg); |
112 | { | 160 | GNUNET_assert (0); |
113 | printf ("concluding ...\n"); | ||
114 | GNUNET_CONSENSUS_conclude (consensus, GNUNET_TIME_UNIT_FOREVER_REL, 0, conclude_cb, NULL); | ||
115 | } | ||
116 | return; | ||
117 | } | 161 | } |
118 | 162 | ||
119 | printf("read: %s", buf); | 163 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "connect complete\n"); |
164 | |||
165 | chp = (struct GNUNET_CONSENSUS_Handle **) cls; | ||
166 | *chp = (struct GNUNET_CONSENSUS_Handle *) ca_result; | ||
167 | num_connected_handles++; | ||
120 | 168 | ||
121 | element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + strlen(buf) + 1); | 169 | if (num_connected_handles == num_peers) |
122 | element->type = 0; | 170 | { |
123 | element->size = strlen(buf) + 1; | 171 | do_consensus (); |
124 | element->data = &element[1]; | 172 | } |
125 | strcpy ((char *) &element[1], buf); | 173 | } |
126 | GNUNET_CONSENSUS_insert (consensus, element, &insert_done_cb, element); | 174 | |
175 | |||
176 | static int | ||
177 | new_element_cb (void *cls, | ||
178 | struct GNUNET_CONSENSUS_Element *element) | ||
179 | { | ||
180 | return GNUNET_YES; | ||
127 | } | 181 | } |
128 | 182 | ||
129 | 183 | ||
130 | /** | 184 | /** |
131 | * Called when a new element was received from another peer, or an error occured. | 185 | * Adapter function called to establish a connection to |
132 | * | 186 | * a service. |
133 | * May deliver duplicate values. | ||
134 | * | ||
135 | * Elements given to a consensus operation by the local peer are NOT given | ||
136 | * to this callback. | ||
137 | * | 187 | * |
138 | * @param cls closure | 188 | * @param cls closure |
139 | * @param element new element, NULL on error | 189 | * @param cfg configuration of the peer to connect to; will be available until |
140 | * @return GNUNET_OK if the valid is well-formed and should be added to the consensus, | 190 | * GNUNET_TESTBED_operation_done() is called on the operation returned |
141 | * GNUNET_SYSERR if the element should be ignored and not be propagated | 191 | * from GNUNET_TESTBED_service_connect() |
192 | * @return service handle to return in 'op_result', NULL on error | ||
142 | */ | 193 | */ |
143 | static int | 194 | static void * |
144 | cb (void *cls, | 195 | connect_adapter (void *cls, |
145 | struct GNUNET_CONSENSUS_Element *element) | 196 | const struct GNUNET_CONFIGURATION_Handle *cfg) |
146 | { | 197 | { |
147 | if (NULL == element) | 198 | struct GNUNET_CONSENSUS_Handle *consensus; |
148 | { | 199 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "connect adapter, %d peers\n", num_peers); |
149 | printf("error receiving from consensus\n"); | 200 | consensus = GNUNET_CONSENSUS_create (cfg, num_peers, peer_ids, &session_id, new_element_cb, NULL); |
150 | GNUNET_SCHEDULER_shutdown (); | 201 | GNUNET_assert (NULL != consensus); |
151 | return GNUNET_NO; | 202 | return consensus; |
152 | } | ||
153 | printf("got element\n"); | ||
154 | return GNUNET_YES; | ||
155 | } | 203 | } |
156 | 204 | ||
157 | 205 | ||
158 | /** | 206 | /** |
159 | * Function run on shutdown to clean up. | 207 | * Adapter function called to destroy a connection to |
208 | * a service. | ||
160 | * | 209 | * |
161 | * @param cls the statistics handle | 210 | * @param cls closure |
162 | * @param tc scheduler context | 211 | * @param op_result service handle returned from the connect adapter |
163 | */ | 212 | */ |
164 | static void | 213 | static void |
165 | shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | 214 | disconnect_adapter(void *cls, void *op_result) |
166 | { | 215 | { |
167 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "shutting down\n"); | 216 | /* FIXME: what to do here? */ |
168 | if (NULL != consensus) | ||
169 | { | ||
170 | GNUNET_CONSENSUS_destroy (consensus); | ||
171 | consensus = NULL; | ||
172 | } | ||
173 | } | 217 | } |
174 | 218 | ||
175 | 219 | ||
220 | /** | ||
221 | * Callback to be called when the requested peer information is available | ||
222 | * | ||
223 | * @param cb_cls the closure from GNUNET_TETSBED_peer_get_information() | ||
224 | * @param op the operation this callback corresponds to | ||
225 | * @param pinfo the result; will be NULL if the operation has failed | ||
226 | * @param emsg error message if the operation has failed; will be NULL if the | ||
227 | * operation is successfull | ||
228 | */ | ||
176 | static void | 229 | static void |
177 | run (void *cls, char *const *args, const char *cfgfile, | 230 | peer_info_cb (void *cb_cls, |
178 | const struct GNUNET_CONFIGURATION_Handle *cfg) | 231 | struct GNUNET_TESTBED_Operation *op, |
232 | const struct GNUNET_TESTBED_PeerInformation *pinfo, | ||
233 | const char *emsg) | ||
179 | { | 234 | { |
180 | struct GNUNET_HashCode sid; | 235 | struct GNUNET_PeerIdentity *p; |
181 | struct GNUNET_PeerIdentity *pids; | ||
182 | int count; | ||
183 | int i; | 236 | int i; |
184 | 237 | ||
185 | if (NULL == session_id_str) | 238 | GNUNET_assert (NULL == emsg); |
186 | { | ||
187 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "no session id given (missing -s/--session-id)\n"); | ||
188 | return; | ||
189 | } | ||
190 | 239 | ||
191 | GNUNET_CRYPTO_hash (session_id_str, strlen (session_id_str), &sid); | 240 | p = (struct GNUNET_PeerIdentity *) cb_cls; |
192 | 241 | ||
193 | for (count = 0; NULL != args[count]; count++); | 242 | if (pinfo->pit == GNUNET_TESTBED_PIT_IDENTITY) |
194 | 243 | { | |
195 | if (0 != count) | 244 | *p = *pinfo->result.id; |
196 | { | 245 | num_retrieved_peer_ids++; |
197 | pids = GNUNET_malloc (count * sizeof (struct GNUNET_PeerIdentity)); | 246 | if (num_retrieved_peer_ids == num_peers) |
247 | for (i = 0; i < num_peers; i++) | ||
248 | GNUNET_TESTBED_service_connect (NULL, peers[i], "consensus", connect_complete, &consensus_handles[i], | ||
249 | connect_adapter, disconnect_adapter, NULL); | ||
198 | } | 250 | } |
199 | else | 251 | else |
200 | { | 252 | { |
201 | pids = NULL; | 253 | GNUNET_assert (0); |
202 | } | 254 | } |
255 | } | ||
203 | 256 | ||
204 | for (i = 0; i < count; i++) | ||
205 | { | ||
206 | int ret; | ||
207 | ret = GNUNET_CRYPTO_hash_from_string (args[i], &pids[i].hashPubKey); | ||
208 | if (GNUNET_OK != ret) | ||
209 | { | ||
210 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "peer identity '%s' is malformed\n", args[i]); | ||
211 | return; | ||
212 | } | ||
213 | } | ||
214 | 257 | ||
215 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, | 258 | static void |
216 | &shutdown_task, NULL); | 259 | test_master (void *cls, |
217 | 260 | unsigned int num_peers, | |
218 | consensus = | 261 | struct GNUNET_TESTBED_Peer **started_peers) |
219 | GNUNET_CONSENSUS_create (cfg, | 262 | { |
220 | count, pids, | 263 | int i; |
221 | &sid, | 264 | |
222 | &cb, NULL); | 265 | |
223 | 266 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test master\n"); | |
224 | stdin_fh = GNUNET_DISK_get_handle_from_native (stdin); | 267 | |
225 | stdin_tid = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL, stdin_fh, | 268 | peers = started_peers; |
226 | &stdin_cb, NULL); | 269 | |
270 | peer_ids = GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity)); | ||
271 | |||
272 | consensus_handles = GNUNET_malloc (num_peers * sizeof (struct ConsensusHandle *)); | ||
273 | |||
274 | for (i = 0; i < num_peers; i++) | ||
275 | GNUNET_TESTBED_peer_get_information (peers[i], | ||
276 | GNUNET_TESTBED_PIT_IDENTITY, | ||
277 | peer_info_cb, | ||
278 | &peer_ids[i]); | ||
279 | } | ||
280 | |||
281 | static void | ||
282 | run (void *cls, char *const *args, const char *cfgfile, | ||
283 | const struct GNUNET_CONFIGURATION_Handle *cfg) | ||
284 | { | ||
285 | static char *session_str = "gnunet-consensus/test"; | ||
286 | |||
287 | |||
288 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "running gnunet-consensus\n"); | ||
289 | |||
290 | GNUNET_CRYPTO_hash (session_str, strlen(session_str), &session_id); | ||
291 | |||
292 | (void) GNUNET_TESTBED_test_run ("gnunet-consensus", | ||
293 | cfgfile, | ||
294 | num_peers, | ||
295 | 0, | ||
296 | controller_cb, | ||
297 | NULL, | ||
298 | test_master, | ||
299 | NULL); | ||
227 | } | 300 | } |
228 | 301 | ||
229 | 302 | ||
@@ -231,13 +304,24 @@ int | |||
231 | main (int argc, char **argv) | 304 | main (int argc, char **argv) |
232 | { | 305 | { |
233 | static const struct GNUNET_GETOPT_CommandLineOption options[] = { | 306 | static const struct GNUNET_GETOPT_CommandLineOption options[] = { |
234 | { 's', "session-id", "ID", | 307 | { 'n', "num-peers", NULL, |
235 | gettext_noop ("session identifier"), | 308 | gettext_noop ("number of peers in consensus"), |
236 | GNUNET_YES, &GNUNET_GETOPT_set_string, &session_id_str }, | 309 | GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_peers }, |
237 | GNUNET_GETOPT_OPTION_END | 310 | { 'k', "value-replication", NULL, |
238 | }; | 311 | gettext_noop ("how many peers receive one value?"), |
239 | GNUNET_PROGRAM_run (argc, argv, "gnunet-consensus", | 312 | GNUNET_YES, &GNUNET_GETOPT_set_uint, &replication }, |
313 | { 'x', "num-values", NULL, | ||
314 | gettext_noop ("number of values"), | ||
315 | GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_values }, | ||
316 | { 't', "timeout", NULL, | ||
317 | gettext_noop ("consensus timeout"), | ||
318 | GNUNET_YES, &GNUNET_GETOPT_set_relative_time, &conclude_timeout }, | ||
319 | GNUNET_GETOPT_OPTION_END | ||
320 | }; | ||
321 | conclude_timeout = GNUNET_TIME_UNIT_SECONDS; | ||
322 | GNUNET_PROGRAM_run2 (argc, argv, "gnunet-consensus", | ||
240 | "help", | 323 | "help", |
241 | options, &run, NULL); | 324 | options, &run, NULL, GNUNET_YES); |
242 | return 0; | 325 | return 0; |
243 | } | 326 | } |
327 | |||
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index 1b394db19..ad0266954 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c | |||
@@ -32,15 +32,45 @@ | |||
32 | #include "gnunet_util_lib.h" | 32 | #include "gnunet_util_lib.h" |
33 | #include "gnunet_consensus_service.h" | 33 | #include "gnunet_consensus_service.h" |
34 | #include "gnunet_core_service.h" | 34 | #include "gnunet_core_service.h" |
35 | #include "gnunet_mesh_service.h" | 35 | #include "gnunet_stream_lib.h" |
36 | #include "consensus_protocol.h" | ||
37 | #include "ibf.h" | ||
36 | #include "consensus.h" | 38 | #include "consensus.h" |
37 | 39 | ||
38 | 40 | ||
41 | /** | ||
42 | * Number of IBFs in a strata estimator. | ||
43 | */ | ||
44 | #define STRATA_COUNT 32 | ||
45 | /** | ||
46 | * Number of buckets per IBF. | ||
47 | */ | ||
48 | #define STRATA_IBF_BUCKETS 80 | ||
49 | /** | ||
50 | * hash num parameter of the IBF | ||
51 | */ | ||
52 | #define STRATA_HASH_NUM 3 | ||
53 | /** | ||
54 | * Number of strata that can be transmitted in one message. | ||
55 | */ | ||
56 | #define STRATA_PER_MESSAGE ((1<<15) / (IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS)) | ||
57 | |||
58 | |||
59 | |||
60 | /* forward declarations */ | ||
61 | |||
39 | struct ConsensusSession; | 62 | struct ConsensusSession; |
63 | struct IncomingSocket; | ||
40 | 64 | ||
41 | static void | 65 | static void |
42 | send_next (struct ConsensusSession *session); | 66 | send_next (struct ConsensusSession *session); |
43 | 67 | ||
68 | static void | ||
69 | write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size); | ||
70 | |||
71 | static int | ||
72 | get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session); | ||
73 | |||
44 | 74 | ||
45 | /** | 75 | /** |
46 | * An element that is waiting to be transmitted to a client. | 76 | * An element that is waiting to be transmitted to a client. |
@@ -63,22 +93,75 @@ struct PendingElement | |||
63 | struct GNUNET_CONSENSUS_Element *element; | 93 | struct GNUNET_CONSENSUS_Element *element; |
64 | }; | 94 | }; |
65 | 95 | ||
66 | 96 | struct ConsensusPeerInformation | |
67 | /* | ||
68 | * A peer that is also in a consensus session. | ||
69 | * Note that 'this' peer is not in the list. | ||
70 | */ | ||
71 | struct ConsensusPeer | ||
72 | { | 97 | { |
73 | struct GNUNET_PeerIdentity *peer_id; | 98 | struct GNUNET_STREAM_Socket *socket; |
99 | |||
100 | /** | ||
101 | * Is socket's connection established, i.e. can we write to it? | ||
102 | * Only relevent on outgoing cpi. | ||
103 | */ | ||
104 | int is_connected; | ||
105 | |||
106 | /** | ||
107 | * Type of the peer in the all-to-all rounds, | ||
108 | * GNUNET_YES if we initiate reconciliation. | ||
109 | */ | ||
110 | int is_outgoing; | ||
111 | |||
112 | /** | ||
113 | * Did we receive/send a consensus hello? | ||
114 | */ | ||
115 | int hello; | ||
116 | |||
117 | /** | ||
118 | * Handle for currently active read | ||
119 | */ | ||
120 | struct GNUNET_STREAM_ReadHandle *rh; | ||
121 | |||
122 | /** | ||
123 | * Handle for currently active read | ||
124 | */ | ||
125 | struct GNUNET_STREAM_WriteHandle *wh; | ||
126 | |||
127 | /** | ||
128 | * How many of the strate in the ibf were | ||
129 | * sent or received in this round? | ||
130 | */ | ||
131 | int strata_counter; | ||
132 | |||
133 | struct InvertibleBloomFilter *my_ibf; | ||
134 | |||
135 | int my_ibf_bucket_counter; | ||
136 | |||
137 | struct InvertibleBloomFilter *peer_ibf; | ||
138 | |||
139 | int peer_ibf_bucket_counter; | ||
74 | 140 | ||
75 | /** | 141 | /** |
76 | * Incoming tunnel from the peer. | 142 | * Strata estimator of the peer, NULL if our peer |
143 | * initiated the reconciliation. | ||
77 | */ | 144 | */ |
78 | struct GNUNET_MESH_Tunnel *incoming_tunnel; | 145 | struct InvertibleBloomFilter **strata; |
79 | 146 | ||
80 | struct InvertibleBloomFilter *last_ibf; | 147 | struct GNUNET_SERVER_MessageStreamTokenizer *mst; |
148 | |||
149 | struct ConsensusSession *session; | ||
150 | }; | ||
151 | |||
152 | struct QueuedMessage | ||
153 | { | ||
154 | struct GNUNET_MessageHeader *msg; | ||
155 | |||
156 | /** | ||
157 | * Queued messages are stored in a doubly linked list. | ||
158 | */ | ||
159 | struct QueuedMessage *next; | ||
81 | 160 | ||
161 | /** | ||
162 | * Queued messages are stored in a doubly linked list. | ||
163 | */ | ||
164 | struct QueuedMessage *prev; | ||
82 | }; | 165 | }; |
83 | 166 | ||
84 | 167 | ||
@@ -98,15 +181,17 @@ struct ConsensusSession | |||
98 | struct ConsensusSession *prev; | 181 | struct ConsensusSession *prev; |
99 | 182 | ||
100 | /** | 183 | /** |
101 | * Local consensus identification, chosen by clients. | 184 | * Join message. Used to initialize the session later, |
185 | * if the identity of the local peer is not yet known. | ||
186 | * NULL if the session has been fully initialized. | ||
102 | */ | 187 | */ |
103 | struct GNUNET_HashCode *local_id; | 188 | struct GNUNET_CONSENSUS_JoinMessage *join_msg; |
104 | 189 | ||
105 | /** | 190 | /** |
106 | * Global consensus identification, computed | 191 | * Global consensus identification, computed |
107 | * from the local id and participating authorities. | 192 | * from the local id and participating authorities. |
108 | */ | 193 | */ |
109 | struct GNUNET_HashCode *global_id; | 194 | struct GNUNET_HashCode global_id; |
110 | 195 | ||
111 | /** | 196 | /** |
112 | * Local client in this consensus session. | 197 | * Local client in this consensus session. |
@@ -140,6 +225,10 @@ struct ConsensusSession | |||
140 | */ | 225 | */ |
141 | struct PendingElement *approval_pending_tail; | 226 | struct PendingElement *approval_pending_tail; |
142 | 227 | ||
228 | struct QueuedMessage *client_messages_head; | ||
229 | |||
230 | struct QueuedMessage *client_messages_tail; | ||
231 | |||
143 | /** | 232 | /** |
144 | * Currently active transmit handle for sending to the client | 233 | * Currently active transmit handle for sending to the client |
145 | */ | 234 | */ |
@@ -152,11 +241,6 @@ struct ConsensusSession | |||
152 | int conclude_requested; | 241 | int conclude_requested; |
153 | 242 | ||
154 | /** | 243 | /** |
155 | * Client has been informed about the conclusion. | ||
156 | */ | ||
157 | int conclude_sent; | ||
158 | |||
159 | /** | ||
160 | * Minimum number of peers to form a consensus group | 244 | * Minimum number of peers to form a consensus group |
161 | */ | 245 | */ |
162 | int conclude_group_min; | 246 | int conclude_group_min; |
@@ -178,30 +262,74 @@ struct ConsensusSession | |||
178 | */ | 262 | */ |
179 | unsigned int num_peers; | 263 | unsigned int num_peers; |
180 | 264 | ||
181 | /** | 265 | struct ConsensusPeerInformation *info; |
182 | * Other peers in the consensus, array of ConsensusPeer | ||
183 | */ | ||
184 | struct ConsensusPeer *peers; | ||
185 | 266 | ||
186 | /** | 267 | /** |
187 | * Tunnel for broadcasting to all other authorities | 268 | * Sorted array of peer identities in this consensus session, |
269 | * includes the local peer. | ||
188 | */ | 270 | */ |
189 | struct GNUNET_MESH_Tunnel *broadcast_tunnel; | 271 | struct GNUNET_PeerIdentity *peers; |
190 | 272 | ||
191 | /** | 273 | /** |
192 | * Time limit for one round of pairwise exchange. | 274 | * Index of the local peer in the peers array |
193 | * FIXME: should not actually be a constant | ||
194 | */ | 275 | */ |
195 | struct GNUNET_TIME_Relative round_time; | 276 | int local_peer_idx; |
196 | 277 | ||
197 | /** | 278 | /** |
198 | * Task identifier for the round timeout task | 279 | * Task identifier for the round timeout task |
199 | */ | 280 | */ |
200 | GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid; | 281 | GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid; |
282 | |||
283 | struct InvertibleBloomFilter **strata; | ||
201 | }; | 284 | }; |
202 | 285 | ||
203 | 286 | ||
204 | /** | 287 | /** |
288 | * Sockets from other peers who want to communicate with us. | ||
289 | * It may not be known yet which consensus session they belong to. | ||
290 | */ | ||
291 | struct IncomingSocket | ||
292 | { | ||
293 | /** | ||
294 | * Incoming sockets are kept in a double linked list. | ||
295 | */ | ||
296 | struct IncomingSocket *next; | ||
297 | |||
298 | /** | ||
299 | * Incoming sockets are kept in a double linked list. | ||
300 | */ | ||
301 | struct IncomingSocket *prev; | ||
302 | |||
303 | /** | ||
304 | * The actual socket. | ||
305 | */ | ||
306 | struct GNUNET_STREAM_Socket *socket; | ||
307 | |||
308 | /** | ||
309 | * Handle for currently active read | ||
310 | */ | ||
311 | struct GNUNET_STREAM_ReadHandle *rh; | ||
312 | |||
313 | /** | ||
314 | * Peer that connected to us with the socket. | ||
315 | */ | ||
316 | struct GNUNET_PeerIdentity *peer; | ||
317 | |||
318 | /** | ||
319 | * Message stream tokenizer for this socket. | ||
320 | */ | ||
321 | struct GNUNET_SERVER_MessageStreamTokenizer *mst; | ||
322 | |||
323 | /** | ||
324 | * Peer-in-session this socket belongs to, once known, otherwise NULL. | ||
325 | */ | ||
326 | struct ConsensusPeerInformation *cpi; | ||
327 | }; | ||
328 | |||
329 | static struct IncomingSocket *incoming_sockets_head; | ||
330 | static struct IncomingSocket *incoming_sockets_tail; | ||
331 | |||
332 | /** | ||
205 | * Linked list of sesstions this peer participates in. | 333 | * Linked list of sesstions this peer participates in. |
206 | */ | 334 | */ |
207 | static struct ConsensusSession *sessions_head; | 335 | static struct ConsensusSession *sessions_head; |
@@ -222,32 +350,349 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg; | |||
222 | static struct GNUNET_SERVER_Handle *srv; | 350 | static struct GNUNET_SERVER_Handle *srv; |
223 | 351 | ||
224 | /** | 352 | /** |
225 | * Peer that runs this service | 353 | * Peer that runs this service. |
226 | */ | 354 | */ |
227 | static struct GNUNET_PeerIdentity *my_peer; | 355 | static struct GNUNET_PeerIdentity *my_peer; |
228 | 356 | ||
229 | /** | 357 | /** |
230 | * Handle to the mesh service. | 358 | * Handle to the core service. Only used during service startup, will be NULL after that. |
231 | */ | 359 | */ |
232 | static struct GNUNET_MESH_Handle *mesh; | 360 | static struct GNUNET_CORE_Handle *core; |
233 | 361 | ||
234 | /** | 362 | /** |
235 | * Handle to the core service. Only used during service startup, will be NULL after that. | 363 | * Listener for sockets from peers that want to reconcile with us. |
236 | */ | 364 | */ |
237 | static struct GNUNET_CORE_Handle *core; | 365 | static struct GNUNET_STREAM_ListenSocket *listener; |
238 | 366 | ||
367 | |||
368 | static int | ||
369 | estimate_difference (struct InvertibleBloomFilter** strata1, | ||
370 | struct InvertibleBloomFilter** strata2) | ||
371 | { | ||
372 | int i; | ||
373 | int count; | ||
374 | count = 0; | ||
375 | for (i = STRATA_COUNT - 1; i >= 0; i--) | ||
376 | { | ||
377 | struct InvertibleBloomFilter *diff; | ||
378 | int ibf_count; | ||
379 | int more; | ||
380 | ibf_count = 0; | ||
381 | diff = ibf_dup (strata1[i]); | ||
382 | ibf_subtract (diff, strata2[i]); | ||
383 | for (;;) | ||
384 | { | ||
385 | more = ibf_decode (diff, NULL, NULL); | ||
386 | if (GNUNET_NO == more) | ||
387 | { | ||
388 | count += ibf_count; | ||
389 | break; | ||
390 | } | ||
391 | if (GNUNET_SYSERR == more) | ||
392 | { | ||
393 | return count * (1 << (i + 1)); | ||
394 | } | ||
395 | ibf_count++; | ||
396 | } | ||
397 | ibf_destroy (diff); | ||
398 | } | ||
399 | return count; | ||
400 | } | ||
401 | |||
402 | |||
403 | /** | ||
404 | * Functions of this signature are called whenever data is available from the | ||
405 | * stream. | ||
406 | * | ||
407 | * @param cls the closure from GNUNET_STREAM_read | ||
408 | * @param status the status of the stream at the time this function is called | ||
409 | * @param data traffic from the other side | ||
410 | * @param size the number of bytes available in data read; will be 0 on timeout | ||
411 | * @return number of bytes of processed from 'data' (any data remaining should be | ||
412 | * given to the next time the read processor is called). | ||
413 | */ | ||
414 | static size_t | ||
415 | stream_data_processor (void *cls, | ||
416 | enum GNUNET_STREAM_Status status, | ||
417 | const void *data, | ||
418 | size_t size) | ||
419 | { | ||
420 | struct IncomingSocket *incoming; | ||
421 | int ret; | ||
422 | |||
423 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
424 | |||
425 | incoming = (struct IncomingSocket *) cls; | ||
426 | |||
427 | ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_NO); | ||
428 | if (GNUNET_SYSERR == ret) | ||
429 | { | ||
430 | /* FIXME: handle this correctly */ | ||
431 | GNUNET_assert (0); | ||
432 | } | ||
433 | |||
434 | /* read again */ | ||
435 | incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL, | ||
436 | &stream_data_processor, incoming); | ||
437 | |||
438 | /* we always read all data */ | ||
439 | return size; | ||
440 | } | ||
441 | |||
442 | static int | ||
443 | handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg) | ||
444 | { | ||
445 | int i; | ||
446 | int num_strata; | ||
447 | struct GNUNET_HashCode *hash_src; | ||
448 | uint8_t *count_src; | ||
449 | |||
450 | GNUNET_assert (GNUNET_NO == cpi->is_outgoing); | ||
451 | |||
452 | if (NULL == cpi->strata) | ||
453 | { | ||
454 | cpi->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct InvertibleBloomFilter *)); | ||
455 | for (i = 0; i < STRATA_COUNT; i++) | ||
456 | cpi->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); | ||
457 | } | ||
458 | |||
459 | num_strata = ntohs (strata_msg->num_strata); | ||
460 | |||
461 | /* for correct message alignment, copy bucket types seperately */ | ||
462 | hash_src = (struct GNUNET_HashCode *) &strata_msg[1]; | ||
463 | |||
464 | for (i = 0; i < num_strata; i++) | ||
465 | { | ||
466 | memcpy (cpi->strata[cpi->strata_counter+i]->hash_sum, hash_src, STRATA_IBF_BUCKETS * sizeof *hash_src); | ||
467 | hash_src += STRATA_IBF_BUCKETS; | ||
468 | } | ||
469 | |||
470 | for (i = 0; i < num_strata; i++) | ||
471 | { | ||
472 | memcpy (cpi->strata[cpi->strata_counter+i]->id_sum, hash_src, STRATA_IBF_BUCKETS * sizeof *hash_src); | ||
473 | hash_src += STRATA_IBF_BUCKETS; | ||
474 | } | ||
475 | |||
476 | count_src = (uint8_t *) hash_src; | ||
477 | |||
478 | for (i = 0; i < num_strata; i++) | ||
479 | { | ||
480 | uint8_t zero[STRATA_IBF_BUCKETS]; | ||
481 | memset (zero, 0, STRATA_IBF_BUCKETS); | ||
482 | memcpy (cpi->strata[cpi->strata_counter+i]->count, count_src, STRATA_IBF_BUCKETS); | ||
483 | count_src += STRATA_IBF_BUCKETS; | ||
484 | } | ||
485 | |||
486 | GNUNET_assert (count_src == (((uint8_t *) &strata_msg[1]) + STRATA_IBF_BUCKETS * num_strata * IBF_BUCKET_SIZE)); | ||
487 | |||
488 | cpi->strata_counter += num_strata; | ||
489 | |||
490 | if (STRATA_COUNT == cpi->strata_counter) | ||
491 | { | ||
492 | int diff; | ||
493 | diff = estimate_difference (cpi->session->strata, cpi->strata); | ||
494 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "diff=%d\n", diff); | ||
495 | } | ||
496 | |||
497 | return GNUNET_YES; | ||
498 | } | ||
499 | |||
500 | |||
501 | static int | ||
502 | handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *strata) | ||
503 | { | ||
504 | return GNUNET_YES; | ||
505 | } | ||
506 | |||
507 | |||
508 | static int | ||
509 | handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct Element *strata) | ||
510 | { | ||
511 | return GNUNET_YES; | ||
512 | } | ||
513 | |||
514 | |||
515 | static int | ||
516 | handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello) | ||
517 | { | ||
518 | struct ConsensusSession *session; | ||
519 | session = sessions_head; | ||
520 | while (NULL != session) | ||
521 | { | ||
522 | if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id)) | ||
523 | { | ||
524 | int idx; | ||
525 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer helloed session\n"); | ||
526 | idx = get_peer_idx (inc->peer, session); | ||
527 | GNUNET_assert (-1 != idx); | ||
528 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "idx is %d\n", idx); | ||
529 | inc->cpi = &session->info[idx]; | ||
530 | GNUNET_assert (GNUNET_NO == inc->cpi->is_outgoing); | ||
531 | inc->cpi->mst = inc->mst; | ||
532 | inc->cpi->hello = GNUNET_YES; | ||
533 | inc->cpi->socket = inc->socket; | ||
534 | return GNUNET_YES; | ||
535 | } | ||
536 | session = session->next; | ||
537 | } | ||
538 | GNUNET_assert (0); | ||
539 | return GNUNET_NO; | ||
540 | } | ||
541 | |||
542 | |||
543 | /** | ||
544 | * Functions with this signature are called whenever a | ||
545 | * complete message is received by the tokenizer. | ||
546 | * | ||
547 | * Do not call GNUNET_SERVER_mst_destroy in callback | ||
548 | * | ||
549 | * @param cls closure | ||
550 | * @param client identification of the client | ||
551 | * @param message the actual message | ||
552 | * | ||
553 | * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing | ||
554 | */ | ||
555 | static int | ||
556 | mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) | ||
557 | { | ||
558 | struct ConsensusPeerInformation *cpi; | ||
559 | cpi = (struct ConsensusPeerInformation *) cls; | ||
560 | switch (ntohs( message->type)) | ||
561 | { | ||
562 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE: | ||
563 | return handle_p2p_strata (cpi, (struct StrataMessage *) message); | ||
564 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST: | ||
565 | return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message); | ||
566 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS: | ||
567 | return handle_p2p_element (cpi, (struct Element *) message); | ||
568 | default: | ||
569 | /* FIXME: handle correctly */ | ||
570 | GNUNET_assert (0); | ||
571 | } | ||
572 | return GNUNET_OK; | ||
573 | } | ||
574 | |||
575 | |||
576 | /** | ||
577 | * Handle tokenized messages from stream sockets. | ||
578 | * Delegate them if the socket belongs to a session, | ||
579 | * handle hello messages otherwise. | ||
580 | * | ||
581 | * Do not call GNUNET_SERVER_mst_destroy in callback | ||
582 | * | ||
583 | * @param cls closure, unused | ||
584 | * @param client incoming socket this message comes from | ||
585 | * @param message the actual message | ||
586 | * | ||
587 | * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing | ||
588 | */ | ||
589 | static int | ||
590 | mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) | ||
591 | { | ||
592 | struct IncomingSocket *inc; | ||
593 | inc = (struct IncomingSocket *) client; | ||
594 | switch (ntohs( message->type)) | ||
595 | { | ||
596 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO: | ||
597 | return handle_p2p_hello (inc, (struct ConsensusHello *) message); | ||
598 | default: | ||
599 | if (NULL != inc->cpi) | ||
600 | return mst_session_callback (inc->cpi, client, message); | ||
601 | /* FIXME: disconnect peer properly */ | ||
602 | GNUNET_assert (0); | ||
603 | } | ||
604 | return GNUNET_OK; | ||
605 | } | ||
606 | |||
607 | |||
608 | /** | ||
609 | * Functions of this type are called upon new stream connection from other peers | ||
610 | * or upon binding error which happen when the app_port given in | ||
611 | * GNUNET_STREAM_listen() is already taken. | ||
612 | * | ||
613 | * @param cls the closure from GNUNET_STREAM_listen | ||
614 | * @param socket the socket representing the stream; NULL on binding error | ||
615 | * @param initiator the identity of the peer who wants to establish a stream | ||
616 | * with us; NULL on binding error | ||
617 | * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the | ||
618 | * stream (the socket will be invalid after the call) | ||
619 | */ | ||
620 | static int | ||
621 | listen_cb (void *cls, | ||
622 | struct GNUNET_STREAM_Socket *socket, | ||
623 | const struct GNUNET_PeerIdentity *initiator) | ||
624 | { | ||
625 | struct IncomingSocket *incoming; | ||
626 | |||
627 | GNUNET_assert (NULL != socket); | ||
628 | |||
629 | incoming = GNUNET_malloc (sizeof *incoming); | ||
630 | |||
631 | incoming->socket = socket; | ||
632 | incoming->peer = GNUNET_memdup (initiator, sizeof *initiator); | ||
633 | |||
634 | incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, | ||
635 | &stream_data_processor, incoming); | ||
636 | |||
637 | |||
638 | incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming); | ||
639 | |||
640 | GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming); | ||
641 | |||
642 | return GNUNET_OK; | ||
643 | } | ||
644 | |||
645 | |||
646 | static void | ||
647 | destroy_session (struct ConsensusSession *session) | ||
648 | { | ||
649 | /* FIXME: more stuff to free! */ | ||
650 | GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session); | ||
651 | GNUNET_SERVER_client_drop (session->client); | ||
652 | GNUNET_free (session); | ||
653 | } | ||
654 | |||
655 | |||
656 | /** | ||
657 | * Disconnect a client, and destroy all sessions associated with it. | ||
658 | * | ||
659 | * @param client the client to disconnect | ||
660 | */ | ||
239 | static void | 661 | static void |
240 | disconnect_client (struct GNUNET_SERVER_Client *client) | 662 | disconnect_client (struct GNUNET_SERVER_Client *client) |
241 | { | 663 | { |
664 | struct ConsensusSession *session; | ||
242 | GNUNET_SERVER_client_disconnect (client); | 665 | GNUNET_SERVER_client_disconnect (client); |
243 | /* FIXME: free data structures that this client owns */ | 666 | |
667 | /* if the client owns a session, remove it */ | ||
668 | session = sessions_head; | ||
669 | while (NULL != session) | ||
670 | { | ||
671 | if (client == session->client) | ||
672 | { | ||
673 | destroy_session (session); | ||
674 | break; | ||
675 | } | ||
676 | session = session->next; | ||
677 | } | ||
244 | } | 678 | } |
245 | 679 | ||
680 | |||
681 | /** | ||
682 | * Compute a global, (hopefully) unique consensus session id, | ||
683 | * from the local id of the consensus session, and the identities of all participants. | ||
684 | * Thus, if the local id of two consensus sessions coincide, but are not comprised of | ||
685 | * exactly the same peers, the global id will be different. | ||
686 | * | ||
687 | * @param local_id local id of the consensus session | ||
688 | * @param peers array of all peers participating in the consensus session | ||
689 | * @param num_peers number of elements in the peers array | ||
690 | * @param dst where the result is stored, may not be NULL | ||
691 | */ | ||
246 | static void | 692 | static void |
247 | compute_global_id (struct GNUNET_HashCode *dst, | 693 | compute_global_id (const struct GNUNET_HashCode *local_id, |
248 | const struct GNUNET_HashCode *local_id, | 694 | const struct GNUNET_PeerIdentity *peers, int num_peers, |
249 | const struct GNUNET_PeerIdentity *peers, | 695 | struct GNUNET_HashCode *dst) |
250 | int num_peers) | ||
251 | { | 696 | { |
252 | int i; | 697 | int i; |
253 | struct GNUNET_HashCode tmp; | 698 | struct GNUNET_HashCode tmp; |
@@ -263,45 +708,50 @@ compute_global_id (struct GNUNET_HashCode *dst, | |||
263 | } | 708 | } |
264 | 709 | ||
265 | 710 | ||
711 | /** | ||
712 | * Function called to notify a client about the connection | ||
713 | * begin ready to queue more data. "buf" will be | ||
714 | * NULL and "size" zero if the connection was closed for | ||
715 | * writing in the meantime. | ||
716 | * | ||
717 | * @param cls consensus session | ||
718 | * @param size number of bytes available in buf | ||
719 | * @param buf where the callee should write the message | ||
720 | * @return number of bytes written to buf | ||
721 | */ | ||
266 | static size_t | 722 | static size_t |
267 | transmit_pending (void *cls, size_t size, void *buf) | 723 | transmit_queued (void *cls, size_t size, |
724 | void *buf) | ||
268 | { | 725 | { |
269 | struct GNUNET_CONSENSUS_Element *element; | ||
270 | struct GNUNET_CONSENSUS_ElementMessage *msg; | ||
271 | struct ConsensusSession *session; | 726 | struct ConsensusSession *session; |
727 | struct QueuedMessage *qmsg; | ||
728 | size_t msg_size; | ||
272 | 729 | ||
273 | session = (struct ConsensusSession *) cls; | 730 | session = (struct ConsensusSession *) cls; |
274 | msg = (struct GNUNET_CONSENSUS_ElementMessage *) buf; | ||
275 | element = session->transmit_pending_head->element; | ||
276 | |||
277 | GNUNET_assert (NULL != element); | ||
278 | |||
279 | session->th = NULL; | 731 | session->th = NULL; |
280 | 732 | ||
281 | msg->element_type = element->type; | ||
282 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT); | ||
283 | msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ElementMessage) + element->size); | ||
284 | memcpy (&msg[1], element->data, element->size); | ||
285 | 733 | ||
286 | session->transmit_pending_head = session->transmit_pending_head->next; | 734 | qmsg = session->client_messages_head; |
735 | GNUNET_CONTAINER_DLL_remove (session->client_messages_head, session->client_messages_tail, qmsg); | ||
736 | GNUNET_assert (qmsg); | ||
287 | 737 | ||
288 | send_next (session); | 738 | if (NULL == buf) |
739 | { | ||
740 | destroy_session (session); | ||
741 | return 0; | ||
742 | } | ||
289 | 743 | ||
290 | return sizeof (struct GNUNET_CONSENSUS_ElementMessage) + element->size; | 744 | msg_size = ntohs (qmsg->msg->size); |
291 | } | ||
292 | 745 | ||
746 | GNUNET_assert (size >= msg_size); | ||
293 | 747 | ||
294 | static size_t | 748 | memcpy (buf, qmsg->msg, msg_size); |
295 | transmit_conclude_done (void *cls, size_t size, void *buf) | 749 | GNUNET_free (qmsg->msg); |
296 | { | 750 | GNUNET_free (qmsg); |
297 | struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg; | ||
298 | 751 | ||
299 | msg = (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) buf; | 752 | send_next (session); |
300 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE); | ||
301 | msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeDoneMessage)); | ||
302 | msg->num_peers = htons (0); | ||
303 | 753 | ||
304 | return sizeof (struct GNUNET_CONSENSUS_ConcludeDoneMessage); | 754 | return msg_size; |
305 | } | 755 | } |
306 | 756 | ||
307 | 757 | ||
@@ -313,65 +763,253 @@ transmit_conclude_done (void *cls, size_t size, void *buf) | |||
313 | static void | 763 | static void |
314 | send_next (struct ConsensusSession *session) | 764 | send_next (struct ConsensusSession *session) |
315 | { | 765 | { |
316 | int msize; | ||
317 | 766 | ||
318 | GNUNET_assert (NULL != session); | 767 | GNUNET_assert (NULL != session); |
319 | 768 | ||
320 | if (NULL != session->th) | 769 | if (NULL != session->th) |
321 | { | ||
322 | return; | 770 | return; |
323 | } | ||
324 | 771 | ||
325 | if ((session->conclude_requested == GNUNET_YES) && (session->conclude_sent == GNUNET_NO)) | 772 | if (NULL != session->client_messages_head) |
326 | { | 773 | { |
327 | /* FIXME */ | 774 | int msize; |
328 | msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage); | 775 | msize = ntohs (session->client_messages_head->msg->size); |
329 | session->th = | 776 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n"); |
330 | GNUNET_SERVER_notify_transmit_ready (session->client, msize, | 777 | session->th = GNUNET_SERVER_notify_transmit_ready (session->client, msize, |
331 | GNUNET_TIME_UNIT_FOREVER_REL, &transmit_conclude_done, session); | 778 | GNUNET_TIME_UNIT_FOREVER_REL, |
332 | session->conclude_sent = GNUNET_YES; | 779 | &transmit_queued, session); |
333 | } | 780 | } |
334 | else if (NULL != session->transmit_pending_head) | 781 | } |
782 | |||
783 | |||
784 | /** | ||
785 | * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have | ||
786 | * the correct signature to be used with e.g. qsort. | ||
787 | * We use this function instead. | ||
788 | * | ||
789 | * @param h1 some hash code | ||
790 | * @param h2 some hash code | ||
791 | * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2. | ||
792 | */ | ||
793 | static int | ||
794 | hash_cmp (const void *a, const void *b) | ||
795 | { | ||
796 | return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) a, (struct GNUNET_HashCode *) b); | ||
797 | } | ||
798 | |||
799 | |||
800 | /** | ||
801 | * Search peer in the list of peers in session. | ||
802 | * | ||
803 | * @param peer peer to find | ||
804 | * @param session session with peer | ||
805 | * @return index of peer, -1 if peer is not in session | ||
806 | */ | ||
807 | static int | ||
808 | get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session) | ||
809 | { | ||
810 | const struct GNUNET_PeerIdentity *needle; | ||
811 | needle = bsearch (peer, session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp); | ||
812 | if (NULL == needle) | ||
813 | return -1; | ||
814 | return needle - session->peers; | ||
815 | } | ||
816 | |||
817 | |||
818 | |||
819 | static void | ||
820 | hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size) | ||
821 | { | ||
822 | struct ConsensusPeerInformation *cpi; | ||
823 | |||
824 | cpi = (struct ConsensusPeerInformation *) cls; | ||
825 | cpi->hello = GNUNET_YES; | ||
826 | |||
827 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
828 | |||
829 | cpi = (struct ConsensusPeerInformation *) cls; | ||
830 | |||
831 | if (cpi->session->conclude_requested) | ||
335 | { | 832 | { |
336 | msize = session->transmit_pending_head->element->size + sizeof (struct GNUNET_CONSENSUS_ElementMessage); | 833 | write_strata (cpi, GNUNET_STREAM_OK, 0); |
337 | session->th = | ||
338 | GNUNET_SERVER_notify_transmit_ready (session->client, msize, | ||
339 | GNUNET_TIME_UNIT_FOREVER_REL, &transmit_pending, session); | ||
340 | /* TODO: insert into ack pending */ | ||
341 | } | 834 | } |
342 | } | 835 | } |
343 | 836 | ||
344 | 837 | ||
345 | /** | 838 | /** |
346 | * Method called whenever a peer has disconnected from the tunnel. | 839 | * Functions of this type will be called when a stream is established |
347 | * Implementations of this callback must NOT call | ||
348 | * GNUNET_MESH_tunnel_destroy immediately, but instead schedule those | ||
349 | * to run in some other task later. However, calling | ||
350 | * "GNUNET_MESH_notify_transmit_ready_cancel" is allowed. | ||
351 | * | 840 | * |
352 | * @param cls closure | 841 | * @param cls the closure from GNUNET_STREAM_open |
353 | * @param peer peer identity the tunnel stopped working with | 842 | * @param socket socket to use to communicate with the other side (read/write) |
354 | */ | 843 | */ |
355 | static void | 844 | static void |
356 | disconnect_handler (void *cls, const struct GNUNET_PeerIdentity *peer) | 845 | open_cb (void *cls, struct GNUNET_STREAM_Socket *socket) |
846 | { | ||
847 | struct ConsensusPeerInformation *cpi; | ||
848 | struct ConsensusHello *hello; | ||
849 | |||
850 | |||
851 | cpi = (struct ConsensusPeerInformation *) cls; | ||
852 | cpi->is_connected = GNUNET_YES; | ||
853 | |||
854 | hello = GNUNET_malloc (sizeof *hello); | ||
855 | hello->header.size = htons (sizeof *hello); | ||
856 | hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO); | ||
857 | memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode)); | ||
858 | |||
859 | |||
860 | cpi->wh = | ||
861 | GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi); | ||
862 | |||
863 | } | ||
864 | |||
865 | |||
866 | static void | ||
867 | initialize_session_info (struct ConsensusSession *session) | ||
868 | { | ||
869 | int i; | ||
870 | int last; | ||
871 | |||
872 | for (i = 0; i < session->num_peers; ++i) | ||
873 | { | ||
874 | /* initialize back-references, so consensus peer information can | ||
875 | * be used as closure */ | ||
876 | session->info[i].session = session; | ||
877 | |||
878 | } | ||
879 | |||
880 | last = (session->local_peer_idx + (session->num_peers / 2)) % session->num_peers; | ||
881 | i = (session->local_peer_idx + 1) % session->num_peers; | ||
882 | while (i != last) | ||
883 | { | ||
884 | session->info[i].is_outgoing = GNUNET_YES; | ||
885 | session->info[i].socket = GNUNET_STREAM_open (cfg, &session->peers[i], GNUNET_APPLICATION_TYPE_CONSENSUS, | ||
886 | open_cb, &session->info[i], GNUNET_STREAM_OPTION_END); | ||
887 | session->info[i].mst = GNUNET_SERVER_mst_create (mst_session_callback, session); | ||
888 | i = (i + 1) % session->num_peers; | ||
889 | } | ||
890 | // tie-breaker for even number of peers | ||
891 | if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last)) | ||
892 | { | ||
893 | session->info[last].is_outgoing = GNUNET_YES; | ||
894 | session->info[last].socket = GNUNET_STREAM_open (cfg, &session->peers[last], GNUNET_APPLICATION_TYPE_CONSENSUS, | ||
895 | open_cb, &session->info[last], GNUNET_STREAM_OPTION_END); | ||
896 | } | ||
897 | } | ||
898 | |||
899 | |||
900 | /** | ||
901 | * Create the sorted list of peers for the session, | ||
902 | * add the local peer if not in the join message. | ||
903 | */ | ||
904 | static void | ||
905 | initialize_session_peer_list (struct ConsensusSession *session) | ||
906 | { | ||
907 | int local_peer_in_list; | ||
908 | int listed_peers; | ||
909 | const struct GNUNET_PeerIdentity *msg_peers; | ||
910 | unsigned int i; | ||
911 | |||
912 | GNUNET_assert (NULL != session->join_msg); | ||
913 | |||
914 | /* peers in the join message, may or may not include the local peer */ | ||
915 | listed_peers = ntohs (session->join_msg->num_peers); | ||
916 | |||
917 | session->num_peers = listed_peers; | ||
918 | |||
919 | msg_peers = (struct GNUNET_PeerIdentity *) &session->join_msg[1]; | ||
920 | |||
921 | local_peer_in_list = GNUNET_NO; | ||
922 | for (i = 0; i < listed_peers; i++) | ||
923 | { | ||
924 | if (0 == memcmp (&msg_peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity))) | ||
925 | { | ||
926 | local_peer_in_list = GNUNET_YES; | ||
927 | break; | ||
928 | } | ||
929 | } | ||
930 | |||
931 | if (GNUNET_NO == local_peer_in_list) | ||
932 | session->num_peers++; | ||
933 | |||
934 | session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity)); | ||
935 | |||
936 | if (GNUNET_NO == local_peer_in_list) | ||
937 | session->peers[session->num_peers - 1] = *my_peer; | ||
938 | |||
939 | memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity)); | ||
940 | qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp); | ||
941 | } | ||
942 | |||
943 | |||
944 | static void | ||
945 | strata_insert (struct InvertibleBloomFilter **strata, struct GNUNET_HashCode *key) | ||
357 | { | 946 | { |
358 | /* FIXME: how do we handle this */ | 947 | uint32_t v; |
948 | int i; | ||
949 | v = key->bits[0]; | ||
950 | /* count trailing '1'-bits of v */ | ||
951 | for (i = 0; v & 1; v>>=1, i++); | ||
952 | |||
953 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata insert at %d\n", i); | ||
954 | |||
955 | ibf_insert (strata[i], key); | ||
359 | } | 956 | } |
360 | 957 | ||
361 | 958 | ||
362 | /** | 959 | /** |
363 | * Method called whenever a peer has connected to the tunnel. | 960 | * Initialize the session, continue receiving messages from the owning client |
364 | * | 961 | * |
365 | * @param cls closure | 962 | * @param session the session to initialize |
366 | * @param peer peer identity the tunnel was created to, NULL on timeout | ||
367 | * @param atsi performance data for the connection | ||
368 | */ | 963 | */ |
369 | static void | 964 | static void |
370 | connect_handler (void *cls, | 965 | initialize_session (struct ConsensusSession *session) |
371 | const struct GNUNET_PeerIdentity *peer, | ||
372 | const struct GNUNET_ATS_Information *atsi) | ||
373 | { | 966 | { |
374 | /* not much we can do here, now we know the other peer has been added to our broadcast tunnel */ | 967 | const struct ConsensusSession *other_session; |
968 | int i; | ||
969 | |||
970 | GNUNET_assert (NULL != session->join_msg); | ||
971 | |||
972 | initialize_session_peer_list (session); | ||
973 | |||
974 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers); | ||
975 | |||
976 | compute_global_id (&session->join_msg->session_id, session->peers, session->num_peers, &session->global_id); | ||
977 | |||
978 | /* Check if some local client already owns the session. */ | ||
979 | other_session = sessions_head; | ||
980 | while (NULL != other_session) | ||
981 | { | ||
982 | if ((other_session != session) && | ||
983 | (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id))) | ||
984 | { | ||
985 | /* session already owned by another client */ | ||
986 | GNUNET_break (0); | ||
987 | disconnect_client (session->client); | ||
988 | return; | ||
989 | } | ||
990 | other_session = other_session->next; | ||
991 | } | ||
992 | |||
993 | session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO); | ||
994 | |||
995 | session->local_peer_idx = get_peer_idx (my_peer, session); | ||
996 | GNUNET_assert (-1 != session->local_peer_idx); | ||
997 | |||
998 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx); | ||
999 | |||
1000 | session->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct InvertibleBloomFilter *)); | ||
1001 | for (i = 0; i < STRATA_COUNT; i++) | ||
1002 | session->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); | ||
1003 | |||
1004 | session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation)); | ||
1005 | |||
1006 | initialize_session_info (session); | ||
1007 | |||
1008 | GNUNET_free (session->join_msg); | ||
1009 | session->join_msg = NULL; | ||
1010 | |||
1011 | GNUNET_SERVER_receive_done (session->client, GNUNET_OK); | ||
1012 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id)); | ||
375 | } | 1013 | } |
376 | 1014 | ||
377 | 1015 | ||
@@ -387,88 +1025,49 @@ client_join (void *cls, | |||
387 | struct GNUNET_SERVER_Client *client, | 1025 | struct GNUNET_SERVER_Client *client, |
388 | const struct GNUNET_MessageHeader *m) | 1026 | const struct GNUNET_MessageHeader *m) |
389 | { | 1027 | { |
390 | struct GNUNET_HashCode global_id; | ||
391 | const struct GNUNET_CONSENSUS_JoinMessage *msg; | ||
392 | struct ConsensusSession *session; | 1028 | struct ConsensusSession *session; |
393 | unsigned int i; | ||
394 | |||
395 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client joining\n"); | ||
396 | |||
397 | msg = (struct GNUNET_CONSENSUS_JoinMessage *) m; | ||
398 | |||
399 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session id is %s\n", GNUNET_h2s (&msg->session_id)); | ||
400 | 1029 | ||
401 | compute_global_id (&global_id, &msg->session_id, (struct GNUNET_PeerIdentity *) &m[1], msg->num_peers); | 1030 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join received\n"); |
402 | |||
403 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "computed global id is %s\n", GNUNET_h2s (&global_id)); | ||
404 | 1031 | ||
1032 | // make sure the client has not already joined a session | ||
405 | session = sessions_head; | 1033 | session = sessions_head; |
406 | while (NULL != session) | 1034 | while (NULL != session) |
407 | { | 1035 | { |
408 | if (client == session->client) | 1036 | if (session->client == client) |
409 | { | ||
410 | |||
411 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client already in session\n"); | ||
412 | disconnect_client (client); | ||
413 | return; | ||
414 | } | ||
415 | if (0 == memcmp (session->global_id, &global_id, sizeof (struct GNUNET_HashCode))) | ||
416 | { | 1037 | { |
417 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "session already owned by another client\n"); | 1038 | GNUNET_break (0); |
418 | disconnect_client (client); | 1039 | disconnect_client (client); |
419 | return; | 1040 | return; |
420 | } | 1041 | } |
421 | session = session->next; | 1042 | session = session->next; |
422 | } | 1043 | } |
423 | 1044 | ||
424 | GNUNET_SERVER_client_keep (client); | ||
425 | |||
426 | /* session does not exist yet, create it */ | ||
427 | session = GNUNET_malloc (sizeof (struct ConsensusSession)); | 1045 | session = GNUNET_malloc (sizeof (struct ConsensusSession)); |
428 | session->local_id = GNUNET_memdup (&msg->session_id, sizeof (struct GNUNET_HashCode)); | 1046 | session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m); |
429 | session->global_id = GNUNET_memdup (&global_id, sizeof (struct GNUNET_HashCode)); | ||
430 | session->values = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO); | ||
431 | session->client = client; | 1047 | session->client = client; |
432 | /* FIXME: should not be a constant, but chosen adaptively */ | 1048 | GNUNET_SERVER_client_keep (client); |
433 | session->round_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5); | ||
434 | |||
435 | session->broadcast_tunnel = GNUNET_MESH_tunnel_create (mesh, session, connect_handler, disconnect_handler, session); | ||
436 | |||
437 | session->num_peers = 0; | ||
438 | |||
439 | /* count the peers that are not the local peer */ | ||
440 | for (i = 0; i < msg->num_peers; i++) | ||
441 | { | ||
442 | struct GNUNET_PeerIdentity *peers; | ||
443 | peers = (struct GNUNET_PeerIdentity *) &msg[1]; | ||
444 | if (0 != memcmp (&peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity))) | ||
445 | session->num_peers++; | ||
446 | } | ||
447 | 1049 | ||
448 | session->peers = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeer)); | 1050 | GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); |
449 | 1051 | ||
450 | /* copy the peer identities and add peers to broadcast tunnel */ | 1052 | // Initialize session later if local peer identity is not known yet. |
451 | for (i = 0; i < msg->num_peers; i++) | 1053 | if (NULL == my_peer) |
452 | { | 1054 | { |
453 | struct GNUNET_PeerIdentity *peers; | 1055 | GNUNET_SERVER_disable_receive_done_warning (client); |
454 | peers = (struct GNUNET_PeerIdentity *) &msg[1]; | 1056 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session init delayed\n"); |
455 | if (0 != memcmp (&peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity))) | 1057 | return; |
456 | { | ||
457 | *session->peers->peer_id = peers[i]; | ||
458 | GNUNET_MESH_peer_request_connect_add (session->broadcast_tunnel, &peers[i]); | ||
459 | } | ||
460 | } | 1058 | } |
461 | 1059 | ||
462 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "created new session\n"); | 1060 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session init now\n"); |
463 | 1061 | initialize_session (session); | |
464 | GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); | ||
465 | |||
466 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
467 | } | 1062 | } |
468 | 1063 | ||
469 | 1064 | ||
470 | /** | 1065 | /** |
471 | * Called when a client performs an insert operation. | 1066 | * Called when a client performs an insert operation. |
1067 | * | ||
1068 | * @param cls (unused) | ||
1069 | * @param client client handle | ||
1070 | * @param message message sent by the client | ||
472 | */ | 1071 | */ |
473 | void | 1072 | void |
474 | client_insert (void *cls, | 1073 | client_insert (void *cls, |
@@ -510,7 +1109,9 @@ client_insert (void *cls, | |||
510 | GNUNET_CRYPTO_hash (element, element_size, &key); | 1109 | GNUNET_CRYPTO_hash (element, element_size, &key); |
511 | 1110 | ||
512 | GNUNET_CONTAINER_multihashmap_put (session->values, &key, element, | 1111 | GNUNET_CONTAINER_multihashmap_put (session->values, &key, element, |
513 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); | 1112 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); |
1113 | |||
1114 | strata_insert (session->strata, &key); | ||
514 | 1115 | ||
515 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 1116 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
516 | 1117 | ||
@@ -518,31 +1119,145 @@ client_insert (void *cls, | |||
518 | } | 1119 | } |
519 | 1120 | ||
520 | 1121 | ||
1122 | |||
1123 | /** | ||
1124 | * Functions of this signature are called whenever writing operations | ||
1125 | * on a stream are executed | ||
1126 | * | ||
1127 | * @param cls the closure from GNUNET_STREAM_write | ||
1128 | * @param status the status of the stream at the time this function is called; | ||
1129 | * GNUNET_STREAM_OK if writing to stream was completed successfully; | ||
1130 | * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully | ||
1131 | * (this doesn't mean that the data is never sent, the receiver may | ||
1132 | * have read the data but its ACKs may have been lost); | ||
1133 | * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the | ||
1134 | * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot | ||
1135 | * be processed. | ||
1136 | * @param size the number of bytes written | ||
1137 | */ | ||
1138 | static void | ||
1139 | write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size) | ||
1140 | { | ||
1141 | struct ConsensusPeerInformation *cpi; | ||
1142 | struct StrataMessage *strata_msg; | ||
1143 | size_t msize; | ||
1144 | int i; | ||
1145 | struct GNUNET_HashCode *hash_dst; | ||
1146 | uint8_t *count_dst; | ||
1147 | int num_strata; | ||
1148 | |||
1149 | cpi = (struct ConsensusPeerInformation *) cls; | ||
1150 | |||
1151 | GNUNET_assert (GNUNET_YES == cpi->is_outgoing); | ||
1152 | |||
1153 | /* FIXME: handle this */ | ||
1154 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
1155 | |||
1156 | if (STRATA_COUNT == cpi->strata_counter) | ||
1157 | { | ||
1158 | /* strata have been written, wait for other side's IBF */ | ||
1159 | return; | ||
1160 | } | ||
1161 | |||
1162 | if ((STRATA_COUNT - cpi->strata_counter) < STRATA_PER_MESSAGE) | ||
1163 | num_strata = (STRATA_COUNT - cpi->strata_counter); | ||
1164 | else | ||
1165 | num_strata = STRATA_PER_MESSAGE; | ||
1166 | |||
1167 | |||
1168 | msize = (sizeof *strata_msg) + (num_strata * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS); | ||
1169 | |||
1170 | strata_msg = GNUNET_malloc (msize); | ||
1171 | strata_msg->header.size = htons (msize); | ||
1172 | strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE); | ||
1173 | strata_msg->num_strata = htons (num_strata); | ||
1174 | |||
1175 | /* for correct message alignment, copy bucket types seperately */ | ||
1176 | hash_dst = (struct GNUNET_HashCode *) &strata_msg[1]; | ||
1177 | |||
1178 | for (i = 0; i < num_strata; i++) | ||
1179 | { | ||
1180 | memcpy (hash_dst, cpi->session->strata[cpi->strata_counter+i]->hash_sum, STRATA_IBF_BUCKETS * sizeof *hash_dst); | ||
1181 | hash_dst += STRATA_IBF_BUCKETS; | ||
1182 | } | ||
1183 | |||
1184 | for (i = 0; i < num_strata; i++) | ||
1185 | { | ||
1186 | memcpy (hash_dst, cpi->session->strata[cpi->strata_counter+i]->id_sum, STRATA_IBF_BUCKETS * sizeof *hash_dst); | ||
1187 | hash_dst += STRATA_IBF_BUCKETS; | ||
1188 | } | ||
1189 | |||
1190 | count_dst = (uint8_t *) hash_dst; | ||
1191 | |||
1192 | for (i = 0; i < num_strata; i++) | ||
1193 | { | ||
1194 | memcpy (count_dst, cpi->session->strata[cpi->strata_counter+i]->count, STRATA_IBF_BUCKETS); | ||
1195 | count_dst += STRATA_IBF_BUCKETS; | ||
1196 | } | ||
1197 | |||
1198 | cpi->strata_counter += num_strata; | ||
1199 | |||
1200 | cpi->wh = GNUNET_STREAM_write (cpi->socket, strata_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, | ||
1201 | write_strata, cpi); | ||
1202 | |||
1203 | GNUNET_assert (NULL != cpi->wh); | ||
1204 | } | ||
1205 | |||
1206 | |||
521 | /** | 1207 | /** |
522 | * Do one round of the conclusion. | 1208 | * Functions of this signature are called whenever writing operations |
523 | * Start by broadcasting the set difference estimator (IBF strata). | 1209 | * on a stream are executed |
524 | * | 1210 | * |
1211 | * @param cls the closure from GNUNET_STREAM_write | ||
1212 | * @param status the status of the stream at the time this function is called; | ||
1213 | * GNUNET_STREAM_OK if writing to stream was completed successfully; | ||
1214 | * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully | ||
1215 | * (this doesn't mean that the data is never sent, the receiver may | ||
1216 | * have read the data but its ACKs may have been lost); | ||
1217 | * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the | ||
1218 | * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot | ||
1219 | * be processed. | ||
1220 | * @param size the number of bytes written | ||
525 | */ | 1221 | */ |
526 | void | 1222 | static void |
527 | conclude_do_round (struct ConsensusSession *session) | 1223 | write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size) |
528 | { | 1224 | { |
529 | /* FIXME */ | 1225 | struct ConsensusPeerInformation *cpi; |
1226 | |||
1227 | cpi = (struct ConsensusPeerInformation *) cls; | ||
530 | } | 1228 | } |
531 | 1229 | ||
532 | 1230 | ||
533 | /** | 1231 | /** |
534 | * Cancel the current round if necessary, decide to run another round or | 1232 | * Functions of this signature are called whenever writing operations |
535 | * terminate. | 1233 | * on a stream are executed |
1234 | * | ||
1235 | * @param cls the closure from GNUNET_STREAM_write | ||
1236 | * @param status the status of the stream at the time this function is called; | ||
1237 | * GNUNET_STREAM_OK if writing to stream was completed successfully; | ||
1238 | * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully | ||
1239 | * (this doesn't mean that the data is never sent, the receiver may | ||
1240 | * have read the data but its ACKs may have been lost); | ||
1241 | * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the | ||
1242 | * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot | ||
1243 | * be processed. | ||
1244 | * @param size the number of bytes written | ||
536 | */ | 1245 | */ |
537 | void | 1246 | static void |
538 | conclude_round_done (struct ConsensusSession *session) | 1247 | write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size) |
539 | { | 1248 | { |
540 | /* FIXME */ | 1249 | struct ConsensusPeerInformation *cpi; |
1250 | |||
1251 | cpi = (struct ConsensusPeerInformation *) cls; | ||
541 | } | 1252 | } |
542 | 1253 | ||
543 | 1254 | ||
544 | /** | 1255 | /** |
545 | * Called when a client performs the conclude operation. | 1256 | * Called when a client performs the conclude operation. |
1257 | * | ||
1258 | * @param cls (unused) | ||
1259 | * @param client client handle | ||
1260 | * @param message message sent by the client | ||
546 | */ | 1261 | */ |
547 | void | 1262 | void |
548 | client_conclude (void *cls, | 1263 | client_conclude (void *cls, |
@@ -550,40 +1265,55 @@ client_conclude (void *cls, | |||
550 | const struct GNUNET_MessageHeader *message) | 1265 | const struct GNUNET_MessageHeader *message) |
551 | { | 1266 | { |
552 | struct ConsensusSession *session; | 1267 | struct ConsensusSession *session; |
1268 | int i; | ||
553 | 1269 | ||
554 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n"); | 1270 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n"); |
555 | 1271 | ||
556 | session = sessions_head; | 1272 | session = sessions_head; |
557 | while ((session != NULL) && (session->client != client)) | 1273 | while ((session != NULL) && (session->client != client)) |
558 | { | ||
559 | session = session->next; | 1274 | session = session->next; |
560 | } | ||
561 | if (NULL == session) | 1275 | if (NULL == session) |
562 | { | 1276 | { |
563 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client not found\n"); | 1277 | /* client not found */ |
1278 | GNUNET_break (0); | ||
564 | GNUNET_SERVER_client_disconnect (client); | 1279 | GNUNET_SERVER_client_disconnect (client); |
565 | return; | 1280 | return; |
566 | } | 1281 | } |
567 | 1282 | ||
568 | if (GNUNET_YES == session->conclude_requested) | 1283 | if (GNUNET_YES == session->conclude_requested) |
569 | { | 1284 | { |
570 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client requested conclude twice\n"); | 1285 | /* client requested conclude twice */ |
571 | GNUNET_SERVER_client_disconnect (client); | 1286 | GNUNET_break (0); |
1287 | disconnect_client (client); | ||
572 | return; | 1288 | return; |
573 | } | 1289 | } |
574 | 1290 | ||
575 | session->conclude_requested = GNUNET_YES; | 1291 | session->conclude_requested = GNUNET_YES; |
576 | 1292 | ||
577 | conclude_do_round (session); | 1293 | /* FIXME: write to already connected sockets */ |
578 | 1294 | ||
579 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 1295 | for (i = 0; i < session->num_peers; i++) |
1296 | { | ||
1297 | if ( (GNUNET_YES == session->info[i].is_outgoing) && | ||
1298 | (GNUNET_YES == session->info[i].hello) ) | ||
1299 | { | ||
1300 | /* kick off transmitting strata by calling the write continuation */ | ||
1301 | write_strata (&session->info[i], GNUNET_STREAM_OK, 0); | ||
1302 | } | ||
1303 | } | ||
1304 | |||
580 | 1305 | ||
1306 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
581 | send_next (session); | 1307 | send_next (session); |
582 | } | 1308 | } |
583 | 1309 | ||
584 | 1310 | ||
585 | /** | 1311 | /** |
586 | * Called when a client sends an ack | 1312 | * Called when a client sends an ack |
1313 | * | ||
1314 | * @param cls (unused) | ||
1315 | * @param client client handle | ||
1316 | * @param message message sent by the client | ||
587 | */ | 1317 | */ |
588 | void | 1318 | void |
589 | client_ack (void *cls, | 1319 | client_ack (void *cls, |
@@ -614,71 +1344,23 @@ core_startup (void *cls, | |||
614 | struct GNUNET_CORE_Handle *core, | 1344 | struct GNUNET_CORE_Handle *core, |
615 | const struct GNUNET_PeerIdentity *peer) | 1345 | const struct GNUNET_PeerIdentity *peer) |
616 | { | 1346 | { |
617 | static const struct GNUNET_SERVER_MessageHandler handlers[] = { | 1347 | struct ConsensusSession *session; |
618 | {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0}, | ||
619 | {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, | ||
620 | {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, | ||
621 | sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)}, | ||
622 | {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK, | ||
623 | sizeof (struct GNUNET_CONSENSUS_AckMessage)}, | ||
624 | {NULL, NULL, 0, 0} | ||
625 | }; | ||
626 | 1348 | ||
627 | GNUNET_SERVER_add_handlers (srv, handlers); | ||
628 | my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity)); | 1349 | my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity)); |
629 | /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */ | 1350 | /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */ |
630 | GNUNET_SCHEDULER_add_now (&disconnect_core, core); | 1351 | GNUNET_SCHEDULER_add_now (&disconnect_core, core); |
631 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n"); | 1352 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n"); |
632 | } | ||
633 | |||
634 | |||
635 | |||
636 | /** | ||
637 | * Method called whenever another peer has added us to a tunnel | ||
638 | * the other peer initiated. | ||
639 | * Only called (once) upon reception of data with a message type which was | ||
640 | * subscribed to in GNUNET_MESH_connect. A call to GNUNET_MESH_tunnel_destroy | ||
641 | * causes te tunnel to be ignored and no further notifications are sent about | ||
642 | * the same tunnel. | ||
643 | * | ||
644 | * @param cls closure | ||
645 | * @param tunnel new handle to the tunnel | ||
646 | * @param initiator peer that started the tunnel | ||
647 | * @param atsi performance information for the tunnel | ||
648 | * @return initial tunnel context for the tunnel | ||
649 | * (can be NULL -- that's not an error) | ||
650 | */ | ||
651 | static void * | ||
652 | new_tunnel (void *cls, | ||
653 | struct GNUNET_MESH_Tunnel *tunnel, | ||
654 | const struct GNUNET_PeerIdentity *initiator, | ||
655 | const struct GNUNET_ATS_Information *atsi) | ||
656 | { | ||
657 | /* there's nothing we can do here, as we don't have the global consensus id yet */ | ||
658 | return NULL; | ||
659 | } | ||
660 | 1353 | ||
661 | 1354 | session = sessions_head; | |
662 | /** | 1355 | while (NULL != session) |
663 | * Function called whenever an inbound tunnel is destroyed. Should clean up | 1356 | { |
664 | * any associated state. This function is NOT called if the client has | 1357 | if (NULL != session->join_msg) |
665 | * explicitly asked for the tunnel to be destroyed using | 1358 | initialize_session (session); |
666 | * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on | 1359 | session = session->next; |
667 | * the tunnel. | 1360 | } |
668 | * | ||
669 | * @param cls closure (set from GNUNET_MESH_connect) | ||
670 | * @param tunnel connection to the other end (henceforth invalid) | ||
671 | * @param tunnel_ctx place where local state associated | ||
672 | * with the tunnel is stored | ||
673 | */ | ||
674 | static void | ||
675 | cleaner (void *cls, const struct GNUNET_MESH_Tunnel *tunnel, void *tunnel_ctx) | ||
676 | { | ||
677 | /* FIXME: what to do here? */ | ||
678 | } | 1361 | } |
679 | 1362 | ||
680 | 1363 | ||
681 | |||
682 | /** | 1364 | /** |
683 | * Called to clean up, after a shutdown has been requested. | 1365 | * Called to clean up, after a shutdown has been requested. |
684 | * | 1366 | * |
@@ -689,105 +1371,27 @@ static void | |||
689 | shutdown_task (void *cls, | 1371 | shutdown_task (void *cls, |
690 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 1372 | const struct GNUNET_SCHEDULER_TaskContext *tc) |
691 | { | 1373 | { |
692 | /* mesh requires all the tunnels to be destroyed manually */ | ||
693 | while (NULL != sessions_head) | 1374 | while (NULL != sessions_head) |
694 | { | 1375 | { |
695 | struct ConsensusSession *session; | 1376 | struct ConsensusSession *session; |
696 | session = sessions_head; | 1377 | session = sessions_head; |
697 | GNUNET_MESH_tunnel_destroy (sessions_head->broadcast_tunnel); | ||
698 | sessions_head = sessions_head->next; | 1378 | sessions_head = sessions_head->next; |
699 | GNUNET_free (session); | 1379 | GNUNET_free (session); |
700 | } | 1380 | } |
701 | 1381 | ||
702 | if (NULL != mesh) | ||
703 | { | ||
704 | GNUNET_MESH_disconnect (mesh); | ||
705 | mesh = NULL; | ||
706 | } | ||
707 | if (NULL != core) | 1382 | if (NULL != core) |
708 | { | 1383 | { |
709 | GNUNET_CORE_disconnect (core); | 1384 | GNUNET_CORE_disconnect (core); |
710 | core = NULL; | 1385 | core = NULL; |
711 | } | 1386 | } |
712 | } | ||
713 | |||
714 | |||
715 | |||
716 | /** | ||
717 | * Functions with this signature are called whenever a message is | ||
718 | * received. | ||
719 | * | ||
720 | * @param cls closure (set from GNUNET_MESH_connect) | ||
721 | * @param tunnel connection to the other end | ||
722 | * @param tunnel_ctx place to store local state associated with the tunnel | ||
723 | * @param sender who sent the message | ||
724 | * @param message the actual message | ||
725 | * @param atsi performance data for the connection | ||
726 | * @return GNUNET_OK to keep the connection open, | ||
727 | * GNUNET_SYSERR to close it (signal serious error) | ||
728 | */ | ||
729 | static int | ||
730 | p2p_delta_estimate (void *cls, | ||
731 | struct GNUNET_MESH_Tunnel * tunnel, | ||
732 | void **tunnel_ctx, | ||
733 | const struct GNUNET_PeerIdentity *sender, | ||
734 | const struct GNUNET_MessageHeader *message, | ||
735 | const struct GNUNET_ATS_Information *atsi) | ||
736 | { | ||
737 | /* FIXME */ | ||
738 | return GNUNET_OK; | ||
739 | } | ||
740 | |||
741 | |||
742 | /** | ||
743 | * Functions with this signature are called whenever a message is | ||
744 | * received. | ||
745 | * | ||
746 | * @param cls closure (set from GNUNET_MESH_connect) | ||
747 | * @param tunnel connection to the other end | ||
748 | * @param tunnel_ctx place to store local state associated with the tunnel | ||
749 | * @param sender who sent the message | ||
750 | * @param message the actual message | ||
751 | * @param atsi performance data for the connection | ||
752 | * @return GNUNET_OK to keep the connection open, | ||
753 | * GNUNET_SYSERR to close it (signal serious error) | ||
754 | */ | ||
755 | static int | ||
756 | p2p_difference_digest (void *cls, | ||
757 | struct GNUNET_MESH_Tunnel * tunnel, | ||
758 | void **tunnel_ctx, | ||
759 | const struct GNUNET_PeerIdentity *sender, | ||
760 | const struct GNUNET_MessageHeader *message, | ||
761 | const struct GNUNET_ATS_Information *atsi) | ||
762 | { | ||
763 | /* FIXME */ | ||
764 | return GNUNET_OK; | ||
765 | } | ||
766 | 1387 | ||
1388 | if (NULL != listener) | ||
1389 | { | ||
1390 | GNUNET_STREAM_listen_close (listener); | ||
1391 | listener = NULL; | ||
1392 | } | ||
767 | 1393 | ||
768 | /** | 1394 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n"); |
769 | * Functions with this signature are called whenever a message is | ||
770 | * received. | ||
771 | * | ||
772 | * @param cls closure (set from GNUNET_MESH_connect) | ||
773 | * @param tunnel connection to the other end | ||
774 | * @param tunnel_ctx place to store local state associated with the tunnel | ||
775 | * @param sender who sent the message | ||
776 | * @param message the actual message | ||
777 | * @param atsi performance data for the connection | ||
778 | * @return GNUNET_OK to keep the connection open, | ||
779 | * GNUNET_SYSERR to close it (signal serious error) | ||
780 | */ | ||
781 | static int | ||
782 | p2p_elements_and_requests (void *cls, | ||
783 | struct GNUNET_MESH_Tunnel * tunnel, | ||
784 | void **tunnel_ctx, | ||
785 | const struct GNUNET_PeerIdentity *sender, | ||
786 | const struct GNUNET_MessageHeader *message, | ||
787 | const struct GNUNET_ATS_Information *atsi) | ||
788 | { | ||
789 | /* FIXME */ | ||
790 | return GNUNET_OK; | ||
791 | } | 1395 | } |
792 | 1396 | ||
793 | 1397 | ||
@@ -801,33 +1405,38 @@ p2p_elements_and_requests (void *cls, | |||
801 | static void | 1405 | static void |
802 | run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) | 1406 | run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) |
803 | { | 1407 | { |
804 | static const struct GNUNET_CORE_MessageHandler handlers[] = { | 1408 | static const struct GNUNET_CORE_MessageHandler core_handlers[] = { |
805 | {NULL, 0, 0} | 1409 | {NULL, 0, 0} |
806 | }; | 1410 | }; |
807 | static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = { | 1411 | static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { |
808 | {p2p_delta_estimate, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE, 0}, | 1412 | {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0}, |
809 | {p2p_difference_digest, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST, 0}, | 1413 | {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, |
810 | {p2p_elements_and_requests, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_AND_REQUESTS, 0}, | 1414 | {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, |
811 | {NULL, 0, 0} | 1415 | sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)}, |
812 | }; | 1416 | {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK, |
813 | static const GNUNET_MESH_ApplicationType app_types[] = { | 1417 | sizeof (struct GNUNET_CONSENSUS_AckMessage)}, |
814 | GNUNET_APPLICATION_TYPE_CONSENSUS, | 1418 | {NULL, NULL, 0, 0} |
815 | GNUNET_APPLICATION_TYPE_END | ||
816 | }; | 1419 | }; |
817 | 1420 | ||
818 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n"); | ||
819 | |||
820 | cfg = c; | 1421 | cfg = c; |
821 | srv = server; | 1422 | srv = server; |
822 | 1423 | ||
1424 | GNUNET_SERVER_add_handlers (server, server_handlers); | ||
1425 | |||
823 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); | 1426 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); |
824 | 1427 | ||
825 | mesh = GNUNET_MESH_connect (cfg, NULL, new_tunnel, cleaner, mesh_handlers, app_types); | 1428 | |
826 | GNUNET_assert (NULL != mesh); | 1429 | listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS, |
1430 | listen_cb, NULL, | ||
1431 | GNUNET_STREAM_OPTION_END); | ||
1432 | |||
827 | 1433 | ||
828 | /* we have to wait for the core_startup callback before proceeding with the consensus service startup */ | 1434 | /* we have to wait for the core_startup callback before proceeding with the consensus service startup */ |
829 | core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, handlers); | 1435 | core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, core_handlers); |
830 | GNUNET_assert (NULL != core); | 1436 | GNUNET_assert (NULL != core); |
1437 | |||
1438 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n"); | ||
1439 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata per msg: %d\n", STRATA_PER_MESSAGE); | ||
831 | } | 1440 | } |
832 | 1441 | ||
833 | 1442 | ||
@@ -843,6 +1452,7 @@ main (int argc, char *const *argv) | |||
843 | { | 1452 | { |
844 | int ret; | 1453 | int ret; |
845 | ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL); | 1454 | ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL); |
1455 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n"); | ||
846 | return (GNUNET_OK == ret) ? 0 : 1; | 1456 | return (GNUNET_OK == ret) ? 0 : 1; |
847 | } | 1457 | } |
848 | 1458 | ||
diff --git a/src/consensus/ibf.c b/src/consensus/ibf.c index 4f1aca939..2d06fc29b 100644 --- a/src/consensus/ibf.c +++ b/src/consensus/ibf.c | |||
@@ -25,50 +25,8 @@ | |||
25 | * @author Florian Dold | 25 | * @author Florian Dold |
26 | */ | 26 | */ |
27 | 27 | ||
28 | #include "ibf.h" | ||
29 | |||
30 | |||
31 | /** | ||
32 | * Opaque handle to an invertible bloom filter (IBF). | ||
33 | * | ||
34 | * An IBF is a counting bloom filter that has the ability to restore | ||
35 | * the hashes of its stored elements with high probability. | ||
36 | */ | ||
37 | struct InvertibleBloomFilter | ||
38 | { | ||
39 | /** | ||
40 | * How many cells does this IBF have? | ||
41 | */ | ||
42 | unsigned int size; | ||
43 | 28 | ||
44 | /** | 29 | #include "ibf.h" |
45 | * In how many cells do we hash one element? | ||
46 | * Usually 4 or 3. | ||
47 | */ | ||
48 | unsigned int hash_num; | ||
49 | |||
50 | /** | ||
51 | * Salt for mingling hashes | ||
52 | */ | ||
53 | uint32_t salt; | ||
54 | |||
55 | /** | ||
56 | * How many times has a bucket been hit? | ||
57 | * Can be negative, as a result of IBF subtraction. | ||
58 | */ | ||
59 | int8_t *count; | ||
60 | |||
61 | /** | ||
62 | * xor sums of the elements' hash codes, used to identify the elements. | ||
63 | */ | ||
64 | struct GNUNET_HashCode *id_sum; | ||
65 | |||
66 | /** | ||
67 | * xor sums of the "hash of the hash". | ||
68 | */ | ||
69 | struct GNUNET_HashCode *hash_sum; | ||
70 | |||
71 | }; | ||
72 | 30 | ||
73 | 31 | ||
74 | /** | 32 | /** |
@@ -152,6 +110,8 @@ ibf_insert_on_side (struct InvertibleBloomFilter *ibf, | |||
152 | used_buckets[i] = bucket; | 110 | used_buckets[i] = bucket; |
153 | 111 | ||
154 | ibf->count[bucket] += side; | 112 | ibf->count[bucket] += side; |
113 | |||
114 | GNUNET_log_from(GNUNET_ERROR_TYPE_INFO, "ibf", "inserting in bucket %d \n", bucket); | ||
155 | 115 | ||
156 | GNUNET_CRYPTO_hash_xor (&key_copy, &ibf->id_sum[bucket], | 116 | GNUNET_CRYPTO_hash_xor (&key_copy, &ibf->id_sum[bucket], |
157 | &ibf->id_sum[bucket]); | 117 | &ibf->id_sum[bucket]); |
@@ -214,8 +174,6 @@ ibf_decode (struct InvertibleBloomFilter *ibf, | |||
214 | int i; | 174 | int i; |
215 | 175 | ||
216 | GNUNET_assert (NULL != ibf); | 176 | GNUNET_assert (NULL != ibf); |
217 | GNUNET_assert (NULL != ret_id); | ||
218 | GNUNET_assert (NULL != ret_side); | ||
219 | 177 | ||
220 | for (i = 0; i < ibf->size; i++) | 178 | for (i = 0; i < ibf->size; i++) |
221 | { | 179 | { |
@@ -227,8 +185,10 @@ ibf_decode (struct InvertibleBloomFilter *ibf, | |||
227 | if (0 != memcmp (&hash, &ibf->hash_sum[i], sizeof (struct GNUNET_HashCode))) | 185 | if (0 != memcmp (&hash, &ibf->hash_sum[i], sizeof (struct GNUNET_HashCode))) |
228 | continue; | 186 | continue; |
229 | 187 | ||
230 | *ret_side = ibf->count[i]; | 188 | if (NULL != ret_side) |
231 | *ret_id = ibf->id_sum[i]; | 189 | *ret_side = ibf->count[i]; |
190 | if (NULL != ret_id) | ||
191 | *ret_id = ibf->id_sum[i]; | ||
232 | 192 | ||
233 | /* insert on the opposite side, effectively removing the element */ | 193 | /* insert on the opposite side, effectively removing the element */ |
234 | ibf_insert_on_side (ibf, &ibf->id_sum[i], -ibf->count[i]); | 194 | ibf_insert_on_side (ibf, &ibf->id_sum[i], -ibf->count[i]); |
@@ -269,3 +229,36 @@ ibf_subtract (struct InvertibleBloomFilter *ibf1, struct InvertibleBloomFilter * | |||
269 | } | 229 | } |
270 | } | 230 | } |
271 | 231 | ||
232 | /** | ||
233 | * Create a copy of an IBF, the copy has to be destroyed properly. | ||
234 | * | ||
235 | * @param ibf the IBF to copy | ||
236 | */ | ||
237 | struct InvertibleBloomFilter * | ||
238 | ibf_dup (struct InvertibleBloomFilter *ibf) | ||
239 | { | ||
240 | struct InvertibleBloomFilter *copy; | ||
241 | copy = GNUNET_malloc (sizeof *copy); | ||
242 | copy->hash_num = ibf->hash_num; | ||
243 | copy->salt = ibf->salt; | ||
244 | copy->size = ibf->size; | ||
245 | copy->hash_sum = GNUNET_memdup (ibf->hash_sum, ibf->size * sizeof (struct GNUNET_HashCode)); | ||
246 | copy->id_sum = GNUNET_memdup (ibf->id_sum, ibf->size * sizeof (struct GNUNET_HashCode)); | ||
247 | copy->count = GNUNET_memdup (ibf->count, ibf->size * sizeof (uint8_t)); | ||
248 | return copy; | ||
249 | } | ||
250 | |||
251 | /** | ||
252 | * Destroy all resources associated with the invertible bloom filter. | ||
253 | * No more ibf_*-functions may be called on ibf after calling destroy. | ||
254 | * | ||
255 | * @param ibf the intertible bloom filter to destroy | ||
256 | */ | ||
257 | void | ||
258 | ibf_destroy (struct InvertibleBloomFilter *ibf) | ||
259 | { | ||
260 | GNUNET_free (ibf->hash_sum); | ||
261 | GNUNET_free (ibf->id_sum); | ||
262 | GNUNET_free (ibf->count); | ||
263 | GNUNET_free (ibf); | ||
264 | } | ||
diff --git a/src/consensus/ibf.h b/src/consensus/ibf.h index e43d7c5fe..26f101905 100644 --- a/src/consensus/ibf.h +++ b/src/consensus/ibf.h | |||
@@ -39,14 +39,52 @@ extern "C" | |||
39 | #endif | 39 | #endif |
40 | #endif | 40 | #endif |
41 | 41 | ||
42 | /** | ||
43 | * Size of one ibf bucket in bytes | ||
44 | */ | ||
45 | #define IBF_BUCKET_SIZE (64+64+1) | ||
46 | |||
42 | 47 | ||
43 | /** | 48 | /** |
44 | * Opaque handle to an invertible bloom filter (IBF). | 49 | * Invertible bloom filter (IBF). |
45 | * | 50 | * |
46 | * An IBF is a counting bloom filter that has the ability to restore | 51 | * An IBF is a counting bloom filter that has the ability to restore |
47 | * the hashes of its stored elements with high probability. | 52 | * the hashes of its stored elements with high probability. |
48 | */ | 53 | */ |
49 | struct InvertibleBloomFilter; | 54 | struct InvertibleBloomFilter |
55 | { | ||
56 | /** | ||
57 | * How many cells does this IBF have? | ||
58 | */ | ||
59 | unsigned int size; | ||
60 | |||
61 | /** | ||
62 | * In how many cells do we hash one element? | ||
63 | * Usually 4 or 3. | ||
64 | */ | ||
65 | unsigned int hash_num; | ||
66 | |||
67 | /** | ||
68 | * Salt for mingling hashes | ||
69 | */ | ||
70 | uint32_t salt; | ||
71 | |||
72 | /** | ||
73 | * xor sums of the elements' hash codes, used to identify the elements. | ||
74 | */ | ||
75 | struct GNUNET_HashCode *id_sum; | ||
76 | |||
77 | /** | ||
78 | * xor sums of the "hash of the hash". | ||
79 | */ | ||
80 | struct GNUNET_HashCode *hash_sum; | ||
81 | |||
82 | /** | ||
83 | * How many times has a bucket been hit? | ||
84 | * Can be negative, as a result of IBF subtraction. | ||
85 | */ | ||
86 | int8_t *count; | ||
87 | }; | ||
50 | 88 | ||
51 | 89 | ||
52 | /** | 90 | /** |
@@ -106,15 +144,6 @@ ibf_decode (struct InvertibleBloomFilter *ibf, int *side, struct GNUNET_HashCode | |||
106 | struct InvertibleBloomFilter * | 144 | struct InvertibleBloomFilter * |
107 | ibf_dup (struct InvertibleBloomFilter *ibf); | 145 | ibf_dup (struct InvertibleBloomFilter *ibf); |
108 | 146 | ||
109 | |||
110 | /* | ||
111 | ibf_hton (); | ||
112 | |||
113 | ibf_ntoh (); | ||
114 | |||
115 | ibf_get_nbo_size (); | ||
116 | */ | ||
117 | |||
118 | /** | 147 | /** |
119 | * Destroy all resources associated with the invertible bloom filter. | 148 | * Destroy all resources associated with the invertible bloom filter. |
120 | * No more ibf_*-functions may be called on ibf after calling destroy. | 149 | * No more ibf_*-functions may be called on ibf after calling destroy. |
diff --git a/src/consensus/test_consensus.conf b/src/consensus/test_consensus.conf index 86dfadb9b..01266c2a9 100644 --- a/src/consensus/test_consensus.conf +++ b/src/consensus/test_consensus.conf | |||
@@ -1,6 +1,6 @@ | |||
1 | [consensus] | 1 | [consensus] |
2 | AUTOSTART = YES | 2 | AUTOSTART = YES |
3 | PORT = 2103 | 3 | PORT = 2110 |
4 | HOSTNAME = localhost | 4 | HOSTNAME = localhost |
5 | HOME = $SERVICEHOME | 5 | HOME = $SERVICEHOME |
6 | BINARY = gnunet-service-consensus | 6 | BINARY = gnunet-service-consensus |
diff --git a/src/consensus/test_consensus_api.c b/src/consensus/test_consensus_api.c index e752983c2..c79a6221c 100644 --- a/src/consensus/test_consensus_api.c +++ b/src/consensus/test_consensus_api.c | |||
@@ -17,6 +17,7 @@ | |||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | 17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, |
18 | Boston, MA 02111-1307, USA. | 18 | Boston, MA 02111-1307, USA. |
19 | */ | 19 | */ |
20 | |||
20 | /** | 21 | /** |
21 | * @file consensus/test_consensus_api.c | 22 | * @file consensus/test_consensus_api.c |
22 | * @brief testcase for consensus_api.c | 23 | * @brief testcase for consensus_api.c |
@@ -29,18 +30,20 @@ | |||
29 | 30 | ||
30 | static struct GNUNET_CONSENSUS_Handle *consensus; | 31 | static struct GNUNET_CONSENSUS_Handle *consensus; |
31 | 32 | ||
32 | static int insert; | ||
33 | |||
34 | static struct GNUNET_HashCode session_id; | 33 | static struct GNUNET_HashCode session_id; |
35 | 34 | ||
36 | 35 | ||
37 | static void conclude_done (void *cls, | 36 | static int |
38 | unsigned int num_peers_in_consensus, | 37 | conclude_done (void *cls, const struct GNUNET_CONSENSUS_Group *group) |
39 | const struct GNUNET_PeerIdentity *peers_in_consensus) | ||
40 | { | 38 | { |
41 | struct GNUNET_CONSENSUS_Handle *consensus; | 39 | if (NULL == group) |
42 | consensus = (struct GNUNET_CONSENSUS_Handle *) cls; | 40 | { |
43 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "concluded\n"); | 41 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude over\n"); |
42 | GNUNET_SCHEDULER_shutdown (); | ||
43 | return GNUNET_NO; | ||
44 | } | ||
45 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "concluded\n"); | ||
46 | return GNUNET_YES; | ||
44 | } | 47 | } |
45 | 48 | ||
46 | static int | 49 | static int |
@@ -54,7 +57,30 @@ on_new_element (void *cls, | |||
54 | static void | 57 | static void |
55 | insert_done (void *cls, int success) | 58 | insert_done (void *cls, int success) |
56 | { | 59 | { |
60 | /* make sure cb is only called once */ | ||
61 | static int called = GNUNET_NO; | ||
62 | GNUNET_assert (GNUNET_NO == called); | ||
63 | called = GNUNET_YES; | ||
57 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "insert done\n"); | 64 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "insert done\n"); |
65 | GNUNET_CONSENSUS_conclude (consensus, GNUNET_TIME_UNIT_SECONDS, 0, &conclude_done, NULL); | ||
66 | } | ||
67 | |||
68 | |||
69 | /** | ||
70 | * Signature of the main function of a task. | ||
71 | * | ||
72 | * @param cls closure | ||
73 | * @param tc context information (why was this task triggered now) | ||
74 | */ | ||
75 | static void | ||
76 | on_shutdown (void *cls, | ||
77 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
78 | { | ||
79 | if (NULL != consensus) | ||
80 | { | ||
81 | GNUNET_CONSENSUS_destroy (consensus); | ||
82 | consensus = NULL; | ||
83 | } | ||
58 | } | 84 | } |
59 | 85 | ||
60 | 86 | ||
@@ -69,18 +95,19 @@ run (void *cls, | |||
69 | struct GNUNET_CONSENSUS_Element el2 = {"bar", 4, 0}; | 95 | struct GNUNET_CONSENSUS_Element el2 = {"bar", 4, 0}; |
70 | 96 | ||
71 | GNUNET_log_setup ("test_consensus_api", | 97 | GNUNET_log_setup ("test_consensus_api", |
72 | "DEBUG", | 98 | "INFO", |
73 | NULL); | 99 | NULL); |
74 | 100 | ||
75 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "testing consensus api\n"); | 101 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "testing consensus api\n"); |
76 | 102 | ||
103 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &on_shutdown, NULL); | ||
104 | |||
77 | GNUNET_CRYPTO_hash (str, strlen (str), &session_id); | 105 | GNUNET_CRYPTO_hash (str, strlen (str), &session_id); |
78 | consensus = GNUNET_CONSENSUS_create (cfg, 0, NULL, &session_id, on_new_element, &consensus); | 106 | consensus = GNUNET_CONSENSUS_create (cfg, 0, NULL, &session_id, on_new_element, &consensus); |
79 | GNUNET_assert (consensus != NULL); | 107 | GNUNET_assert (consensus != NULL); |
80 | /* | 108 | |
81 | GNUNET_CONSENSUS_insert (consensus1, &el1, &insert_done, &consensus1); | 109 | GNUNET_CONSENSUS_insert (consensus, &el1, NULL, &consensus); |
82 | GNUNET_CONSENSUS_insert (consensus2, &el2, &insert_done, &consensus2); | 110 | GNUNET_CONSENSUS_insert (consensus, &el2, &insert_done, &consensus); |
83 | */ | ||
84 | } | 111 | } |
85 | 112 | ||
86 | 113 | ||