diff options
Diffstat (limited to 'src')
26 files changed, 723 insertions, 3320 deletions
diff --git a/src/consensus/Makefile.am b/src/consensus/Makefile.am index 82af29c87..a0edb1d65 100644 --- a/src/consensus/Makefile.am +++ b/src/consensus/Makefile.am | |||
@@ -17,8 +17,7 @@ endif | |||
17 | 17 | ||
18 | bin_PROGRAMS = \ | 18 | bin_PROGRAMS = \ |
19 | gnunet-consensus \ | 19 | gnunet-consensus \ |
20 | gnunet-consensus-start-peers \ | 20 | gnunet-consensus-start-peers |
21 | gnunet-consensus-ibf | ||
22 | 21 | ||
23 | libexec_PROGRAMS = \ | 22 | libexec_PROGRAMS = \ |
24 | gnunet-service-consensus | 23 | gnunet-service-consensus |
@@ -52,33 +51,25 @@ gnunet_consensus_start_peers_LDADD = \ | |||
52 | gnunet_consensus_start_peers_DEPENDENCIES = \ | 51 | gnunet_consensus_start_peers_DEPENDENCIES = \ |
53 | libgnunetconsensus.la | 52 | libgnunetconsensus.la |
54 | 53 | ||
55 | gnunet_consensus_ibf_SOURCES = \ | ||
56 | gnunet-consensus-ibf.c \ | ||
57 | ibf.c | ||
58 | gnunet_consensus_ibf_LDADD = \ | ||
59 | $(top_builddir)/src/util/libgnunetutil.la \ | ||
60 | $(GN_LIBINTL) | ||
61 | 54 | ||
62 | gnunet_service_consensus_SOURCES = \ | 55 | gnunet_service_consensus_SOURCES = \ |
63 | gnunet-service-consensus.c \ | 56 | gnunet-service-consensus.c |
64 | ibf.c \ | ||
65 | strata_estimator.c | ||
66 | gnunet_service_consensus_LDADD = \ | 57 | gnunet_service_consensus_LDADD = \ |
67 | $(top_builddir)/src/util/libgnunetutil.la \ | 58 | $(top_builddir)/src/util/libgnunetutil.la \ |
68 | $(top_builddir)/src/core/libgnunetcore.la \ | 59 | $(top_builddir)/src/core/libgnunetcore.la \ |
69 | $(top_builddir)/src/stream/libgnunetstream.la \ | 60 | $(top_builddir)/src/stream/libgnunetstream.la \ |
70 | $(top_builddir)/src/mesh/libgnunetmesh.la \ | 61 | $(top_builddir)/src/mesh/libgnunetmesh.la \ |
62 | $(top_builddir)/src/set/libgnunetset.la \ | ||
71 | $(GN_LIBINTL) | 63 | $(GN_LIBINTL) |
72 | 64 | ||
73 | gnunet_service_evil_consensus_SOURCES = \ | 65 | gnunet_service_evil_consensus_SOURCES = \ |
74 | gnunet-service-consensus.c \ | 66 | gnunet-service-consensus.c |
75 | ibf.c \ | ||
76 | strata_estimator.c | ||
77 | gnunet_service_evil_consensus_LDADD = \ | 67 | gnunet_service_evil_consensus_LDADD = \ |
78 | $(top_builddir)/src/util/libgnunetutil.la \ | 68 | $(top_builddir)/src/util/libgnunetutil.la \ |
79 | $(top_builddir)/src/core/libgnunetcore.la \ | 69 | $(top_builddir)/src/core/libgnunetcore.la \ |
80 | $(top_builddir)/src/stream/libgnunetstream.la \ | 70 | $(top_builddir)/src/stream/libgnunetstream.la \ |
81 | $(top_builddir)/src/mesh/libgnunetmesh.la \ | 71 | $(top_builddir)/src/mesh/libgnunetmesh.la \ |
72 | $(top_builddir)/src/set/libgnunetset.la \ | ||
82 | $(GN_LIBINTL) | 73 | $(GN_LIBINTL) |
83 | gnunet_service_evil_consensus_CFLAGS = -DEVIL | 74 | gnunet_service_evil_consensus_CFLAGS = -DEVIL |
84 | 75 | ||
diff --git a/src/consensus/consensus.h b/src/consensus/consensus.h index 06a66caed..e3c84e6fe 100644 --- a/src/consensus/consensus.h +++ b/src/consensus/consensus.h | |||
@@ -30,6 +30,10 @@ | |||
30 | 30 | ||
31 | GNUNET_NETWORK_STRUCT_BEGIN | 31 | GNUNET_NETWORK_STRUCT_BEGIN |
32 | 32 | ||
33 | /** | ||
34 | * Sent by the client to the service, | ||
35 | * when the client wants the service to join a consensus session. | ||
36 | */ | ||
33 | struct GNUNET_CONSENSUS_JoinMessage | 37 | struct GNUNET_CONSENSUS_JoinMessage |
34 | { | 38 | { |
35 | /** | 39 | /** |
diff --git a/src/consensus/consensus_protocol.h b/src/consensus/consensus_protocol.h index 32c3d8b09..af4d74100 100644 --- a/src/consensus/consensus_protocol.h +++ b/src/consensus/consensus_protocol.h | |||
@@ -35,54 +35,17 @@ | |||
35 | 35 | ||
36 | GNUNET_NETWORK_STRUCT_BEGIN | 36 | GNUNET_NETWORK_STRUCT_BEGIN |
37 | 37 | ||
38 | struct StrataMessage | 38 | /** |
39 | { | 39 | * Sent as context message for set reconciliation. |
40 | struct GNUNET_MessageHeader header; | 40 | */ |
41 | uint8_t round; | 41 | struct ConsensusRoundMessage |
42 | uint8_t exp_round; | ||
43 | uint8_t exp_subround; | ||
44 | /* struct GNUNET_HashCode hash_buckets[ibf_size*num_strata] */ | ||
45 | /* struct GNUNET_HashCode id_buckets[ibf_size*num_strata] */ | ||
46 | /* uint8_t count_buckets[ibf_size*num_strata] */ | ||
47 | }; | ||
48 | |||
49 | struct DifferenceDigest | ||
50 | { | 42 | { |
51 | struct GNUNET_MessageHeader header; | 43 | struct GNUNET_MessageHeader header; |
52 | uint8_t order; | ||
53 | uint8_t round; | 44 | uint8_t round; |
54 | uint8_t exp_round; | 45 | uint8_t exp_round; |
55 | uint8_t exp_subround; | 46 | uint8_t exp_subround; |
56 | /* rest: IBF */ | ||
57 | }; | ||
58 | |||
59 | |||
60 | struct Element | ||
61 | { | ||
62 | struct GNUNET_MessageHeader header; | ||
63 | struct GNUNET_HashCode hash; | ||
64 | }; | 47 | }; |
65 | 48 | ||
66 | |||
67 | struct ElementRequest | ||
68 | { | ||
69 | struct GNUNET_MessageHeader header; | ||
70 | /* struct GNUNET_HashCode[] rest */ | ||
71 | }; | ||
72 | |||
73 | struct ConsensusHello | ||
74 | { | ||
75 | struct GNUNET_MessageHeader header; | ||
76 | struct GNUNET_HashCode global_id; | ||
77 | }; | ||
78 | |||
79 | struct ConsensusRoundMessage | ||
80 | { | ||
81 | struct GNUNET_MessageHeader header; | ||
82 | uint8_t round; | ||
83 | }; | ||
84 | |||
85 | |||
86 | GNUNET_NETWORK_STRUCT_END | 49 | GNUNET_NETWORK_STRUCT_END |
87 | 50 | ||
88 | #endif | 51 | #endif |
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index a7640c51f..44edeb215 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c | |||
@@ -29,14 +29,10 @@ | |||
29 | #include "gnunet_protocols.h" | 29 | #include "gnunet_protocols.h" |
30 | #include "gnunet_applications.h" | 30 | #include "gnunet_applications.h" |
31 | #include "gnunet_util_lib.h" | 31 | #include "gnunet_util_lib.h" |
32 | #include "gnunet_set_service.h" | ||
32 | #include "gnunet_consensus_service.h" | 33 | #include "gnunet_consensus_service.h" |
33 | #include "gnunet_core_service.h" | ||
34 | #include "gnunet_stream_lib.h" | ||
35 | |||
36 | #include "consensus_protocol.h" | 34 | #include "consensus_protocol.h" |
37 | #include "consensus.h" | 35 | #include "consensus.h" |
38 | #include "ibf.h" | ||
39 | #include "strata_estimator.h" | ||
40 | 36 | ||
41 | 37 | ||
42 | /* | 38 | /* |
@@ -47,82 +43,19 @@ | |||
47 | 43 | ||
48 | 44 | ||
49 | /** | 45 | /** |
50 | * Number of IBFs in a strata estimator. | ||
51 | */ | ||
52 | #define SE_STRATA_COUNT 32 | ||
53 | /** | ||
54 | * Size of the IBFs in the strata estimator. | ||
55 | */ | ||
56 | #define SE_IBF_SIZE 80 | ||
57 | /** | ||
58 | * hash num parameter for the difference digests and strata estimators | ||
59 | */ | ||
60 | #define SE_IBF_HASH_NUM 3 | ||
61 | |||
62 | /** | ||
63 | * Number of buckets that can be transmitted in one message. | ||
64 | */ | ||
65 | #define BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE) | ||
66 | |||
67 | /** | ||
68 | * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER). | ||
69 | * Choose this value so that computing the IBF is still cheaper | ||
70 | * than transmitting all values. | ||
71 | */ | ||
72 | #define MAX_IBF_ORDER (16) | ||
73 | |||
74 | /** | ||
75 | * Number of exponential rounds, used in the inventory and completion round. | 46 | * Number of exponential rounds, used in the inventory and completion round. |
76 | */ | 47 | */ |
77 | #define NUM_EXP_ROUNDS (4) | 48 | #define NUM_EXP_ROUNDS (4) |
78 | 49 | ||
79 | |||
80 | /* forward declarations */ | 50 | /* forward declarations */ |
81 | 51 | ||
82 | /* mutual recursion with struct ConsensusSession */ | 52 | /* mutual recursion with struct ConsensusSession */ |
83 | struct ConsensusPeerInformation; | 53 | struct ConsensusPeerInformation; |
84 | 54 | ||
85 | struct MessageQueue; | ||
86 | |||
87 | /* mutual recursion with round_over */ | 55 | /* mutual recursion with round_over */ |
88 | static void | 56 | static void |
89 | subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); | 57 | subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); |
90 | 58 | ||
91 | /* mutial recursion with transmit_queued */ | ||
92 | static void | ||
93 | client_send_next (struct MessageQueue *mq); | ||
94 | |||
95 | /* mutual recursion with mst_session_callback */ | ||
96 | static void | ||
97 | open_cb (void *cls, struct GNUNET_STREAM_Socket *socket); | ||
98 | |||
99 | static int | ||
100 | mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message); | ||
101 | |||
102 | |||
103 | /** | ||
104 | * Additional information about a consensus element. | ||
105 | */ | ||
106 | struct ElementInfo | ||
107 | { | ||
108 | /** | ||
109 | * The element itself. | ||
110 | */ | ||
111 | struct GNUNET_CONSENSUS_Element *element; | ||
112 | /** | ||
113 | * Hash of the element | ||
114 | */ | ||
115 | struct GNUNET_HashCode *element_hash; | ||
116 | /** | ||
117 | * Number of other peers that have the element in the inventory. | ||
118 | */ | ||
119 | unsigned int inventory_count; | ||
120 | /** | ||
121 | * Bitmap of peers that have this element in their inventory | ||
122 | */ | ||
123 | uint8_t *inventory_bitmap; | ||
124 | }; | ||
125 | |||
126 | 59 | ||
127 | /** | 60 | /** |
128 | * Describes the current round a consensus session is in. | 61 | * Describes the current round a consensus session is in. |
@@ -138,7 +71,8 @@ enum ConsensusRound | |||
138 | */ | 71 | */ |
139 | CONSENSUS_ROUND_EXCHANGE, | 72 | CONSENSUS_ROUND_EXCHANGE, |
140 | /** | 73 | /** |
141 | * Exchange which elements each peer has, but not the elements. | 74 | * Exchange which elements each peer has, but don't |
75 | * transmit the element's data, only their SHA-512 hashes. | ||
142 | * This round uses the all-to-all scheme. | 76 | * This round uses the all-to-all scheme. |
143 | */ | 77 | */ |
144 | CONSENSUS_ROUND_INVENTORY, | 78 | CONSENSUS_ROUND_INVENTORY, |
@@ -153,82 +87,6 @@ enum ConsensusRound | |||
153 | CONSENSUS_ROUND_FINISH | 87 | CONSENSUS_ROUND_FINISH |
154 | }; | 88 | }; |
155 | 89 | ||
156 | /* FIXME: review states, ANTICIPATE_DIFF and DECODING in particular */ | ||
157 | |||
158 | /** | ||
159 | * State of another peer with respect to the | ||
160 | * current ibf. | ||
161 | */ | ||
162 | enum ConsensusIBFState { | ||
163 | /** | ||
164 | * There is nothing going on with the IBF. | ||
165 | */ | ||
166 | IBF_STATE_NONE=0, | ||
167 | /** | ||
168 | * We currently receive an ibf. | ||
169 | */ | ||
170 | IBF_STATE_RECEIVING, | ||
171 | /* | ||
172 | * we decode a received ibf | ||
173 | */ | ||
174 | IBF_STATE_DECODING, | ||
175 | /** | ||
176 | * wait for elements and element requests | ||
177 | */ | ||
178 | IBF_STATE_ANTICIPATE_DIFF | ||
179 | }; | ||
180 | |||
181 | |||
182 | typedef void (*AddCallback) (struct MessageQueue *mq); | ||
183 | typedef void (*MessageSentCallback) (void *cls); | ||
184 | |||
185 | |||
186 | /** | ||
187 | * Collection of the state necessary to read and write gnunet messages | ||
188 | * to a stream socket. Should be used as closure for stream_data_processor. | ||
189 | */ | ||
190 | struct MessageStreamState | ||
191 | { | ||
192 | struct GNUNET_SERVER_MessageStreamTokenizer *mst; | ||
193 | struct MessageQueue *mq; | ||
194 | void *mst_cls; | ||
195 | struct GNUNET_STREAM_Socket *socket; | ||
196 | struct GNUNET_STREAM_ReadHandle *rh; | ||
197 | struct GNUNET_STREAM_WriteHandle *wh; | ||
198 | }; | ||
199 | |||
200 | |||
201 | struct ServerClientSocketState | ||
202 | { | ||
203 | struct GNUNET_SERVER_Client *client; | ||
204 | struct GNUNET_SERVER_TransmitHandle* th; | ||
205 | }; | ||
206 | |||
207 | |||
208 | /** | ||
209 | * Generic message queue, for queueing outgoing messages. | ||
210 | */ | ||
211 | struct MessageQueue | ||
212 | { | ||
213 | void *state; | ||
214 | AddCallback add_cb; | ||
215 | struct PendingMessage *pending_head; | ||
216 | struct PendingMessage *pending_tail; | ||
217 | struct PendingMessage *current_pm; | ||
218 | }; | ||
219 | |||
220 | |||
221 | struct PendingMessage | ||
222 | { | ||
223 | struct GNUNET_MessageHeader *msg; | ||
224 | struct MessageQueue *parent_queue; | ||
225 | struct PendingMessage *next; | ||
226 | struct PendingMessage *prev; | ||
227 | MessageSentCallback sent_cb; | ||
228 | void *sent_cb_cls; | ||
229 | }; | ||
230 | |||
231 | |||
232 | /** | 90 | /** |
233 | * A consensus session consists of one local client and the remote authorities. | 91 | * A consensus session consists of one local client and the remote authorities. |
234 | */ | 92 | */ |
@@ -245,58 +103,35 @@ struct ConsensusSession | |||
245 | struct ConsensusSession *prev; | 103 | struct ConsensusSession *prev; |
246 | 104 | ||
247 | /** | 105 | /** |
248 | * Join message. Used to initialize the session later, | ||
249 | * if the identity of the local peer is not yet known. | ||
250 | * NULL if the session has been fully initialized. | ||
251 | */ | ||
252 | struct GNUNET_CONSENSUS_JoinMessage *join_msg; | ||
253 | |||
254 | /** | ||
255 | * Global consensus identification, computed | 106 | * Global consensus identification, computed |
256 | * from the session id and participating authorities. | 107 | * from the session id and participating authorities. |
257 | */ | 108 | */ |
258 | struct GNUNET_HashCode global_id; | 109 | struct GNUNET_HashCode global_id; |
259 | 110 | ||
260 | /** | 111 | /** |
261 | * The server's client and associated local state | 112 | * Client that inhabits the session |
262 | */ | 113 | */ |
263 | struct ServerClientSocketState scss; | 114 | struct GNUNET_SERVER_Client *client; |
264 | 115 | ||
265 | /** | 116 | /** |
266 | * Queued messages to the client. | 117 | * Queued messages to the client. |
267 | */ | 118 | */ |
268 | struct MessageQueue *client_mq; | 119 | struct GNUNET_MQ_MessageQueue *client_mq; |
269 | |||
270 | /** | ||
271 | * IBF_Key -> 2^(HashCode*) | ||
272 | * FIXME: | ||
273 | * should be array of hash maps, mapping replicated struct IBF_Keys to struct HashCode *. | ||
274 | */ | ||
275 | struct GNUNET_CONTAINER_MultiHashMap *ibf_key_map; | ||
276 | |||
277 | /** | ||
278 | * Maps HashCodes to ElementInfos | ||
279 | */ | ||
280 | struct GNUNET_CONTAINER_MultiHashMap *values; | ||
281 | |||
282 | /** | ||
283 | * Currently active transmit handle for sending to the client | ||
284 | */ | ||
285 | struct GNUNET_SERVER_TransmitHandle *client_th; | ||
286 | 120 | ||
287 | /** | 121 | /** |
288 | * Timeout for all rounds together, single rounds will schedule a timeout task | 122 | * Timeout for all rounds together, single rounds will schedule a timeout task |
289 | * with a fraction of the conclude timeout. | 123 | * with a fraction of the conclude timeout. |
124 | * Only valid once the current round is not CONSENSUS_ROUND_BEGIN. | ||
290 | */ | 125 | */ |
291 | struct GNUNET_TIME_Relative conclude_timeout; | 126 | struct GNUNET_TIME_Relative conclude_timeout; |
292 | 127 | ||
293 | /** | 128 | /** |
294 | * Timeout task identifier for the current round | 129 | * Timeout task identifier for the current round. |
295 | */ | 130 | */ |
296 | GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid; | 131 | GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid; |
297 | 132 | ||
298 | /** | 133 | /** |
299 | * Number of other peers in the consensus | 134 | * Number of other peers in the consensus. |
300 | */ | 135 | */ |
301 | unsigned int num_peers; | 136 | unsigned int num_peers; |
302 | 137 | ||
@@ -307,26 +142,11 @@ struct ConsensusSession | |||
307 | struct ConsensusPeerInformation *info; | 142 | struct ConsensusPeerInformation *info; |
308 | 143 | ||
309 | /** | 144 | /** |
310 | * GNUNET_YES if the client has called conclude. | ||
311 | * */ | ||
312 | int conclude; | ||
313 | |||
314 | /** | ||
315 | * Index of the local peer in the peers array | 145 | * Index of the local peer in the peers array |
316 | */ | 146 | */ |
317 | unsigned int local_peer_idx; | 147 | unsigned int local_peer_idx; |
318 | 148 | ||
319 | /** | 149 | /** |
320 | * Strata estimator, computed online | ||
321 | */ | ||
322 | struct StrataEstimator *se; | ||
323 | |||
324 | /** | ||
325 | * Pre-computed IBFs | ||
326 | */ | ||
327 | struct InvertibleBloomFilter **ibfs; | ||
328 | |||
329 | /** | ||
330 | * Current round | 150 | * Current round |
331 | */ | 151 | */ |
332 | enum ConsensusRound current_round; | 152 | enum ConsensusRound current_round; |
@@ -337,19 +157,36 @@ struct ConsensusSession | |||
337 | */ | 157 | */ |
338 | int *shuffle; | 158 | int *shuffle; |
339 | 159 | ||
160 | /** | ||
161 | * Current round of the exponential scheme. | ||
162 | */ | ||
340 | int exp_round; | 163 | int exp_round; |
341 | 164 | ||
165 | /** | ||
166 | * Current sub-round of the exponential scheme. | ||
167 | */ | ||
342 | int exp_subround; | 168 | int exp_subround; |
343 | 169 | ||
344 | /** | 170 | /** |
345 | * The partner for the current exp-round | 171 | * The partner for the current exp-round |
346 | */ | 172 | */ |
347 | struct ConsensusPeerInformation* partner_outgoing; | 173 | struct ConsensusPeerInformation *partner_outgoing; |
348 | 174 | ||
349 | /** | 175 | /** |
350 | * The partner for the current exp-round | 176 | * The partner for the current exp-round |
351 | */ | 177 | */ |
352 | struct ConsensusPeerInformation* partner_incoming; | 178 | struct ConsensusPeerInformation *partner_incoming; |
179 | |||
180 | /** | ||
181 | * The consensus set of this session. | ||
182 | */ | ||
183 | struct GNUNET_SET_Handle *element_set; | ||
184 | |||
185 | /** | ||
186 | * Listener for requests from other peers. | ||
187 | * Uses the session's global id as app id. | ||
188 | */ | ||
189 | struct GNUNET_SET_ListenHandle *set_listener; | ||
353 | }; | 190 | }; |
354 | 191 | ||
355 | 192 | ||
@@ -374,41 +211,6 @@ struct ConsensusPeerInformation | |||
374 | */ | 211 | */ |
375 | int hello; | 212 | int hello; |
376 | 213 | ||
377 | /* | ||
378 | * FIXME | ||
379 | */ | ||
380 | struct MessageStreamState mss; | ||
381 | |||
382 | /** | ||
383 | * Current state | ||
384 | */ | ||
385 | enum ConsensusIBFState ibf_state; | ||
386 | |||
387 | /** | ||
388 | * What is the order (=log2 size) of the ibf | ||
389 | * we're currently dealing with? | ||
390 | * Interpretation depends on ibf_state. | ||
391 | */ | ||
392 | int ibf_order; | ||
393 | |||
394 | /** | ||
395 | * The current IBF for this peer, | ||
396 | * purpose dependent on ibf_state | ||
397 | */ | ||
398 | struct InvertibleBloomFilter *ibf; | ||
399 | |||
400 | /** | ||
401 | * How many buckets have we transmitted/received? | ||
402 | * Interpretatin depends on ibf_state | ||
403 | */ | ||
404 | int ibf_bucket_counter; | ||
405 | |||
406 | /** | ||
407 | * Strata estimator of the peer, NULL if our peer | ||
408 | * initiated the reconciliation. | ||
409 | */ | ||
410 | struct StrataEstimator *se; | ||
411 | |||
412 | /** | 214 | /** |
413 | * Back-reference to the consensus session, | 215 | * Back-reference to the consensus session, |
414 | * to that ConsensusPeerInformation can be used as a closure | 216 | * to that ConsensusPeerInformation can be used as a closure |
@@ -416,18 +218,6 @@ struct ConsensusPeerInformation | |||
416 | struct ConsensusSession *session; | 218 | struct ConsensusSession *session; |
417 | 219 | ||
418 | /** | 220 | /** |
419 | * True if we are actually replaying the strata message, | ||
420 | * e.g. currently handling the premature_strata_message. | ||
421 | */ | ||
422 | int replaying_strata_message; | ||
423 | |||
424 | /** | ||
425 | * A strata message that is not actually for the current round, | ||
426 | * used in the exp-scheme. | ||
427 | */ | ||
428 | struct StrataMessage *premature_strata_message; | ||
429 | |||
430 | /** | ||
431 | * We have finishes the exp-subround with the peer. | 221 | * We have finishes the exp-subround with the peer. |
432 | */ | 222 | */ |
433 | int exp_subround_finished; | 223 | int exp_subround_finished; |
@@ -444,65 +234,15 @@ struct ConsensusPeerInformation | |||
444 | * older round, while we are already in the next round. | 234 | * older round, while we are already in the next round. |
445 | */ | 235 | */ |
446 | enum ConsensusRound apparent_round; | 236 | enum ConsensusRound apparent_round; |
447 | }; | ||
448 | |||
449 | |||
450 | /** | ||
451 | * Sockets from other peers who want to communicate with us. | ||
452 | * It may not be known yet which consensus session they belong to, we have to wait for the | ||
453 | * peer's hello. | ||
454 | * Also, the session might not exist yet locally, we have to wait for a local client to connect. | ||
455 | */ | ||
456 | struct IncomingSocket | ||
457 | { | ||
458 | /** | ||
459 | * Incoming sockets are kept in a double linked list. | ||
460 | */ | ||
461 | struct IncomingSocket *next; | ||
462 | |||
463 | /** | ||
464 | * Incoming sockets are kept in a double linked list. | ||
465 | */ | ||
466 | struct IncomingSocket *prev; | ||
467 | |||
468 | /** | ||
469 | * Peer that connected to us with the socket. | ||
470 | */ | ||
471 | struct GNUNET_PeerIdentity peer_id; | ||
472 | 237 | ||
473 | /** | 238 | /** |
474 | * Peer-in-session this socket belongs to, once known, otherwise NULL. | 239 | * Set operation we are currently executing with this peer. |
475 | */ | 240 | */ |
476 | struct ConsensusPeerInformation *cpi; | 241 | struct GNUNET_SET_OperationHandle *set_op; |
477 | |||
478 | /** | ||
479 | * Set to the global session id, if the peer sent us a hello-message, | ||
480 | * but the session does not exist yet. | ||
481 | */ | ||
482 | struct GNUNET_HashCode *requested_gid; | ||
483 | |||
484 | /* | ||
485 | * Timeout, will disconnect the socket if not yet in a session. | ||
486 | * FIXME: implement | ||
487 | */ | ||
488 | GNUNET_SCHEDULER_TaskIdentifier timeout; | ||
489 | |||
490 | /* FIXME */ | ||
491 | struct MessageStreamState mss; | ||
492 | }; | 242 | }; |
493 | 243 | ||
494 | 244 | ||
495 | /** | 245 | /** |
496 | * Linked list of incoming sockets. | ||
497 | */ | ||
498 | static struct IncomingSocket *incoming_sockets_head; | ||
499 | |||
500 | /** | ||
501 | * Linked list of incoming sockets. | ||
502 | */ | ||
503 | static struct IncomingSocket *incoming_sockets_tail; | ||
504 | |||
505 | /** | ||
506 | * Linked list of sessions this peer participates in. | 246 | * Linked list of sessions this peer participates in. |
507 | */ | 247 | */ |
508 | static struct ConsensusSession *sessions_head; | 248 | static struct ConsensusSession *sessions_head; |
@@ -525,297 +265,10 @@ static struct GNUNET_SERVER_Handle *srv; | |||
525 | /** | 265 | /** |
526 | * Peer that runs this service. | 266 | * Peer that runs this service. |
527 | */ | 267 | */ |
528 | static struct GNUNET_PeerIdentity *my_peer; | 268 | static struct GNUNET_PeerIdentity my_peer; |
529 | |||
530 | /** | ||
531 | * Handle to the core service. Only used during service startup, will be NULL after that. | ||
532 | */ | ||
533 | static struct GNUNET_CORE_Handle *core; | ||
534 | |||
535 | /** | ||
536 | * Listener for sockets from peers that want to reconcile with us. | ||
537 | */ | ||
538 | static struct GNUNET_STREAM_ListenSocket *listener; | ||
539 | |||
540 | |||
541 | /** | ||
542 | * Transmit a queued message to the session's client. | ||
543 | * | ||
544 | * @param cls consensus session | ||
545 | * @param size number of bytes available in buf | ||
546 | * @param buf where the callee should write the message | ||
547 | * @return number of bytes written to buf | ||
548 | */ | ||
549 | static size_t | ||
550 | transmit_queued (void *cls, size_t size, | ||
551 | void *buf) | ||
552 | { | ||
553 | struct MessageQueue *mq = cls; | ||
554 | struct PendingMessage *pm = mq->pending_head; | ||
555 | struct ServerClientSocketState *state = mq->state; | ||
556 | size_t msg_size; | ||
557 | |||
558 | GNUNET_assert (NULL != pm); | ||
559 | GNUNET_assert (NULL != buf); | ||
560 | msg_size = ntohs (pm->msg->size); | ||
561 | GNUNET_assert (size >= msg_size); | ||
562 | memcpy (buf, pm->msg, msg_size); | ||
563 | GNUNET_CONTAINER_DLL_remove (mq->pending_head, mq->pending_tail, pm); | ||
564 | state->th = NULL; | ||
565 | client_send_next (cls); | ||
566 | GNUNET_free (pm); | ||
567 | return msg_size; | ||
568 | } | ||
569 | |||
570 | |||
571 | static void | ||
572 | client_send_next (struct MessageQueue *mq) | ||
573 | { | ||
574 | struct ServerClientSocketState *state = mq->state; | ||
575 | int msize; | ||
576 | |||
577 | GNUNET_assert (NULL != state); | ||
578 | |||
579 | if ( (NULL != state->th) || | ||
580 | (NULL == mq->pending_head) ) | ||
581 | return; | ||
582 | msize = ntohs (mq->pending_head->msg->size); | ||
583 | state->th = | ||
584 | GNUNET_SERVER_notify_transmit_ready (state->client, msize, | ||
585 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
586 | &transmit_queued, mq); | ||
587 | } | ||
588 | |||
589 | |||
590 | struct MessageQueue * | ||
591 | create_message_queue_for_server_client (struct ServerClientSocketState *scss) | ||
592 | { | ||
593 | struct MessageQueue *mq; | ||
594 | mq = GNUNET_new (struct MessageQueue); | ||
595 | mq->add_cb = client_send_next; | ||
596 | mq->state = scss; | ||
597 | return mq; | ||
598 | } | ||
599 | |||
600 | |||
601 | /** | ||
602 | * Functions of this signature are called whenever writing operations | ||
603 | * on a stream are executed | ||
604 | * | ||
605 | * @param cls the closure from GNUNET_STREAM_write | ||
606 | * @param status the status of the stream at the time this function is called; | ||
607 | * GNUNET_STREAM_OK if writing to stream was completed successfully; | ||
608 | * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully | ||
609 | * (this doesn't mean that the data is never sent, the receiver may | ||
610 | * have read the data but its ACKs may have been lost); | ||
611 | * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the | ||
612 | * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot | ||
613 | * be processed. | ||
614 | * @param size the number of bytes written | ||
615 | */ | ||
616 | static void | ||
617 | write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size) | ||
618 | { | ||
619 | struct MessageQueue *mq = cls; | ||
620 | struct MessageStreamState *mss = mq->state; | ||
621 | struct PendingMessage *pm; | ||
622 | |||
623 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
624 | |||
625 | /* call cb for message we finished sending */ | ||
626 | pm = mq->current_pm; | ||
627 | if (NULL != pm) | ||
628 | { | ||
629 | if (NULL != pm->sent_cb) | ||
630 | pm->sent_cb (pm->sent_cb_cls); | ||
631 | GNUNET_free (pm); | ||
632 | } | ||
633 | |||
634 | mss->wh = NULL; | ||
635 | |||
636 | pm = mq->pending_head; | ||
637 | mq->current_pm = pm; | ||
638 | if (NULL == pm) | ||
639 | return; | ||
640 | GNUNET_CONTAINER_DLL_remove (mq->pending_head, mq->pending_tail, pm); | ||
641 | mss->wh = GNUNET_STREAM_write (mss->socket, pm->msg, ntohs (pm->msg->size), | ||
642 | GNUNET_TIME_UNIT_FOREVER_REL, write_queued, cls); | ||
643 | GNUNET_assert (NULL != mss->wh); | ||
644 | } | ||
645 | |||
646 | |||
647 | static void | ||
648 | stream_socket_add_cb (struct MessageQueue *mq) | ||
649 | { | ||
650 | if (NULL != mq->current_pm) | ||
651 | return; | ||
652 | write_queued (mq, GNUNET_STREAM_OK, 0); | ||
653 | } | ||
654 | |||
655 | |||
656 | struct MessageQueue * | ||
657 | create_message_queue_for_stream_socket (struct MessageStreamState *mss) | ||
658 | { | ||
659 | struct MessageQueue *mq; | ||
660 | mq = GNUNET_new (struct MessageQueue); | ||
661 | mq->state = mss; | ||
662 | mq->add_cb = stream_socket_add_cb; | ||
663 | return mq; | ||
664 | } | ||
665 | |||
666 | |||
667 | struct PendingMessage * | ||
668 | new_pending_message (uint16_t size, uint16_t type) | ||
669 | { | ||
670 | struct PendingMessage *pm; | ||
671 | pm = GNUNET_malloc (sizeof *pm + size); | ||
672 | pm->msg = (void *) &pm[1]; | ||
673 | pm->msg->size = htons (size); | ||
674 | pm->msg->type = htons (type); | ||
675 | return pm; | ||
676 | } | ||
677 | |||
678 | |||
679 | /** | ||
680 | * Queue a message in a message queue. | ||
681 | * | ||
682 | * @param queue the message queue | ||
683 | * @param pending message, message with additional information | ||
684 | */ | ||
685 | void | ||
686 | message_queue_add (struct MessageQueue *queue, struct PendingMessage *msg) | ||
687 | { | ||
688 | GNUNET_CONTAINER_DLL_insert_tail (queue->pending_head, queue->pending_tail, msg); | ||
689 | queue->add_cb (queue); | ||
690 | } | ||
691 | |||
692 | |||
693 | /** | ||
694 | * Called when we receive data from a peer via stream. | ||
695 | * | ||
696 | * @param cls the closure from GNUNET_STREAM_read | ||
697 | * @param status the status of the stream at the time this function is called | ||
698 | * @param data traffic from the other side | ||
699 | * @param size the number of bytes available in data read; will be 0 on timeout | ||
700 | * @return number of bytes of processed from 'data' (any data remaining should be | ||
701 | * given to the next time the read processor is called). | ||
702 | */ | ||
703 | static size_t | ||
704 | stream_data_processor (void *cls, enum GNUNET_STREAM_Status status, const void *data, size_t size) | ||
705 | { | ||
706 | struct MessageStreamState *mss = cls; | ||
707 | int ret; | ||
708 | |||
709 | mss->rh = NULL; | ||
710 | |||
711 | if (GNUNET_STREAM_OK != status) | ||
712 | { | ||
713 | /* FIXME: handle this correctly */ | ||
714 | GNUNET_break (0); | ||
715 | return 0; | ||
716 | } | ||
717 | GNUNET_assert (NULL != mss->mst); | ||
718 | ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_YES); | ||
719 | if (GNUNET_SYSERR == ret) | ||
720 | { | ||
721 | /* FIXME: handle this correctly */ | ||
722 | GNUNET_break (0); | ||
723 | return 0; | ||
724 | } | ||
725 | /* read again */ | ||
726 | mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, &stream_data_processor, mss); | ||
727 | /* we always read all data */ | ||
728 | return size; | ||
729 | } | ||
730 | |||
731 | |||
732 | /** | ||
733 | * Send element or element report to the peer specified in cpi. | ||
734 | * | ||
735 | * @param cpi peer to send the elements to | ||
736 | * @param head head of the element list | ||
737 | */ | ||
738 | static void | ||
739 | send_element_or_report (struct ConsensusPeerInformation *cpi, struct ElementInfo *e) | ||
740 | { | ||
741 | struct PendingMessage *pm; | ||
742 | |||
743 | switch (cpi->apparent_round) | ||
744 | { | ||
745 | case CONSENSUS_ROUND_COMPLETION: | ||
746 | case CONSENSUS_ROUND_EXCHANGE: | ||
747 | pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + e->element->size, | ||
748 | GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); | ||
749 | memcpy (&pm->msg[1], e->element->data, e->element->size); | ||
750 | message_queue_add (cpi->mss.mq, pm); | ||
751 | break; | ||
752 | case CONSENSUS_ROUND_INVENTORY: | ||
753 | pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + sizeof (struct GNUNET_HashCode), | ||
754 | GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT); | ||
755 | memcpy (&pm->msg[1], e->element_hash, sizeof (struct GNUNET_HashCode)); | ||
756 | message_queue_add (cpi->mss.mq, pm); | ||
757 | break; | ||
758 | default: | ||
759 | GNUNET_break (0); | ||
760 | } | ||
761 | } | ||
762 | |||
763 | |||
764 | /** | ||
765 | * Iterator to insert values into an ibf. | ||
766 | * | ||
767 | * @param cls closure | ||
768 | * @param key current key code | ||
769 | * @param value value in the hash map | ||
770 | * @return GNUNET_YES if we should continue to | ||
771 | * iterate, | ||
772 | * GNUNET_NO if not. | ||
773 | */ | ||
774 | static int | ||
775 | ibf_values_iterator (void *cls, | ||
776 | const struct GNUNET_HashCode *key, | ||
777 | void *value) | ||
778 | { | ||
779 | struct ConsensusPeerInformation *cpi = cls; | ||
780 | struct ElementInfo *e = value; | ||
781 | struct IBF_Key ibf_key = ibf_key_from_hashcode (e->element_hash); | ||
782 | |||
783 | GNUNET_assert (ibf_key.key_val == ibf_key_from_hashcode (key).key_val); | ||
784 | ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key); | ||
785 | return GNUNET_YES; | ||
786 | } | ||
787 | |||
788 | /** | ||
789 | * Create and populate an IBF for the specified peer, | ||
790 | * if it does not already exist. | ||
791 | * | ||
792 | * @param cpi peer to create the ibf for | ||
793 | */ | ||
794 | static void | ||
795 | prepare_ibf (struct ConsensusPeerInformation *cpi) | ||
796 | { | ||
797 | if (NULL != cpi->session->ibfs[cpi->ibf_order]) | ||
798 | return; | ||
799 | cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, SE_IBF_HASH_NUM); | ||
800 | GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi); | ||
801 | } | ||
802 | |||
803 | |||
804 | /** | ||
805 | * Called when a remote peer wants to inform the local peer | ||
806 | * that the remote peer misses elements. | ||
807 | * Elements are not reconciled. | ||
808 | * | ||
809 | * @param cpi session | ||
810 | * @param msg message | ||
811 | */ | ||
812 | static int | ||
813 | handle_p2p_element_report (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) | ||
814 | { | ||
815 | GNUNET_assert (0); | ||
816 | } | ||
817 | 269 | ||
818 | 270 | ||
271 | /* | ||
819 | static int | 272 | static int |
820 | exp_subround_finished (const struct ConsensusSession *session) | 273 | exp_subround_finished (const struct ConsensusSession *session) |
821 | { | 274 | { |
@@ -831,8 +284,11 @@ exp_subround_finished (const struct ConsensusSession *session) | |||
831 | return GNUNET_YES; | 284 | return GNUNET_YES; |
832 | return GNUNET_NO; | 285 | return GNUNET_NO; |
833 | } | 286 | } |
287 | */ | ||
288 | |||
834 | 289 | ||
835 | 290 | ||
291 | /* | ||
836 | static int | 292 | static int |
837 | inventory_round_finished (struct ConsensusSession *session) | 293 | inventory_round_finished (struct ConsensusSession *session) |
838 | { | 294 | { |
@@ -846,61 +302,7 @@ inventory_round_finished (struct ConsensusSession *session) | |||
846 | return GNUNET_YES; | 302 | return GNUNET_YES; |
847 | return GNUNET_NO; | 303 | return GNUNET_NO; |
848 | } | 304 | } |
849 | 305 | */ | |
850 | |||
851 | static void | ||
852 | clear_message_stream_state (struct MessageStreamState *mss) | ||
853 | { | ||
854 | if (NULL != mss->mst) | ||
855 | { | ||
856 | GNUNET_SERVER_mst_destroy (mss->mst); | ||
857 | mss->mst = NULL; | ||
858 | } | ||
859 | if (NULL != mss->rh) | ||
860 | { | ||
861 | GNUNET_STREAM_read_cancel (mss->rh); | ||
862 | mss->rh = NULL; | ||
863 | } | ||
864 | if (NULL != mss->wh) | ||
865 | { | ||
866 | GNUNET_STREAM_write_cancel (mss->wh); | ||
867 | mss->wh = NULL; | ||
868 | } | ||
869 | if (NULL != mss->socket) | ||
870 | { | ||
871 | GNUNET_STREAM_close (mss->socket); | ||
872 | mss->socket = NULL; | ||
873 | } | ||
874 | if (NULL != mss->mq) | ||
875 | { | ||
876 | GNUNET_free (mss->mq); | ||
877 | mss->mq = NULL; | ||
878 | } | ||
879 | } | ||
880 | |||
881 | |||
882 | /** | ||
883 | * Iterator over hash map entries. | ||
884 | * | ||
885 | * @param cls closure | ||
886 | * @param key current key code | ||
887 | * @param value value in the hash map | ||
888 | * @return GNUNET_YES if we should continue to | ||
889 | * iterate, | ||
890 | * GNUNET_NO if not. | ||
891 | */ | ||
892 | static int | ||
893 | destroy_element_info_iter (void *cls, | ||
894 | const struct GNUNET_HashCode * key, | ||
895 | void *value) | ||
896 | { | ||
897 | struct ElementInfo *ei = value; | ||
898 | GNUNET_free (ei->element); | ||
899 | GNUNET_free (ei->element_hash); | ||
900 | GNUNET_free (ei); | ||
901 | return GNUNET_YES; | ||
902 | } | ||
903 | |||
904 | 306 | ||
905 | /** | 307 | /** |
906 | * Destroy a session, free all resources associated with it. | 308 | * Destroy a session, free all resources associated with it. |
@@ -913,11 +315,9 @@ destroy_session (struct ConsensusSession *session) | |||
913 | int i; | 315 | int i; |
914 | 316 | ||
915 | GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session); | 317 | GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session); |
916 | GNUNET_SERVER_client_drop (session->scss.client); | ||
917 | session->scss.client = NULL; | ||
918 | if (NULL != session->client_mq) | 318 | if (NULL != session->client_mq) |
919 | { | 319 | { |
920 | GNUNET_free (session->client_mq); | 320 | GNUNET_MQ_destroy (session->client_mq); |
921 | session->client_mq = NULL; | 321 | session->client_mq = NULL; |
922 | } | 322 | } |
923 | if (NULL != session->shuffle) | 323 | if (NULL != session->shuffle) |
@@ -925,617 +325,21 @@ destroy_session (struct ConsensusSession *session) | |||
925 | GNUNET_free (session->shuffle); | 325 | GNUNET_free (session->shuffle); |
926 | session->shuffle = NULL; | 326 | session->shuffle = NULL; |
927 | } | 327 | } |
928 | if (NULL != session->se) | ||
929 | { | ||
930 | strata_estimator_destroy (session->se); | ||
931 | session->se = NULL; | ||
932 | } | ||
933 | if (NULL != session->info) | 328 | if (NULL != session->info) |
934 | { | 329 | { |
935 | for (i = 0; i < session->num_peers; i++) | 330 | for (i = 0; i < session->num_peers; i++) |
936 | { | 331 | { |
937 | struct ConsensusPeerInformation *cpi; | 332 | struct ConsensusPeerInformation *cpi; |
938 | cpi = &session->info[i]; | 333 | cpi = &session->info[i]; |
939 | clear_message_stream_state (&cpi->mss); | 334 | GNUNET_free (cpi); |
940 | if (NULL != cpi->se) | ||
941 | { | ||
942 | strata_estimator_destroy (cpi->se); | ||
943 | cpi->se = NULL; | ||
944 | } | ||
945 | if (NULL != cpi->ibf) | ||
946 | { | ||
947 | ibf_destroy (cpi->ibf); | ||
948 | cpi->ibf = NULL; | ||
949 | } | ||
950 | } | 335 | } |
951 | GNUNET_free (session->info); | 336 | GNUNET_free (session->info); |
952 | session->info = NULL; | 337 | session->info = NULL; |
953 | } | 338 | } |
954 | if (NULL != session->ibfs) | ||
955 | { | ||
956 | for (i = 0; i <= MAX_IBF_ORDER; i++) | ||
957 | { | ||
958 | if (NULL != session->ibfs[i]) | ||
959 | { | ||
960 | ibf_destroy (session->ibfs[i]); | ||
961 | session->ibfs[i] = NULL; | ||
962 | } | ||
963 | } | ||
964 | GNUNET_free (session->ibfs); | ||
965 | session->ibfs = NULL; | ||
966 | } | ||
967 | if (NULL != session->values) | ||
968 | { | ||
969 | GNUNET_CONTAINER_multihashmap_iterate (session->values, destroy_element_info_iter, NULL); | ||
970 | GNUNET_CONTAINER_multihashmap_destroy (session->values); | ||
971 | session->values = NULL; | ||
972 | } | ||
973 | |||
974 | if (NULL != session->ibf_key_map) | ||
975 | { | ||
976 | GNUNET_CONTAINER_multihashmap_destroy (session->ibf_key_map); | ||
977 | session->ibf_key_map = NULL; | ||
978 | } | ||
979 | GNUNET_free (session); | 339 | GNUNET_free (session); |
980 | } | 340 | } |
981 | 341 | ||
982 | 342 | ||
983 | static void | ||
984 | send_client_conclude_done (struct ConsensusSession *session) | ||
985 | { | ||
986 | struct PendingMessage *pm; | ||
987 | |||
988 | /* check if client is even there anymore */ | ||
989 | if (NULL == session->scss.client) | ||
990 | return; | ||
991 | pm = new_pending_message (sizeof (struct GNUNET_MessageHeader), | ||
992 | GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE); | ||
993 | message_queue_add (session->client_mq, pm); | ||
994 | } | ||
995 | |||
996 | |||
997 | /** | ||
998 | * Check if a strata message is for the current round or not | ||
999 | * | ||
1000 | * @param session session we are in | ||
1001 | * @param strata_msg the strata message to check | ||
1002 | * @return GNUNET_YES if the strata_msg is premature, GNUNET_NO otherwise | ||
1003 | */ | ||
1004 | static int | ||
1005 | is_premature_strata_message (const struct ConsensusSession *session, const struct StrataMessage *strata_msg) | ||
1006 | { | ||
1007 | switch (strata_msg->round) | ||
1008 | { | ||
1009 | case CONSENSUS_ROUND_COMPLETION: | ||
1010 | case CONSENSUS_ROUND_EXCHANGE: | ||
1011 | /* here, we also have to compare subrounds */ | ||
1012 | if ( (strata_msg->round != session->current_round) || | ||
1013 | (strata_msg->exp_round != session->exp_round) || | ||
1014 | (strata_msg->exp_subround != session->exp_subround) ) | ||
1015 | return GNUNET_YES; | ||
1016 | break; | ||
1017 | default: | ||
1018 | if (session->current_round != strata_msg->round) | ||
1019 | return GNUNET_YES; | ||
1020 | break; | ||
1021 | } | ||
1022 | return GNUNET_NO; | ||
1023 | } | ||
1024 | |||
1025 | |||
1026 | /** | ||
1027 | * Send a strata estimator. | ||
1028 | * | ||
1029 | * @param cpi the peer | ||
1030 | */ | ||
1031 | static void | ||
1032 | send_strata_estimator (struct ConsensusPeerInformation *cpi) | ||
1033 | { | ||
1034 | struct PendingMessage *pm; | ||
1035 | struct StrataMessage *strata_msg; | ||
1036 | |||
1037 | /* FIXME: why is this correct? */ | ||
1038 | cpi->apparent_round = cpi->session->current_round; | ||
1039 | cpi->ibf_state = IBF_STATE_NONE; | ||
1040 | cpi->ibf_bucket_counter = 0; | ||
1041 | |||
1042 | LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "sending SE (in round: %d)\n", cpi->session->current_round); | ||
1043 | |||
1044 | pm = new_pending_message ((sizeof *strata_msg) + (SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE), | ||
1045 | GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE); | ||
1046 | strata_msg = (struct StrataMessage *) pm->msg; | ||
1047 | strata_msg->round = cpi->session->current_round; | ||
1048 | strata_msg->exp_round = cpi->session->exp_round; | ||
1049 | strata_msg->exp_subround = cpi->session->exp_subround; | ||
1050 | strata_estimator_write (cpi->session->se, &strata_msg[1]); | ||
1051 | message_queue_add (cpi->mss.mq, pm); | ||
1052 | } | ||
1053 | |||
1054 | |||
1055 | /** | ||
1056 | * Send an IBF of the order specified in cpi. | ||
1057 | * | ||
1058 | * @param cpi the peer | ||
1059 | */ | ||
1060 | static void | ||
1061 | send_ibf (struct ConsensusPeerInformation *cpi) | ||
1062 | { | ||
1063 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending IBF to P%d\n", | ||
1064 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1065 | |||
1066 | cpi->ibf_bucket_counter = 0; | ||
1067 | while (cpi->ibf_bucket_counter < (1 << cpi->ibf_order)) | ||
1068 | { | ||
1069 | unsigned int num_buckets; | ||
1070 | struct PendingMessage *pm; | ||
1071 | struct DifferenceDigest *digest; | ||
1072 | |||
1073 | num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter; | ||
1074 | /* limit to maximum */ | ||
1075 | if (num_buckets > BUCKETS_PER_MESSAGE) | ||
1076 | num_buckets = BUCKETS_PER_MESSAGE; | ||
1077 | |||
1078 | pm = new_pending_message ((sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE), | ||
1079 | GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST); | ||
1080 | digest = (struct DifferenceDigest *) pm->msg; | ||
1081 | digest->order = cpi->ibf_order; | ||
1082 | digest->round = cpi->apparent_round; | ||
1083 | ibf_write_slice (cpi->ibf, cpi->ibf_bucket_counter, num_buckets, &digest[1]); | ||
1084 | cpi->ibf_bucket_counter += num_buckets; | ||
1085 | message_queue_add (cpi->mss.mq, pm); | ||
1086 | } | ||
1087 | cpi->ibf_bucket_counter = 0; | ||
1088 | cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF; | ||
1089 | } | ||
1090 | |||
1091 | |||
1092 | /** | ||
1093 | * Called when a peer sends us its strata estimator. | ||
1094 | * In response, we sent out IBF of appropriate size back. | ||
1095 | * | ||
1096 | * @param cpi session | ||
1097 | * @param strata_msg message | ||
1098 | */ | ||
1099 | static int | ||
1100 | handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg) | ||
1101 | { | ||
1102 | unsigned int diff; | ||
1103 | |||
1104 | if ( (cpi->session->current_round == CONSENSUS_ROUND_COMPLETION) && | ||
1105 | (strata_msg->round == CONSENSUS_ROUND_INVENTORY) ) | ||
1106 | { | ||
1107 | /* we still have to handle this request appropriately */ | ||
1108 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got inventory SE from P%d, we are already further alog\n", | ||
1109 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1110 | } | ||
1111 | else if (is_premature_strata_message (cpi->session, strata_msg)) | ||
1112 | { | ||
1113 | if (GNUNET_NO == cpi->replaying_strata_message) | ||
1114 | { | ||
1115 | LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got probably premature SE (%d,%d)\n", | ||
1116 | strata_msg->exp_round, strata_msg->exp_subround); | ||
1117 | cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message (&strata_msg->header); | ||
1118 | } | ||
1119 | return GNUNET_YES; | ||
1120 | } | ||
1121 | |||
1122 | if (NULL == cpi->se) | ||
1123 | cpi->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM); | ||
1124 | |||
1125 | cpi->apparent_round = strata_msg->round; | ||
1126 | |||
1127 | if (htons (strata_msg->header.size) != ((sizeof *strata_msg) + SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE)) | ||
1128 | { | ||
1129 | LOG_PP (GNUNET_ERROR_TYPE_WARNING, cpi, "got SE of wrong size\n"); | ||
1130 | return GNUNET_NO; | ||
1131 | } | ||
1132 | strata_estimator_read (&strata_msg[1], cpi->se); | ||
1133 | GNUNET_assert (NULL != cpi->session->se); | ||
1134 | diff = strata_estimator_difference (cpi->session->se, cpi->se); | ||
1135 | |||
1136 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SE from P%d, diff=%d\n", | ||
1137 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), diff); | ||
1138 | |||
1139 | switch (cpi->session->current_round) | ||
1140 | { | ||
1141 | case CONSENSUS_ROUND_EXCHANGE: | ||
1142 | case CONSENSUS_ROUND_INVENTORY: | ||
1143 | case CONSENSUS_ROUND_COMPLETION: | ||
1144 | /* send IBF of the right size */ | ||
1145 | cpi->ibf_order = 0; | ||
1146 | while (((1 << cpi->ibf_order) < diff) || (SE_IBF_HASH_NUM > (1 << cpi->ibf_order)) ) | ||
1147 | cpi->ibf_order++; | ||
1148 | if (cpi->ibf_order > MAX_IBF_ORDER) | ||
1149 | cpi->ibf_order = MAX_IBF_ORDER; | ||
1150 | cpi->ibf_order += 1; | ||
1151 | /* create ibf if not already pre-computed */ | ||
1152 | prepare_ibf (cpi); | ||
1153 | if (NULL != cpi->ibf) | ||
1154 | ibf_destroy (cpi->ibf); | ||
1155 | cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); | ||
1156 | cpi->ibf_bucket_counter = 0; | ||
1157 | send_ibf (cpi); | ||
1158 | break; | ||
1159 | default: | ||
1160 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got unexpected SE from P%d\n", | ||
1161 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1162 | break; | ||
1163 | } | ||
1164 | return GNUNET_YES; | ||
1165 | } | ||
1166 | |||
1167 | |||
1168 | |||
1169 | static int | ||
1170 | send_elements_iterator (void *cls, | ||
1171 | const struct GNUNET_HashCode * key, | ||
1172 | void *value) | ||
1173 | { | ||
1174 | struct ConsensusPeerInformation *cpi = cls; | ||
1175 | struct ElementInfo *ei; | ||
1176 | ei = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, value); | ||
1177 | if (NULL == ei) | ||
1178 | { | ||
1179 | LOG_PP (GNUNET_ERROR_TYPE_WARNING, cpi, "peer's ibf contained non-existing element %s\n", | ||
1180 | GNUNET_h2s((struct GNUNET_HashCode *) value)); | ||
1181 | return GNUNET_YES; | ||
1182 | } | ||
1183 | LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "sending element\n"); | ||
1184 | send_element_or_report (cpi, ei); | ||
1185 | return GNUNET_YES; | ||
1186 | } | ||
1187 | |||
1188 | |||
1189 | /** | ||
1190 | * Decode the current diff ibf, and send elements/requests/reports/ | ||
1191 | * | ||
1192 | * @param cpi partner peer | ||
1193 | */ | ||
1194 | static void | ||
1195 | decode (struct ConsensusPeerInformation *cpi) | ||
1196 | { | ||
1197 | struct IBF_Key key; | ||
1198 | int side; | ||
1199 | |||
1200 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: decoding ibf from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1201 | |||
1202 | while (1) | ||
1203 | { | ||
1204 | int res; | ||
1205 | |||
1206 | res = ibf_decode (cpi->ibf, &side, &key); | ||
1207 | if (GNUNET_SYSERR == res) | ||
1208 | { | ||
1209 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding failed, transmitting larger IBF\n"); | ||
1210 | /* decoding failed, we tell the other peer by sending our ibf with a larger order */ | ||
1211 | cpi->ibf_order++; | ||
1212 | prepare_ibf (cpi); | ||
1213 | cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); | ||
1214 | cpi->ibf_bucket_counter = 0; | ||
1215 | send_ibf (cpi); | ||
1216 | return; | ||
1217 | } | ||
1218 | if (GNUNET_NO == res) | ||
1219 | { | ||
1220 | struct PendingMessage *pm; | ||
1221 | struct ConsensusRoundMessage *rmsg; | ||
1222 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: transmitted all values, sending SYNC\n", cpi->session->local_peer_idx); | ||
1223 | |||
1224 | pm = new_pending_message (sizeof *rmsg, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED); | ||
1225 | rmsg = (struct ConsensusRoundMessage *) pm->msg; | ||
1226 | rmsg->round = cpi->apparent_round; | ||
1227 | message_queue_add (cpi->mss.mq, pm); | ||
1228 | return; | ||
1229 | } | ||
1230 | if (-1 == side) | ||
1231 | { | ||
1232 | struct GNUNET_HashCode hashcode; | ||
1233 | /* we have the element(s), send it to the other peer */ | ||
1234 | ibf_hashcode_from_key (key, &hashcode); | ||
1235 | GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->ibf_key_map, &hashcode, send_elements_iterator, cpi); | ||
1236 | } | ||
1237 | else | ||
1238 | { | ||
1239 | struct PendingMessage *pm; | ||
1240 | uint16_t type; | ||
1241 | |||
1242 | switch (cpi->apparent_round) | ||
1243 | { | ||
1244 | case CONSENSUS_ROUND_COMPLETION: | ||
1245 | /* FIXME: check if we really want to request the element */ | ||
1246 | case CONSENSUS_ROUND_EXCHANGE: | ||
1247 | case CONSENSUS_ROUND_INVENTORY: | ||
1248 | type = GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST; | ||
1249 | break; | ||
1250 | default: | ||
1251 | GNUNET_assert (0); | ||
1252 | } | ||
1253 | pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + sizeof (struct IBF_Key), | ||
1254 | type); | ||
1255 | *(struct IBF_Key *) &pm->msg[1] = key; | ||
1256 | message_queue_add (cpi->mss.mq, pm); | ||
1257 | } | ||
1258 | } | ||
1259 | } | ||
1260 | |||
1261 | |||
1262 | static int | ||
1263 | handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest) | ||
1264 | { | ||
1265 | int num_buckets; | ||
1266 | |||
1267 | /* FIXME: find out if we're still expecting the same ibf! */ | ||
1268 | |||
1269 | cpi->apparent_round = cpi->session->current_round; | ||
1270 | // FIXME: check header.size >= sizeof (DD) | ||
1271 | num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE; | ||
1272 | switch (cpi->ibf_state) | ||
1273 | { | ||
1274 | case IBF_STATE_NONE: | ||
1275 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1276 | cpi->ibf_state = IBF_STATE_RECEIVING; | ||
1277 | cpi->ibf_order = digest->order; | ||
1278 | cpi->ibf_bucket_counter = 0; | ||
1279 | if (NULL != cpi->ibf) | ||
1280 | { | ||
1281 | ibf_destroy (cpi->ibf); | ||
1282 | cpi->ibf = NULL; | ||
1283 | } | ||
1284 | break; | ||
1285 | case IBF_STATE_ANTICIPATE_DIFF: | ||
1286 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d (probably out IBF did not decode)\n", | ||
1287 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1288 | cpi->ibf_state = IBF_STATE_RECEIVING; | ||
1289 | cpi->ibf_order = digest->order; | ||
1290 | cpi->ibf_bucket_counter = 0; | ||
1291 | if (NULL != cpi->ibf) | ||
1292 | { | ||
1293 | ibf_destroy (cpi->ibf); | ||
1294 | cpi->ibf = NULL; | ||
1295 | } | ||
1296 | break; | ||
1297 | case IBF_STATE_RECEIVING: | ||
1298 | break; | ||
1299 | default: | ||
1300 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: unexpected IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1301 | return GNUNET_YES; | ||
1302 | } | ||
1303 | |||
1304 | if (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order)) | ||
1305 | { | ||
1306 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: overfull IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1307 | return GNUNET_YES; | ||
1308 | } | ||
1309 | |||
1310 | if (NULL == cpi->ibf) | ||
1311 | cpi->ibf = ibf_create (1 << cpi->ibf_order, SE_IBF_HASH_NUM); | ||
1312 | |||
1313 | ibf_read_slice (&digest[1], cpi->ibf_bucket_counter, num_buckets, cpi->ibf); | ||
1314 | cpi->ibf_bucket_counter += num_buckets; | ||
1315 | |||
1316 | if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) | ||
1317 | { | ||
1318 | cpi->ibf_state = IBF_STATE_DECODING; | ||
1319 | cpi->ibf_bucket_counter = 0; | ||
1320 | prepare_ibf (cpi); | ||
1321 | ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]); | ||
1322 | decode (cpi); | ||
1323 | } | ||
1324 | return GNUNET_YES; | ||
1325 | } | ||
1326 | |||
1327 | |||
1328 | /** | ||
1329 | * Insert an element into the consensus set of the specified session. | ||
1330 | * The element will not be copied, and freed when destroying the session. | ||
1331 | * | ||
1332 | * @param session session for new element | ||
1333 | * @param element element to insert | ||
1334 | */ | ||
1335 | static void | ||
1336 | insert_element (struct ConsensusSession *session, struct GNUNET_CONSENSUS_Element *element) | ||
1337 | { | ||
1338 | struct GNUNET_HashCode hash; | ||
1339 | struct ElementInfo *e; | ||
1340 | struct IBF_Key ibf_key; | ||
1341 | int i; | ||
1342 | |||
1343 | e = GNUNET_new (struct ElementInfo); | ||
1344 | e->element = element; | ||
1345 | e->element_hash = GNUNET_new (struct GNUNET_HashCode); | ||
1346 | GNUNET_CRYPTO_hash (e->element->data, e->element->size, e->element_hash); | ||
1347 | ibf_key = ibf_key_from_hashcode (e->element_hash); | ||
1348 | ibf_hashcode_from_key (ibf_key, &hash); | ||
1349 | strata_estimator_insert (session->se, &hash); | ||
1350 | GNUNET_CONTAINER_multihashmap_put (session->values, e->element_hash, e, | ||
1351 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | ||
1352 | GNUNET_CONTAINER_multihashmap_put (session->ibf_key_map, &hash, e->element_hash, | ||
1353 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
1354 | |||
1355 | for (i = 0; i <= MAX_IBF_ORDER; i++) | ||
1356 | { | ||
1357 | if (NULL == session->ibfs[i]) | ||
1358 | continue; | ||
1359 | ibf_insert (session->ibfs[i], ibf_key); | ||
1360 | } | ||
1361 | } | ||
1362 | |||
1363 | |||
1364 | /** | ||
1365 | * Handle an element that another peer sent us | ||
1366 | */ | ||
1367 | static int | ||
1368 | handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg) | ||
1369 | { | ||
1370 | struct GNUNET_CONSENSUS_Element *element; | ||
1371 | size_t size; | ||
1372 | |||
1373 | switch (cpi->session->current_round) | ||
1374 | { | ||
1375 | case CONSENSUS_ROUND_COMPLETION: | ||
1376 | /* FIXME: check if we really expect the element */ | ||
1377 | case CONSENSUS_ROUND_EXCHANGE: | ||
1378 | break; | ||
1379 | default: | ||
1380 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "got unexpected element, ignoring\n"); | ||
1381 | return GNUNET_YES; | ||
1382 | } | ||
1383 | |||
1384 | size = ntohs (element_msg->size) - sizeof *element_msg; | ||
1385 | |||
1386 | element = GNUNET_malloc (size + sizeof *element); | ||
1387 | element->size = size; | ||
1388 | memcpy (&element[1], &element_msg[1], size); | ||
1389 | element->data = &element[1]; | ||
1390 | |||
1391 | LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got element\n"); | ||
1392 | |||
1393 | insert_element (cpi->session, element); | ||
1394 | |||
1395 | return GNUNET_YES; | ||
1396 | } | ||
1397 | |||
1398 | |||
1399 | /** | ||
1400 | * Handle a request for elements. | ||
1401 | * | ||
1402 | * @param cpi peer that is requesting the element | ||
1403 | * @param msg the element request message | ||
1404 | */ | ||
1405 | static int | ||
1406 | handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg) | ||
1407 | { | ||
1408 | struct GNUNET_HashCode hashcode; | ||
1409 | struct IBF_Key *ibf_key; | ||
1410 | unsigned int num; | ||
1411 | |||
1412 | /* element requests are allowed in every round */ | ||
1413 | |||
1414 | num = ntohs (msg->header.size) / sizeof (struct IBF_Key); | ||
1415 | |||
1416 | ibf_key = (struct IBF_Key *) &msg[1]; | ||
1417 | while (num--) | ||
1418 | { | ||
1419 | ibf_hashcode_from_key (*ibf_key, &hashcode); | ||
1420 | GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->ibf_key_map, &hashcode, send_elements_iterator, cpi); | ||
1421 | ibf_key++; | ||
1422 | } | ||
1423 | return GNUNET_YES; | ||
1424 | } | ||
1425 | |||
1426 | static int | ||
1427 | is_peer_connected (struct ConsensusPeerInformation *cpi) | ||
1428 | { | ||
1429 | if (NULL == cpi->mss.socket) | ||
1430 | return GNUNET_NO; | ||
1431 | return GNUNET_YES; | ||
1432 | } | ||
1433 | |||
1434 | |||
1435 | static void | ||
1436 | ensure_peer_connected (struct ConsensusPeerInformation *cpi) | ||
1437 | { | ||
1438 | if (NULL != cpi->mss.socket) | ||
1439 | return; | ||
1440 | cpi->mss.socket = GNUNET_STREAM_open (cfg, &cpi->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS, | ||
1441 | open_cb, cpi, GNUNET_STREAM_OPTION_END); | ||
1442 | } | ||
1443 | |||
1444 | |||
1445 | /** | ||
1446 | * If necessary, send a message to the peer, depending on the current | ||
1447 | * round. | ||
1448 | */ | ||
1449 | static void | ||
1450 | embrace_peer (struct ConsensusPeerInformation *cpi) | ||
1451 | { | ||
1452 | if (GNUNET_NO == is_peer_connected (cpi)) | ||
1453 | { | ||
1454 | ensure_peer_connected (cpi); | ||
1455 | return; | ||
1456 | } | ||
1457 | if (GNUNET_NO == cpi->hello) | ||
1458 | return; | ||
1459 | /* FIXME: correctness of switch */ | ||
1460 | switch (cpi->session->current_round) | ||
1461 | { | ||
1462 | case CONSENSUS_ROUND_EXCHANGE: | ||
1463 | case CONSENSUS_ROUND_INVENTORY: | ||
1464 | if (cpi->session->partner_outgoing != cpi) | ||
1465 | break; | ||
1466 | /* fallthrough */ | ||
1467 | case CONSENSUS_ROUND_COMPLETION: | ||
1468 | send_strata_estimator (cpi); | ||
1469 | default: | ||
1470 | break; | ||
1471 | } | ||
1472 | } | ||
1473 | |||
1474 | |||
1475 | /** | ||
1476 | * Called when stream has finishes writing the hello message | ||
1477 | */ | ||
1478 | static void | ||
1479 | hello_cont (void *cls) | ||
1480 | { | ||
1481 | struct ConsensusPeerInformation *cpi = cls; | ||
1482 | |||
1483 | cpi->hello = GNUNET_YES; | ||
1484 | embrace_peer (cpi); | ||
1485 | } | ||
1486 | |||
1487 | |||
1488 | /** | ||
1489 | * Called when we established a stream connection to another peer | ||
1490 | * | ||
1491 | * @param cls cpi of the peer we just connected to | ||
1492 | * @param socket socket to use to communicate with the other side (read/write) | ||
1493 | */ | ||
1494 | static void | ||
1495 | open_cb (void *cls, struct GNUNET_STREAM_Socket *socket) | ||
1496 | { | ||
1497 | struct ConsensusPeerInformation *cpi = cls; | ||
1498 | struct PendingMessage *pm; | ||
1499 | struct ConsensusHello *hello; | ||
1500 | |||
1501 | GNUNET_assert (NULL == cpi->mss.mst); | ||
1502 | GNUNET_assert (NULL == cpi->mss.mq); | ||
1503 | |||
1504 | cpi->mss.mq = create_message_queue_for_stream_socket (&cpi->mss); | ||
1505 | cpi->mss.mst = GNUNET_SERVER_mst_create (mst_session_callback, cpi); | ||
1506 | cpi->mss.mst_cls = cpi; | ||
1507 | |||
1508 | pm = new_pending_message (sizeof *hello, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO); | ||
1509 | hello = (struct ConsensusHello *) pm->msg; | ||
1510 | memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode)); | ||
1511 | pm->sent_cb = hello_cont; | ||
1512 | pm->sent_cb_cls = cpi; | ||
1513 | message_queue_add (cpi->mss.mq, pm); | ||
1514 | cpi->mss.rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, | ||
1515 | &stream_data_processor, &cpi->mss); | ||
1516 | } | ||
1517 | |||
1518 | |||
1519 | static void | ||
1520 | replay_premature_message (struct ConsensusPeerInformation *cpi) | ||
1521 | { | ||
1522 | if (NULL != cpi->premature_strata_message) | ||
1523 | { | ||
1524 | struct StrataMessage *sm; | ||
1525 | |||
1526 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n"); | ||
1527 | sm = cpi->premature_strata_message; | ||
1528 | cpi->premature_strata_message = NULL; | ||
1529 | |||
1530 | cpi->replaying_strata_message = GNUNET_YES; | ||
1531 | handle_p2p_strata (cpi, sm); | ||
1532 | cpi->replaying_strata_message = GNUNET_NO; | ||
1533 | |||
1534 | GNUNET_free (sm); | ||
1535 | } | ||
1536 | } | ||
1537 | |||
1538 | |||
1539 | /** | 343 | /** |
1540 | * Start the inventory round, contact all peers we are supposed to contact. | 344 | * Start the inventory round, contact all peers we are supposed to contact. |
1541 | * | 345 | * |
@@ -1548,11 +352,7 @@ start_inventory (struct ConsensusSession *session) | |||
1548 | int last; | 352 | int last; |
1549 | 353 | ||
1550 | for (i = 0; i < session->num_peers; i++) | 354 | for (i = 0; i < session->num_peers; i++) |
1551 | { | ||
1552 | session->info[i].ibf_bucket_counter = 0; | ||
1553 | session->info[i].ibf_state = IBF_STATE_NONE; | ||
1554 | session->info[i].is_outgoing = GNUNET_NO; | 355 | session->info[i].is_outgoing = GNUNET_NO; |
1555 | } | ||
1556 | 356 | ||
1557 | last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; | 357 | last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; |
1558 | i = (session->local_peer_idx + 1) % session->num_peers; | 358 | i = (session->local_peer_idx + 1) % session->num_peers; |
@@ -1560,7 +360,7 @@ start_inventory (struct ConsensusSession *session) | |||
1560 | { | 360 | { |
1561 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i); | 361 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i); |
1562 | session->info[i].is_outgoing = GNUNET_YES; | 362 | session->info[i].is_outgoing = GNUNET_YES; |
1563 | embrace_peer (&session->info[i]); | 363 | // embrace_peer (&session->info[i]); |
1564 | i = (i + 1) % session->num_peers; | 364 | i = (i + 1) % session->num_peers; |
1565 | } | 365 | } |
1566 | // tie-breaker for even number of peers | 366 | // tie-breaker for even number of peers |
@@ -1568,49 +368,12 @@ start_inventory (struct ConsensusSession *session) | |||
1568 | { | 368 | { |
1569 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i); | 369 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i); |
1570 | session->info[last].is_outgoing = GNUNET_YES; | 370 | session->info[last].is_outgoing = GNUNET_YES; |
1571 | embrace_peer (&session->info[last]); | 371 | // embrace_peer (&session->info[last]); |
1572 | } | ||
1573 | |||
1574 | for (i = 0; i < session->num_peers; i++) | ||
1575 | { | ||
1576 | if (GNUNET_NO == session->info[i].is_outgoing) | ||
1577 | replay_premature_message (&session->info[i]); | ||
1578 | } | 372 | } |
1579 | } | 373 | } |
1580 | 374 | ||
1581 | 375 | ||
1582 | /** | 376 | /** |
1583 | * Iterator over hash map entries. | ||
1584 | * | ||
1585 | * @param cls closure | ||
1586 | * @param key current key code | ||
1587 | * @param value value in the hash map | ||
1588 | * @return GNUNET_YES if we should continue to | ||
1589 | * iterate, | ||
1590 | * GNUNET_NO if not. | ||
1591 | */ | ||
1592 | static int | ||
1593 | send_client_elements_iter (void *cls, | ||
1594 | const struct GNUNET_HashCode * key, | ||
1595 | void *value) | ||
1596 | { | ||
1597 | struct ConsensusSession *session = cls; | ||
1598 | struct ElementInfo *ei = value; | ||
1599 | struct PendingMessage *pm; | ||
1600 | |||
1601 | /* is the client still there? */ | ||
1602 | if (NULL == session->scss.client) | ||
1603 | return GNUNET_NO; | ||
1604 | |||
1605 | pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + ei->element->size, | ||
1606 | GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT); | ||
1607 | message_queue_add (session->client_mq, pm); | ||
1608 | return GNUNET_YES; | ||
1609 | } | ||
1610 | |||
1611 | |||
1612 | |||
1613 | /** | ||
1614 | * Start the next round. | 377 | * Start the next round. |
1615 | * This function can be invoked as a timeout task, or called manually (tc will be NULL then). | 378 | * This function can be invoked as a timeout task, or called manually (tc will be NULL then). |
1616 | * | 379 | * |
@@ -1630,7 +393,7 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
1630 | session = cls; | 393 | session = cls; |
1631 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: round over\n", session->local_peer_idx); | 394 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: round over\n", session->local_peer_idx); |
1632 | 395 | ||
1633 | if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) | 396 | if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK) |
1634 | { | 397 | { |
1635 | GNUNET_SCHEDULER_cancel (session->round_timeout_tid); | 398 | GNUNET_SCHEDULER_cancel (session->round_timeout_tid); |
1636 | session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; | 399 | session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; |
@@ -1648,8 +411,8 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
1648 | if (session->num_peers <= 2) | 411 | if (session->num_peers <= 2) |
1649 | { | 412 | { |
1650 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: 2-peer consensus done\n", session->local_peer_idx); | 413 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: 2-peer consensus done\n", session->local_peer_idx); |
1651 | GNUNET_CONTAINER_multihashmap_iterate (session->values, send_client_elements_iter, session); | 414 | //GNUNET_CONTAINER_multihashmap_iterate (session->values, send_client_elements_iter, session); |
1652 | send_client_conclude_done (session); | 415 | //send_client_conclude_done (session); |
1653 | session->current_round = CONSENSUS_ROUND_FINISH; | 416 | session->current_round = CONSENSUS_ROUND_FINISH; |
1654 | return; | 417 | return; |
1655 | } | 418 | } |
@@ -1663,7 +426,7 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
1663 | break; | 426 | break; |
1664 | case CONSENSUS_ROUND_COMPLETION: | 427 | case CONSENSUS_ROUND_COMPLETION: |
1665 | session->current_round = CONSENSUS_ROUND_FINISH; | 428 | session->current_round = CONSENSUS_ROUND_FINISH; |
1666 | send_client_conclude_done (session); | 429 | //send_client_conclude_done (session); |
1667 | break; | 430 | break; |
1668 | default: | 431 | default: |
1669 | GNUNET_assert (0); | 432 | GNUNET_assert (0); |
@@ -1671,159 +434,9 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
1671 | } | 434 | } |
1672 | 435 | ||
1673 | 436 | ||
1674 | static void | ||
1675 | fin_sent_cb (void *cls) | ||
1676 | { | ||
1677 | struct ConsensusPeerInformation *cpi; | ||
1678 | cpi = cls; | ||
1679 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n", cpi->session->local_peer_idx); | ||
1680 | switch (cpi->session->current_round) | ||
1681 | { | ||
1682 | case CONSENSUS_ROUND_EXCHANGE: | ||
1683 | case CONSENSUS_ROUND_COMPLETION: | ||
1684 | if (cpi->session->current_round != cpi->apparent_round) | ||
1685 | { | ||
1686 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: FIN to SYNC from the past\n", cpi->session->local_peer_idx); | ||
1687 | break; | ||
1688 | } | ||
1689 | cpi->exp_subround_finished = GNUNET_YES; | ||
1690 | /* the subround is only really over if *both* partners are done */ | ||
1691 | if (GNUNET_YES == exp_subround_finished (cpi->session)) | ||
1692 | subround_over (cpi->session, NULL); | ||
1693 | else | ||
1694 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after FIN sent\n", cpi->session->local_peer_idx); | ||
1695 | break; | ||
1696 | case CONSENSUS_ROUND_INVENTORY: | ||
1697 | cpi->inventory_synced = GNUNET_YES; | ||
1698 | if (inventory_round_finished (cpi->session) && cpi->session->current_round == cpi->apparent_round) | ||
1699 | round_over (cpi->session, NULL); | ||
1700 | /* FIXME: maybe go to next round */ | ||
1701 | break; | ||
1702 | default: | ||
1703 | GNUNET_break (0); | ||
1704 | } | ||
1705 | } | ||
1706 | |||
1707 | |||
1708 | /** | 437 | /** |
1709 | * The other peer wants us to inform that he sent us all the elements we requested. | 438 | * Adapt the shuffle of the session for the current round. |
1710 | */ | 439 | */ |
1711 | static int | ||
1712 | handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) | ||
1713 | { | ||
1714 | struct ConsensusRoundMessage *round_msg; | ||
1715 | round_msg = (struct ConsensusRoundMessage *) msg; | ||
1716 | /* FIXME: only call subround_over if round is the current one! */ | ||
1717 | switch (cpi->session->current_round) | ||
1718 | { | ||
1719 | case CONSENSUS_ROUND_EXCHANGE: | ||
1720 | case CONSENSUS_ROUND_COMPLETION: | ||
1721 | if (cpi->session->current_round != round_msg->round) | ||
1722 | { | ||
1723 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (past round)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1724 | cpi->ibf_state = IBF_STATE_NONE; | ||
1725 | cpi->ibf_bucket_counter = 0; | ||
1726 | break; | ||
1727 | } | ||
1728 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (exp)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1729 | cpi->exp_subround_finished = GNUNET_YES; | ||
1730 | if (GNUNET_YES == exp_subround_finished (cpi->session)) | ||
1731 | subround_over (cpi->session, NULL); | ||
1732 | else | ||
1733 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after got FIN\n", cpi->session->local_peer_idx); | ||
1734 | break; | ||
1735 | case CONSENSUS_ROUND_INVENTORY: | ||
1736 | cpi->inventory_synced = GNUNET_YES; | ||
1737 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (a2a)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1738 | if (inventory_round_finished (cpi->session)) | ||
1739 | round_over (cpi->session, NULL); | ||
1740 | break; | ||
1741 | default: | ||
1742 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n"); | ||
1743 | break; | ||
1744 | } | ||
1745 | return GNUNET_YES; | ||
1746 | } | ||
1747 | |||
1748 | |||
1749 | /** | ||
1750 | * Gets called when the other peer wants us to inform that | ||
1751 | * it has decoded our ibf and sent us all elements / requests | ||
1752 | */ | ||
1753 | static int | ||
1754 | handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) | ||
1755 | { | ||
1756 | struct PendingMessage *pm; | ||
1757 | struct ConsensusRoundMessage *fin_msg; | ||
1758 | |||
1759 | /* FIXME: why handle current round?? */ | ||
1760 | switch (cpi->session->current_round) | ||
1761 | { | ||
1762 | case CONSENSUS_ROUND_INVENTORY: | ||
1763 | cpi->inventory_synced = GNUNET_YES; | ||
1764 | case CONSENSUS_ROUND_COMPLETION: | ||
1765 | case CONSENSUS_ROUND_EXCHANGE: | ||
1766 | LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "received SYNC\n"); | ||
1767 | pm = new_pending_message (sizeof *fin_msg, | ||
1768 | GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN); | ||
1769 | fin_msg = (struct ConsensusRoundMessage *) pm->msg; | ||
1770 | fin_msg->round = cpi->apparent_round; | ||
1771 | /* the subround is over once we kicked off sending the fin msg */ | ||
1772 | /* FIXME: assert we are talking to the right peer! */ | ||
1773 | /* FIXME: mark peer as synced */ | ||
1774 | pm->sent_cb = fin_sent_cb; | ||
1775 | pm->sent_cb_cls = cpi; | ||
1776 | message_queue_add (cpi->mss.mq, pm); | ||
1777 | break; | ||
1778 | default: | ||
1779 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n"); | ||
1780 | break; | ||
1781 | } | ||
1782 | return GNUNET_YES; | ||
1783 | } | ||
1784 | |||
1785 | |||
1786 | /** | ||
1787 | * Functions with this signature are called whenever a | ||
1788 | * complete message is received by the tokenizer. | ||
1789 | * | ||
1790 | * Do not call GNUNET_SERVER_mst_destroy in callback | ||
1791 | * | ||
1792 | * @param cls closure | ||
1793 | * @param client identification of the client | ||
1794 | * @param message the actual message | ||
1795 | * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing | ||
1796 | */ | ||
1797 | static int | ||
1798 | mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) | ||
1799 | { | ||
1800 | struct ConsensusPeerInformation *cpi = cls; | ||
1801 | GNUNET_assert (NULL == client); | ||
1802 | GNUNET_assert (NULL != cls); | ||
1803 | switch (ntohs (message->type)) | ||
1804 | { | ||
1805 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE: | ||
1806 | return handle_p2p_strata (cpi, (struct StrataMessage *) message); | ||
1807 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST: | ||
1808 | return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message); | ||
1809 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS: | ||
1810 | return handle_p2p_element (cpi, message); | ||
1811 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT: | ||
1812 | return handle_p2p_element_report (cpi, message); | ||
1813 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST: | ||
1814 | return handle_p2p_element_request (cpi, (struct ElementRequest *) message); | ||
1815 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED: | ||
1816 | return handle_p2p_synced (cpi, message); | ||
1817 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN: | ||
1818 | return handle_p2p_fin (cpi, message); | ||
1819 | default: | ||
1820 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s\n", | ||
1821 | ntohs (message->type), GNUNET_h2s (&cpi->peer_id.hashPubKey)); | ||
1822 | } | ||
1823 | return GNUNET_OK; | ||
1824 | } | ||
1825 | |||
1826 | |||
1827 | static void | 440 | static void |
1828 | shuffle (struct ConsensusSession *session) | 441 | shuffle (struct ConsensusSession *session) |
1829 | { | 442 | { |
@@ -1860,6 +473,7 @@ find_partners (struct ConsensusSession *session) | |||
1860 | { | 473 | { |
1861 | int mark[session->num_peers]; | 474 | int mark[session->num_peers]; |
1862 | int i; | 475 | int i; |
476 | |||
1863 | memset (mark, 0, session->num_peers * sizeof (int)); | 477 | memset (mark, 0, session->num_peers * sizeof (int)); |
1864 | session->partner_incoming = session->partner_outgoing = NULL; | 478 | session->partner_incoming = session->partner_outgoing = NULL; |
1865 | for (i = 0; i < session->num_peers; i++) | 479 | for (i = 0; i < session->num_peers; i++) |
@@ -1887,6 +501,22 @@ find_partners (struct ConsensusSession *session) | |||
1887 | 501 | ||
1888 | 502 | ||
1889 | /** | 503 | /** |
504 | * Callback for set operation results. Called for each element | ||
505 | * in the result set. | ||
506 | * | ||
507 | * @param cls closure | ||
508 | * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK | ||
509 | * @param status see enum GNUNET_SET_Status | ||
510 | */ | ||
511 | static void set_result_cb (void *cls, | ||
512 | const struct GNUNET_SET_Element *element, | ||
513 | enum GNUNET_SET_Status status) | ||
514 | { | ||
515 | /* FIXME */ | ||
516 | } | ||
517 | |||
518 | |||
519 | /** | ||
1890 | * Do the next subround in the exp-scheme. | 520 | * Do the next subround in the exp-scheme. |
1891 | * This function can be invoked as a timeout task, or called manually (tc will be NULL then). | 521 | * This function can be invoked as a timeout task, or called manually (tc will be NULL then). |
1892 | * | 522 | * |
@@ -1905,9 +535,11 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
1905 | return; | 535 | return; |
1906 | session = cls; | 536 | session = cls; |
1907 | /* cancel timeout */ | 537 | /* cancel timeout */ |
1908 | if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) | 538 | if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK) |
539 | { | ||
1909 | GNUNET_SCHEDULER_cancel (session->round_timeout_tid); | 540 | GNUNET_SCHEDULER_cancel (session->round_timeout_tid); |
1910 | session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; | 541 | session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; |
542 | } | ||
1911 | /* check if we are done with the log phase, 2-peer consensus only does one log round */ | 543 | /* check if we are done with the log phase, 2-peer consensus only does one log round */ |
1912 | if ( (session->exp_round == NUM_EXP_ROUNDS) || | 544 | if ( (session->exp_round == NUM_EXP_ROUNDS) || |
1913 | ((session->num_peers == 2) && (session->exp_round == 1))) | 545 | ((session->num_peers == 2) && (session->exp_round == 1))) |
@@ -1938,8 +570,25 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
1938 | session->exp_subround++; | 570 | session->exp_subround++; |
1939 | } | 571 | } |
1940 | 572 | ||
573 | /* determine the incoming and outgoing partner */ | ||
1941 | find_partners (session); | 574 | find_partners (session); |
1942 | 575 | ||
576 | if (NULL != session->partner_outgoing) | ||
577 | { | ||
578 | if (NULL != session->partner_outgoing->set_op) | ||
579 | GNUNET_SET_operation_cancel (session->partner_outgoing->set_op); | ||
580 | session->partner_outgoing->set_op = | ||
581 | GNUNET_SET_evaluate (session->element_set, | ||
582 | &session->partner_outgoing->peer_id, | ||
583 | &session->global_id, | ||
584 | NULL, /* FIXME */ | ||
585 | 0, /* FIXME */ | ||
586 | GNUNET_SET_RESULT_ADDED, | ||
587 | set_result_cb, session); | ||
588 | |||
589 | |||
590 | } | ||
591 | |||
1943 | #ifdef GNUNET_EXTRA_LOGGING | 592 | #ifdef GNUNET_EXTRA_LOGGING |
1944 | { | 593 | { |
1945 | int in; | 594 | int in; |
@@ -1957,29 +606,6 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
1957 | } | 606 | } |
1958 | #endif /* GNUNET_EXTRA_LOGGING */ | 607 | #endif /* GNUNET_EXTRA_LOGGING */ |
1959 | 608 | ||
1960 | if (NULL != session->partner_incoming) | ||
1961 | { | ||
1962 | session->partner_incoming->ibf_state = IBF_STATE_NONE; | ||
1963 | session->partner_incoming->exp_subround_finished = GNUNET_NO; | ||
1964 | session->partner_incoming->ibf_bucket_counter = 0; | ||
1965 | |||
1966 | /* maybe there's an early strata estimator? */ | ||
1967 | replay_premature_message (session->partner_incoming); | ||
1968 | } | ||
1969 | |||
1970 | if (NULL != session->partner_outgoing) | ||
1971 | { | ||
1972 | session->partner_outgoing->ibf_state = IBF_STATE_NONE; | ||
1973 | session->partner_outgoing->ibf_bucket_counter = 0; | ||
1974 | session->partner_outgoing->exp_subround_finished = GNUNET_NO; | ||
1975 | /* make sure peer is connected and send the SE */ | ||
1976 | embrace_peer (session->partner_outgoing); | ||
1977 | } | ||
1978 | |||
1979 | /* | ||
1980 | session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 3 * NUM_EXP_ROUNDS), | ||
1981 | subround_over, session); | ||
1982 | */ | ||
1983 | } | 609 | } |
1984 | 610 | ||
1985 | 611 | ||
@@ -2002,146 +628,6 @@ get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSess | |||
2002 | 628 | ||
2003 | 629 | ||
2004 | /** | 630 | /** |
2005 | * Handle a HELLO-message, send when another peer wants to join a session where | ||
2006 | * our peer is a member. The session may or may not be inhabited yet. | ||
2007 | */ | ||
2008 | static int | ||
2009 | handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello) | ||
2010 | { | ||
2011 | struct ConsensusSession *session; | ||
2012 | |||
2013 | if (NULL != inc->requested_gid) | ||
2014 | { | ||
2015 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session more than once, ignoring\n"); | ||
2016 | return GNUNET_YES; | ||
2017 | } | ||
2018 | if (NULL != inc->cpi) | ||
2019 | { | ||
2020 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer with active session sent HELLO again, ignoring\n"); | ||
2021 | return GNUNET_YES; | ||
2022 | } | ||
2023 | |||
2024 | for (session = sessions_head; NULL != session; session = session->next) | ||
2025 | { | ||
2026 | int idx; | ||
2027 | struct ConsensusPeerInformation *cpi; | ||
2028 | if (0 != GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id)) | ||
2029 | continue; | ||
2030 | idx = get_peer_idx (&inc->peer_id, session); | ||
2031 | GNUNET_assert (-1 != idx); | ||
2032 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d hello'ed session %d\n", idx); | ||
2033 | cpi = &session->info[idx]; | ||
2034 | inc->cpi = cpi; | ||
2035 | cpi->mss = inc->mss; | ||
2036 | cpi = &session->info[idx]; | ||
2037 | cpi->hello = GNUNET_YES; | ||
2038 | cpi->mss.mq = create_message_queue_for_stream_socket (&cpi->mss); | ||
2039 | embrace_peer (cpi); | ||
2040 | return GNUNET_YES; | ||
2041 | } | ||
2042 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session\n"); | ||
2043 | inc->requested_gid = GNUNET_memdup (&hello->global_id, sizeof (struct GNUNET_HashCode)); | ||
2044 | return GNUNET_YES; | ||
2045 | } | ||
2046 | |||
2047 | |||
2048 | |||
2049 | /** | ||
2050 | * Handle tokenized messages from stream sockets. | ||
2051 | * Delegate them if the socket belongs to a session, | ||
2052 | * handle hello messages otherwise. | ||
2053 | * | ||
2054 | * Do not call GNUNET_SERVER_mst_destroy in callback | ||
2055 | * | ||
2056 | * @param cls closure, unused | ||
2057 | * @param client incoming socket this message comes from | ||
2058 | * @param message the actual message | ||
2059 | * | ||
2060 | * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing | ||
2061 | */ | ||
2062 | static int | ||
2063 | mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) | ||
2064 | { | ||
2065 | struct IncomingSocket *inc; | ||
2066 | GNUNET_assert (NULL == client); | ||
2067 | GNUNET_assert (NULL != cls); | ||
2068 | inc = (struct IncomingSocket *) cls; | ||
2069 | switch (ntohs( message->type)) | ||
2070 | { | ||
2071 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO: | ||
2072 | return handle_p2p_hello (inc, (struct ConsensusHello *) message); | ||
2073 | default: | ||
2074 | if (NULL != inc->cpi) | ||
2075 | return mst_session_callback (inc->cpi, client, message); | ||
2076 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s (not in session)\n", | ||
2077 | ntohs (message->type), GNUNET_h2s (&inc->peer_id.hashPubKey)); | ||
2078 | } | ||
2079 | return GNUNET_OK; | ||
2080 | } | ||
2081 | |||
2082 | |||
2083 | /** | ||
2084 | * Functions of this type are called upon new stream connection from other peers | ||
2085 | * or upon binding error which happen when the app_port given in | ||
2086 | * GNUNET_STREAM_listen() is already taken. | ||
2087 | * | ||
2088 | * @param cls the closure from GNUNET_STREAM_listen | ||
2089 | * @param socket the socket representing the stream; NULL on binding error | ||
2090 | * @param initiator the identity of the peer who wants to establish a stream | ||
2091 | * with us; NULL on binding error | ||
2092 | * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the | ||
2093 | * stream (the socket will be invalid after the call) | ||
2094 | */ | ||
2095 | static int | ||
2096 | listen_cb (void *cls, | ||
2097 | struct GNUNET_STREAM_Socket *socket, | ||
2098 | const struct GNUNET_PeerIdentity *initiator) | ||
2099 | { | ||
2100 | struct IncomingSocket *incoming; | ||
2101 | |||
2102 | if (NULL == socket) | ||
2103 | { | ||
2104 | GNUNET_break (0); | ||
2105 | return GNUNET_SYSERR; | ||
2106 | } | ||
2107 | incoming = GNUNET_malloc (sizeof *incoming); | ||
2108 | incoming->peer_id = *initiator; | ||
2109 | incoming->mss.socket = socket; | ||
2110 | incoming->mss.rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, | ||
2111 | &stream_data_processor, &incoming->mss); | ||
2112 | incoming->mss.mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming); | ||
2113 | incoming->mss.mst_cls = incoming; | ||
2114 | GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming); | ||
2115 | return GNUNET_OK; | ||
2116 | } | ||
2117 | |||
2118 | |||
2119 | /** | ||
2120 | * Disconnect a client, and destroy all sessions associated with it. | ||
2121 | * | ||
2122 | * @param client the client to disconnect | ||
2123 | */ | ||
2124 | static void | ||
2125 | disconnect_client (struct GNUNET_SERVER_Client *client) | ||
2126 | { | ||
2127 | struct ConsensusSession *session; | ||
2128 | GNUNET_SERVER_client_disconnect (client); | ||
2129 | |||
2130 | /* if the client owns a session, remove it */ | ||
2131 | session = sessions_head; | ||
2132 | while (NULL != session) | ||
2133 | { | ||
2134 | if (client == session->scss.client) | ||
2135 | { | ||
2136 | destroy_session (session); | ||
2137 | break; | ||
2138 | } | ||
2139 | session = session->next; | ||
2140 | } | ||
2141 | } | ||
2142 | |||
2143 | |||
2144 | /** | ||
2145 | * Compute a global, (hopefully) unique consensus session id, | 631 | * Compute a global, (hopefully) unique consensus session id, |
2146 | * from the local id of the consensus session, and the identities of all participants. | 632 | * from the local id of the consensus session, and the identities of all participants. |
2147 | * Thus, if the local id of two consensus sessions coincide, but are not comprised of | 633 | * Thus, if the local id of two consensus sessions coincide, but are not comprised of |
@@ -2188,7 +674,8 @@ hash_cmp (const void *h1, const void *h2) | |||
2188 | * add the local peer if not in the join message. | 674 | * add the local peer if not in the join message. |
2189 | */ | 675 | */ |
2190 | static void | 676 | static void |
2191 | initialize_session_peer_list (struct ConsensusSession *session) | 677 | initialize_session_peer_list (struct ConsensusSession *session, |
678 | struct GNUNET_CONSENSUS_JoinMessage *join_msg) | ||
2192 | { | 679 | { |
2193 | unsigned int local_peer_in_list; | 680 | unsigned int local_peer_in_list; |
2194 | uint32_t listed_peers; | 681 | uint32_t listed_peers; |
@@ -2196,19 +683,19 @@ initialize_session_peer_list (struct ConsensusSession *session) | |||
2196 | struct GNUNET_PeerIdentity *peers; | 683 | struct GNUNET_PeerIdentity *peers; |
2197 | unsigned int i; | 684 | unsigned int i; |
2198 | 685 | ||
2199 | GNUNET_assert (NULL != session->join_msg); | 686 | GNUNET_assert (NULL != join_msg); |
2200 | 687 | ||
2201 | /* peers in the join message, may or may not include the local peer */ | 688 | /* peers in the join message, may or may not include the local peer */ |
2202 | listed_peers = ntohl (session->join_msg->num_peers); | 689 | listed_peers = ntohl (join_msg->num_peers); |
2203 | 690 | ||
2204 | session->num_peers = listed_peers; | 691 | session->num_peers = listed_peers; |
2205 | 692 | ||
2206 | msg_peers = (struct GNUNET_PeerIdentity *) &session->join_msg[1]; | 693 | msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1]; |
2207 | 694 | ||
2208 | local_peer_in_list = GNUNET_NO; | 695 | local_peer_in_list = GNUNET_NO; |
2209 | for (i = 0; i < listed_peers; i++) | 696 | for (i = 0; i < listed_peers; i++) |
2210 | { | 697 | { |
2211 | if (0 == memcmp (&msg_peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity))) | 698 | if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity))) |
2212 | { | 699 | { |
2213 | local_peer_in_list = GNUNET_YES; | 700 | local_peer_in_list = GNUNET_YES; |
2214 | break; | 701 | break; |
@@ -2221,7 +708,7 @@ initialize_session_peer_list (struct ConsensusSession *session) | |||
2221 | peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity)); | 708 | peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity)); |
2222 | 709 | ||
2223 | if (GNUNET_NO == local_peer_in_list) | 710 | if (GNUNET_NO == local_peer_in_list) |
2224 | peers[session->num_peers - 1] = *my_peer; | 711 | peers[session->num_peers - 1] = my_peer; |
2225 | 712 | ||
2226 | memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity)); | 713 | memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity)); |
2227 | qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp); | 714 | qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp); |
@@ -2236,38 +723,34 @@ initialize_session_peer_list (struct ConsensusSession *session) | |||
2236 | session->info[i].peer_id = peers[i]; | 723 | session->info[i].peer_id = peers[i]; |
2237 | } | 724 | } |
2238 | 725 | ||
2239 | free (peers); | 726 | GNUNET_free (peers); |
2240 | } | 727 | } |
2241 | 728 | ||
2242 | 729 | ||
730 | |||
731 | |||
732 | |||
2243 | /** | 733 | /** |
2244 | * Add incoming peer connections to the session, | 734 | * Called when another peer wants to do a set operation with the |
2245 | * for peers who have connected to us before the local session has been established | 735 | * local peer. |
2246 | * | 736 | * |
2247 | * @param session ... | 737 | * @param other_peer the other peer |
738 | * @param context_msg message with application specific information from | ||
739 | * the other peer | ||
740 | * @param request request from the other peer, use GNUNET_SET_accept | ||
741 | * to accept it, otherwise the request will be refused | ||
742 | * Note that we don't use a return value here, as it is also | ||
743 | * necessary to specify the set we want to do the operation with, | ||
744 | * whith sometimes can be derived from the context message. | ||
745 | * Also necessary to specify the timeout. | ||
2248 | */ | 746 | */ |
2249 | static void | 747 | static void |
2250 | add_incoming_peers (struct ConsensusSession *session) | 748 | set_listen_cb (void *cls, |
749 | const struct GNUNET_PeerIdentity *other_peer, | ||
750 | const struct GNUNET_MessageHeader *context_msg, | ||
751 | struct GNUNET_SET_Request *request) | ||
2251 | { | 752 | { |
2252 | struct IncomingSocket *inc; | 753 | /* FIXME */ |
2253 | int i; | ||
2254 | struct ConsensusPeerInformation *cpi; | ||
2255 | |||
2256 | for (inc = incoming_sockets_head; NULL != inc; inc = inc->next) | ||
2257 | { | ||
2258 | if ( (NULL == inc->requested_gid) || | ||
2259 | (0 != GNUNET_CRYPTO_hash_cmp (&session->global_id, inc->requested_gid)) ) | ||
2260 | continue; | ||
2261 | for (i = 0; i < session->num_peers; i++) | ||
2262 | { | ||
2263 | cpi = &session->info[i]; | ||
2264 | cpi->peer_id = inc->peer_id; | ||
2265 | cpi->mss = inc->mss; | ||
2266 | cpi->hello = GNUNET_YES; | ||
2267 | inc->cpi = cpi; | ||
2268 | break; | ||
2269 | } | ||
2270 | } | ||
2271 | } | 754 | } |
2272 | 755 | ||
2273 | 756 | ||
@@ -2277,46 +760,59 @@ add_incoming_peers (struct ConsensusSession *session) | |||
2277 | * @param session the session to initialize | 760 | * @param session the session to initialize |
2278 | */ | 761 | */ |
2279 | static void | 762 | static void |
2280 | initialize_session (struct ConsensusSession *session) | 763 | initialize_session (struct ConsensusSession *session, |
764 | struct GNUNET_CONSENSUS_JoinMessage *join_msg) | ||
2281 | { | 765 | { |
2282 | struct ConsensusSession *other_session; | 766 | struct ConsensusSession *other_session; |
2283 | 767 | ||
2284 | GNUNET_assert (NULL != session->join_msg); | 768 | initialize_session_peer_list (session, join_msg); |
2285 | initialize_session_peer_list (session); | ||
2286 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers); | 769 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers); |
2287 | compute_global_id (session, &session->join_msg->session_id); | 770 | compute_global_id (session, &join_msg->session_id); |
2288 | 771 | ||
2289 | /* Check if some local client already owns the session. */ | 772 | /* check if some local client already owns the session. */ |
2290 | other_session = sessions_head; | 773 | other_session = sessions_head; |
2291 | while (NULL != other_session) | 774 | while (NULL != other_session) |
2292 | { | 775 | { |
2293 | if ((other_session != session) && | 776 | if ((other_session != session) && |
2294 | (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id))) | 777 | (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id))) |
2295 | { | 778 | { |
2296 | if (GNUNET_NO == other_session->conclude) | 779 | if (CONSENSUS_ROUND_FINISH != other_session->current_round) |
2297 | { | 780 | { |
2298 | GNUNET_break (0); | 781 | GNUNET_break (0); |
2299 | destroy_session (session); | 782 | destroy_session (session); |
2300 | return; | 783 | return; |
2301 | } | 784 | } |
2302 | GNUNET_SERVER_client_drop (other_session->scss.client); | ||
2303 | other_session->scss.client = NULL; | ||
2304 | break; | 785 | break; |
2305 | } | 786 | } |
2306 | other_session = other_session->next; | 787 | other_session = other_session->next; |
2307 | } | 788 | } |
2308 | 789 | ||
2309 | session->local_peer_idx = get_peer_idx (my_peer, session); | 790 | session->local_peer_idx = get_peer_idx (&my_peer, session); |
2310 | GNUNET_assert (-1 != session->local_peer_idx); | 791 | GNUNET_assert (-1 != session->local_peer_idx); |
792 | session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION, | ||
793 | &session->global_id, | ||
794 | set_listen_cb, session); | ||
2311 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx); | 795 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx); |
2312 | GNUNET_free (session->join_msg); | ||
2313 | session->join_msg = NULL; | ||
2314 | add_incoming_peers (session); | ||
2315 | GNUNET_SERVER_receive_done (session->scss.client, GNUNET_OK); | ||
2316 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id)); | 796 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id)); |
2317 | } | 797 | } |
2318 | 798 | ||
2319 | 799 | ||
800 | static struct ConsensusSession * | ||
801 | get_session_by_client (struct GNUNET_SERVER_Client *client) | ||
802 | { | ||
803 | struct ConsensusSession *session; | ||
804 | |||
805 | session = sessions_head; | ||
806 | while (NULL != session) | ||
807 | { | ||
808 | if (session->client == client) | ||
809 | return session; | ||
810 | session = session->next; | ||
811 | } | ||
812 | return NULL; | ||
813 | } | ||
814 | |||
815 | |||
2320 | /** | 816 | /** |
2321 | * Called when a client wants to join a consensus session. | 817 | * Called when a client wants to join a consensus session. |
2322 | * | 818 | * |
@@ -2331,45 +827,20 @@ client_join (void *cls, | |||
2331 | { | 827 | { |
2332 | struct ConsensusSession *session; | 828 | struct ConsensusSession *session; |
2333 | 829 | ||
2334 | // make sure the client has not already joined a session | 830 | session = get_session_by_client (client); |
2335 | session = sessions_head; | 831 | if (NULL != session) |
2336 | while (NULL != session) | ||
2337 | { | 832 | { |
2338 | if (session->scss.client == client) | 833 | GNUNET_break (0); |
2339 | { | 834 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); |
2340 | GNUNET_break (0); | 835 | return; |
2341 | disconnect_client (client); | ||
2342 | return; | ||
2343 | } | ||
2344 | session = session->next; | ||
2345 | } | 836 | } |
2346 | |||
2347 | session = GNUNET_new (struct ConsensusSession); | 837 | session = GNUNET_new (struct ConsensusSession); |
2348 | session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m); | ||
2349 | /* these have to be initialized here, as the client can already start to give us values */ | ||
2350 | session->ibfs = GNUNET_malloc ((MAX_IBF_ORDER+1) * sizeof (struct InvertibleBloomFilter *)); | ||
2351 | session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO); | ||
2352 | session->ibf_key_map = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO); | ||
2353 | session->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM); | ||
2354 | session->scss.client = client; | ||
2355 | session->client_mq = create_message_queue_for_server_client (&session->scss); | ||
2356 | GNUNET_SERVER_client_keep (client); | 838 | GNUNET_SERVER_client_keep (client); |
2357 | |||
2358 | GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); | 839 | GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); |
2359 | 840 | initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m); | |
2360 | // Initialize session later if local peer identity is not known yet. | ||
2361 | if (NULL == my_peer) | ||
2362 | { | ||
2363 | GNUNET_SERVER_disable_receive_done_warning (client); | ||
2364 | return; | ||
2365 | } | ||
2366 | |||
2367 | initialize_session (session); | ||
2368 | } | 841 | } |
2369 | 842 | ||
2370 | 843 | ||
2371 | |||
2372 | |||
2373 | /** | 844 | /** |
2374 | * Called when a client performs an insert operation. | 845 | * Called when a client performs an insert operation. |
2375 | * | 846 | * |
@@ -2379,38 +850,48 @@ client_join (void *cls, | |||
2379 | */ | 850 | */ |
2380 | void | 851 | void |
2381 | client_insert (void *cls, | 852 | client_insert (void *cls, |
2382 | struct GNUNET_SERVER_Client *client, | 853 | struct GNUNET_SERVER_Client *client, |
2383 | const struct GNUNET_MessageHeader *m) | 854 | const struct GNUNET_MessageHeader *m) |
2384 | { | 855 | { |
2385 | struct ConsensusSession *session; | 856 | struct ConsensusSession *session; |
2386 | struct GNUNET_CONSENSUS_ElementMessage *msg; | 857 | struct GNUNET_CONSENSUS_ElementMessage *msg; |
2387 | struct GNUNET_CONSENSUS_Element *element; | 858 | struct GNUNET_SET_Element *element; |
2388 | int element_size; | 859 | ssize_t element_size; |
2389 | 860 | ||
2390 | session = sessions_head; | 861 | session = sessions_head; |
2391 | while (NULL != session) | 862 | while (NULL != session) |
2392 | { | 863 | { |
2393 | if (session->scss.client == client) | 864 | if (session->client == client) |
2394 | break; | 865 | break; |
2395 | } | 866 | } |
2396 | 867 | ||
2397 | if (NULL == session) | 868 | if (NULL == session) |
2398 | { | 869 | { |
2399 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n"); | 870 | GNUNET_break (0); |
871 | GNUNET_SERVER_client_disconnect (client); | ||
872 | return; | ||
873 | } | ||
874 | |||
875 | if (CONSENSUS_ROUND_BEGIN != session->current_round) | ||
876 | { | ||
877 | GNUNET_break (0); | ||
2400 | GNUNET_SERVER_client_disconnect (client); | 878 | GNUNET_SERVER_client_disconnect (client); |
2401 | return; | 879 | return; |
2402 | } | 880 | } |
2403 | 881 | ||
2404 | msg = (struct GNUNET_CONSENSUS_ElementMessage *) m; | 882 | msg = (struct GNUNET_CONSENSUS_ElementMessage *) m; |
2405 | element_size = ntohs (msg->header.size )- sizeof (struct GNUNET_CONSENSUS_ElementMessage); | 883 | element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage); |
2406 | element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size); | 884 | if (element_size < 0) |
885 | { | ||
886 | GNUNET_break (0); | ||
887 | return; | ||
888 | } | ||
889 | element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size); | ||
2407 | element->type = msg->element_type; | 890 | element->type = msg->element_type; |
2408 | element->size = element_size; | 891 | element->size = element_size; |
2409 | memcpy (&element[1], &msg[1], element_size); | 892 | memcpy (&element[1], &msg[1], element_size); |
2410 | element->data = &element[1]; | 893 | element->data = &element[1]; |
2411 | GNUNET_assert (NULL != element->data); | 894 | GNUNET_SET_add_element (session->element_set, element, NULL, NULL); |
2412 | insert_element (session, element); | ||
2413 | |||
2414 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 895 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
2415 | } | 896 | } |
2416 | 897 | ||
@@ -2432,9 +913,8 @@ client_conclude (void *cls, | |||
2432 | 913 | ||
2433 | cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message; | 914 | cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message; |
2434 | 915 | ||
2435 | session = sessions_head; | 916 | session = get_session_by_client (client); |
2436 | while ((session != NULL) && (session->scss.client != client)) | 917 | |
2437 | session = session->next; | ||
2438 | if (NULL == session) | 918 | if (NULL == session) |
2439 | { | 919 | { |
2440 | /* client not found */ | 920 | /* client not found */ |
@@ -2447,16 +927,12 @@ client_conclude (void *cls, | |||
2447 | { | 927 | { |
2448 | /* client requested conclude twice */ | 928 | /* client requested conclude twice */ |
2449 | GNUNET_break (0); | 929 | GNUNET_break (0); |
2450 | /* client may still own a session, destroy it */ | ||
2451 | disconnect_client (client); | ||
2452 | return; | 930 | return; |
2453 | } | 931 | } |
2454 | 932 | ||
2455 | session->conclude = GNUNET_YES; | ||
2456 | |||
2457 | if (session->num_peers <= 1) | 933 | if (session->num_peers <= 1) |
2458 | { | 934 | { |
2459 | send_client_conclude_done (session); | 935 | //send_client_conclude_done (session); |
2460 | } | 936 | } |
2461 | else | 937 | else |
2462 | { | 938 | { |
@@ -2465,48 +941,12 @@ client_conclude (void *cls, | |||
2465 | round_over (session, NULL); | 941 | round_over (session, NULL); |
2466 | } | 942 | } |
2467 | 943 | ||
944 | GNUNET_assert (CONSENSUS_ROUND_BEGIN != session->current_round); | ||
2468 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 945 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
2469 | } | 946 | } |
2470 | 947 | ||
2471 | 948 | ||
2472 | /** | 949 | /** |
2473 | * Task that disconnects from core. | ||
2474 | * | ||
2475 | * @param cls core handle | ||
2476 | * @param tc context information (why was this task triggered now) | ||
2477 | */ | ||
2478 | static void | ||
2479 | disconnect_core (void *cls, | ||
2480 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
2481 | { | ||
2482 | if (core != NULL) | ||
2483 | { | ||
2484 | GNUNET_CORE_disconnect (core); | ||
2485 | core = NULL; | ||
2486 | } | ||
2487 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n"); | ||
2488 | } | ||
2489 | |||
2490 | |||
2491 | static void | ||
2492 | core_startup (void *cls, | ||
2493 | struct GNUNET_CORE_Handle *core, | ||
2494 | const struct GNUNET_PeerIdentity *peer) | ||
2495 | { | ||
2496 | struct ConsensusSession *session; | ||
2497 | |||
2498 | my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity)); | ||
2499 | /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */ | ||
2500 | GNUNET_SCHEDULER_add_now (&disconnect_core, core); | ||
2501 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n"); | ||
2502 | /* initialize sessions that are waiting for the local peer identity */ | ||
2503 | for (session = sessions_head; NULL != session; session = session->next) | ||
2504 | if (NULL != session->join_msg) | ||
2505 | initialize_session (session); | ||
2506 | } | ||
2507 | |||
2508 | |||
2509 | /** | ||
2510 | * Called to clean up, after a shutdown has been requested. | 950 | * Called to clean up, after a shutdown has been requested. |
2511 | * | 951 | * |
2512 | * @param cls closure | 952 | * @param cls closure |
@@ -2516,35 +956,8 @@ static void | |||
2516 | shutdown_task (void *cls, | 956 | shutdown_task (void *cls, |
2517 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 957 | const struct GNUNET_SCHEDULER_TaskContext *tc) |
2518 | { | 958 | { |
2519 | while (NULL != incoming_sockets_head) | ||
2520 | { | ||
2521 | struct IncomingSocket *socket; | ||
2522 | socket = incoming_sockets_head; | ||
2523 | if (NULL == socket->cpi) | ||
2524 | clear_message_stream_state (&socket->mss); | ||
2525 | incoming_sockets_head = incoming_sockets_head->next; | ||
2526 | GNUNET_free (socket); | ||
2527 | } | ||
2528 | |||
2529 | while (NULL != sessions_head) | 959 | while (NULL != sessions_head) |
2530 | { | ||
2531 | struct ConsensusSession *session; | ||
2532 | session = sessions_head->next; | ||
2533 | destroy_session (sessions_head); | 960 | destroy_session (sessions_head); |
2534 | sessions_head = session; | ||
2535 | } | ||
2536 | |||
2537 | if (NULL != core) | ||
2538 | { | ||
2539 | GNUNET_CORE_disconnect (core); | ||
2540 | core = NULL; | ||
2541 | } | ||
2542 | |||
2543 | if (NULL != listener) | ||
2544 | { | ||
2545 | GNUNET_STREAM_listen_close (listener); | ||
2546 | listener = NULL; | ||
2547 | } | ||
2548 | 961 | ||
2549 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n"); | 962 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n"); |
2550 | } | 963 | } |
@@ -2560,10 +973,6 @@ shutdown_task (void *cls, | |||
2560 | static void | 973 | static void |
2561 | run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) | 974 | run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) |
2562 | { | 975 | { |
2563 | /* core is only used to retrieve the peer identity */ | ||
2564 | static const struct GNUNET_CORE_MessageHandler core_handlers[] = { | ||
2565 | {NULL, 0, 0} | ||
2566 | }; | ||
2567 | static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { | 976 | static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { |
2568 | {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0}, | 977 | {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0}, |
2569 | {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, | 978 | {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, |
@@ -2574,21 +983,15 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU | |||
2574 | 983 | ||
2575 | cfg = c; | 984 | cfg = c; |
2576 | srv = server; | 985 | srv = server; |
2577 | 986 | if (GNUNET_OK != GNUNET_CRYPTO_get_host_identity (cfg, &my_peer)) | |
987 | { | ||
988 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n"); | ||
989 | GNUNET_break (0); | ||
990 | GNUNET_SCHEDULER_shutdown (); | ||
991 | return; | ||
992 | } | ||
2578 | GNUNET_SERVER_add_handlers (server, server_handlers); | 993 | GNUNET_SERVER_add_handlers (server, server_handlers); |
2579 | |||
2580 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); | 994 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); |
2581 | |||
2582 | listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS, | ||
2583 | &listen_cb, NULL, | ||
2584 | GNUNET_STREAM_OPTION_END); | ||
2585 | |||
2586 | /* we have to wait for the core_startup callback before proceeding with the consensus service startup */ | ||
2587 | core = GNUNET_CORE_connect (c, NULL, | ||
2588 | &core_startup, NULL, | ||
2589 | NULL, NULL, GNUNET_NO, NULL, | ||
2590 | GNUNET_NO, core_handlers); | ||
2591 | GNUNET_assert (NULL != core); | ||
2592 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n"); | 995 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n"); |
2593 | } | 996 | } |
2594 | 997 | ||
diff --git a/src/consensus/ibf.c b/src/consensus/ibf.c deleted file mode 100644 index 739b97339..000000000 --- a/src/consensus/ibf.c +++ /dev/null | |||
@@ -1,357 +0,0 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet | ||
3 | (C) 2012 Christian Grothoff (and other contributing authors) | ||
4 | |||
5 | GNUnet is free software; you can redistribute it and/or modify | ||
6 | it under the terms of the GNU General Public License as published | ||
7 | by the Free Software Foundation; either version 2, or (at your | ||
8 | option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU General Public License | ||
16 | along with GNUnet; see the file COPYING. If not, write to the | ||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
18 | Boston, MA 02111-1307, USA. | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file consensus/ibf.c | ||
23 | * @brief implementation of the invertible bloom filter | ||
24 | * @author Florian Dold | ||
25 | */ | ||
26 | |||
27 | #include "ibf.h" | ||
28 | |||
29 | /** | ||
30 | * Create a key from a hashcode. | ||
31 | * | ||
32 | * @param hash the hashcode | ||
33 | * @return a key | ||
34 | */ | ||
35 | struct IBF_Key | ||
36 | ibf_key_from_hashcode (const struct GNUNET_HashCode *hash) | ||
37 | { | ||
38 | /* FIXME: endianess */ | ||
39 | return *(struct IBF_Key *) hash; | ||
40 | } | ||
41 | |||
42 | /** | ||
43 | * Create a hashcode from a key, by replicating the key | ||
44 | * until the hascode is filled | ||
45 | * | ||
46 | * @param key the key | ||
47 | * @param dst hashcode to store the result in | ||
48 | */ | ||
49 | void | ||
50 | ibf_hashcode_from_key (struct IBF_Key key, struct GNUNET_HashCode *dst) | ||
51 | { | ||
52 | struct IBF_Key *p; | ||
53 | unsigned int i; | ||
54 | const unsigned int keys_per_hashcode = sizeof (struct GNUNET_HashCode) / sizeof (struct IBF_Key); | ||
55 | p = (struct IBF_Key *) dst; | ||
56 | for (i = 0; i < keys_per_hashcode; i++) | ||
57 | *p++ = key; | ||
58 | } | ||
59 | |||
60 | |||
61 | /** | ||
62 | * Create an invertible bloom filter. | ||
63 | * | ||
64 | * @param size number of IBF buckets | ||
65 | * @param hash_num number of buckets one element is hashed in | ||
66 | * @return the newly created invertible bloom filter | ||
67 | */ | ||
68 | struct InvertibleBloomFilter * | ||
69 | ibf_create (uint32_t size, uint8_t hash_num) | ||
70 | { | ||
71 | struct InvertibleBloomFilter *ibf; | ||
72 | |||
73 | /* TODO: use malloc_large */ | ||
74 | |||
75 | ibf = GNUNET_malloc (sizeof (struct InvertibleBloomFilter)); | ||
76 | ibf->count = GNUNET_malloc (size * sizeof (uint8_t)); | ||
77 | ibf->key_sum = GNUNET_malloc (size * sizeof (struct GNUNET_HashCode)); | ||
78 | ibf->key_hash_sum = GNUNET_malloc (size * sizeof (struct GNUNET_HashCode)); | ||
79 | ibf->size = size; | ||
80 | ibf->hash_num = hash_num; | ||
81 | |||
82 | return ibf; | ||
83 | } | ||
84 | |||
85 | /** | ||
86 | * Store unique bucket indices for the specified key in dst. | ||
87 | */ | ||
88 | static inline void | ||
89 | ibf_get_indices (const struct InvertibleBloomFilter *ibf, | ||
90 | struct IBF_Key key, int *dst) | ||
91 | { | ||
92 | struct GNUNET_HashCode bucket_indices; | ||
93 | unsigned int filled; | ||
94 | int i; | ||
95 | GNUNET_CRYPTO_hash (&key, sizeof key, &bucket_indices); | ||
96 | filled = 0; | ||
97 | for (i = 0; filled < ibf->hash_num; i++) | ||
98 | { | ||
99 | unsigned int bucket; | ||
100 | unsigned int j; | ||
101 | if ( (0 != i) && (0 == (i % 16)) ) | ||
102 | GNUNET_CRYPTO_hash (&bucket_indices, sizeof (struct GNUNET_HashCode), &bucket_indices); | ||
103 | bucket = bucket_indices.bits[i % 16] % ibf->size; | ||
104 | for (j = 0; j < filled; j++) | ||
105 | if (dst[j] == bucket) | ||
106 | goto try_next; | ||
107 | dst[filled++] = bucket; | ||
108 | try_next: ; | ||
109 | } | ||
110 | } | ||
111 | |||
112 | |||
113 | static void | ||
114 | ibf_insert_into (struct InvertibleBloomFilter *ibf, | ||
115 | struct IBF_Key key, | ||
116 | const int *buckets, int side) | ||
117 | { | ||
118 | int i; | ||
119 | struct GNUNET_HashCode key_hash_sha; | ||
120 | struct IBF_KeyHash key_hash; | ||
121 | GNUNET_CRYPTO_hash (&key, sizeof key, &key_hash_sha); | ||
122 | key_hash.key_hash_val = key_hash_sha.bits[0]; | ||
123 | for (i = 0; i < ibf->hash_num; i++) | ||
124 | { | ||
125 | const int bucket = buckets[i]; | ||
126 | ibf->count[bucket].count_val += side; | ||
127 | ibf->key_sum[bucket].key_val ^= key.key_val; | ||
128 | ibf->key_hash_sum[bucket].key_hash_val ^= key_hash.key_hash_val; | ||
129 | } | ||
130 | } | ||
131 | |||
132 | |||
133 | /** | ||
134 | * Insert an element into an IBF. | ||
135 | * | ||
136 | * @param ibf the IBF | ||
137 | * @param key the element's hash code | ||
138 | */ | ||
139 | void | ||
140 | ibf_insert (struct InvertibleBloomFilter *ibf, struct IBF_Key key) | ||
141 | { | ||
142 | int buckets[ibf->hash_num]; | ||
143 | GNUNET_assert (ibf->hash_num <= ibf->size); | ||
144 | ibf_get_indices (ibf, key, buckets); | ||
145 | ibf_insert_into (ibf, key, buckets, 1); | ||
146 | } | ||
147 | |||
148 | /** | ||
149 | * Test is the IBF is empty, i.e. all counts, keys and key hashes are zero. | ||
150 | */ | ||
151 | static int | ||
152 | ibf_is_empty (struct InvertibleBloomFilter *ibf) | ||
153 | { | ||
154 | int i; | ||
155 | for (i = 0; i < ibf->size; i++) | ||
156 | { | ||
157 | if (0 != ibf->count[i].count_val) | ||
158 | return GNUNET_NO; | ||
159 | if (0 != ibf->key_hash_sum[i].key_hash_val) | ||
160 | return GNUNET_NO; | ||
161 | if (0 != ibf->key_sum[i].key_val) | ||
162 | return GNUNET_NO; | ||
163 | } | ||
164 | return GNUNET_YES; | ||
165 | } | ||
166 | |||
167 | |||
168 | /** | ||
169 | * Decode and remove an element from the IBF, if possible. | ||
170 | * | ||
171 | * @param ibf the invertible bloom filter to decode | ||
172 | * @param ret_side sign of the cell's count where the decoded element came from. | ||
173 | * A negative sign indicates that the element was recovered | ||
174 | * resides in an IBF that was previously subtracted from. | ||
175 | * @param ret_id receives the hash code of the decoded element, if successful | ||
176 | * @return GNUNET_YES if decoding an element was successful, | ||
177 | * GNUNET_NO if the IBF is empty, | ||
178 | * GNUNET_SYSERR if the decoding has failed | ||
179 | */ | ||
180 | int | ||
181 | ibf_decode (struct InvertibleBloomFilter *ibf, | ||
182 | int *ret_side, struct IBF_Key *ret_id) | ||
183 | { | ||
184 | struct IBF_KeyHash hash; | ||
185 | int i; | ||
186 | struct GNUNET_HashCode key_hash_sha; | ||
187 | int buckets[ibf->hash_num]; | ||
188 | |||
189 | GNUNET_assert (NULL != ibf); | ||
190 | |||
191 | for (i = 0; i < ibf->size; i++) | ||
192 | { | ||
193 | int j; | ||
194 | int hit; | ||
195 | |||
196 | /* we can only decode from pure buckets */ | ||
197 | if ((1 != ibf->count[i].count_val) && (-1 != ibf->count[i].count_val)) | ||
198 | continue; | ||
199 | |||
200 | GNUNET_CRYPTO_hash (&ibf->key_sum[i], sizeof (struct IBF_Key), &key_hash_sha); | ||
201 | hash.key_hash_val = key_hash_sha.bits[0]; | ||
202 | |||
203 | /* test if the hash matches the key */ | ||
204 | if (hash.key_hash_val != ibf->key_hash_sum[i].key_hash_val) | ||
205 | continue; | ||
206 | |||
207 | /* test if key in bucket hits its own location, | ||
208 | * if not, the key hash was subject to collision */ | ||
209 | hit = GNUNET_NO; | ||
210 | ibf_get_indices (ibf, ibf->key_sum[i], buckets); | ||
211 | for (j = 0; j < ibf->hash_num; j++) | ||
212 | if (buckets[j] == i) | ||
213 | hit = GNUNET_YES; | ||
214 | |||
215 | if (GNUNET_NO == hit) | ||
216 | continue; | ||
217 | |||
218 | if (NULL != ret_side) | ||
219 | *ret_side = ibf->count[i].count_val; | ||
220 | if (NULL != ret_id) | ||
221 | *ret_id = ibf->key_sum[i]; | ||
222 | |||
223 | /* insert on the opposite side, effectively removing the element */ | ||
224 | ibf_insert_into (ibf, ibf->key_sum[i], buckets, -ibf->count[i].count_val); | ||
225 | |||
226 | return GNUNET_YES; | ||
227 | } | ||
228 | |||
229 | if (GNUNET_YES == ibf_is_empty (ibf)) | ||
230 | return GNUNET_NO; | ||
231 | return GNUNET_SYSERR; | ||
232 | } | ||
233 | |||
234 | |||
235 | /** | ||
236 | * Write buckets from an ibf to a buffer. | ||
237 | * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf. | ||
238 | * | ||
239 | * @param ibf the ibf to write | ||
240 | * @param start with which bucket to start | ||
241 | * @param count how many buckets to write | ||
242 | * @param buf buffer to write the data to | ||
243 | */ | ||
244 | void | ||
245 | ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void *buf) | ||
246 | { | ||
247 | struct IBF_Key *key_dst; | ||
248 | struct IBF_KeyHash *key_hash_dst; | ||
249 | struct IBF_Count *count_dst; | ||
250 | |||
251 | GNUNET_assert (start + count <= ibf->size); | ||
252 | |||
253 | /* copy keys */ | ||
254 | key_dst = (struct IBF_Key *) buf; | ||
255 | memcpy (key_dst, ibf->key_sum + start, count * sizeof *key_dst); | ||
256 | key_dst += count; | ||
257 | /* copy key hashes */ | ||
258 | key_hash_dst = (struct IBF_KeyHash *) key_dst; | ||
259 | memcpy (key_hash_dst, ibf->key_hash_sum + start, count * sizeof *key_hash_dst); | ||
260 | key_hash_dst += count; | ||
261 | /* copy counts */ | ||
262 | count_dst = (struct IBF_Count *) key_hash_dst; | ||
263 | memcpy (count_dst, ibf->count + start, count * sizeof *count_dst); | ||
264 | count_dst += count; | ||
265 | } | ||
266 | |||
267 | |||
268 | /** | ||
269 | * Read buckets from a buffer into an ibf. | ||
270 | * | ||
271 | * @param buf pointer to the buffer to read from | ||
272 | * @param start which bucket to start at | ||
273 | * @param count how many buckets to read | ||
274 | * @param ibf the ibf to read from | ||
275 | */ | ||
276 | void | ||
277 | ibf_read_slice (const void *buf, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf) | ||
278 | { | ||
279 | struct IBF_Key *key_src; | ||
280 | struct IBF_KeyHash *key_hash_src; | ||
281 | struct IBF_Count *count_src; | ||
282 | |||
283 | GNUNET_assert (start + count <= ibf->size); | ||
284 | |||
285 | /* copy keys */ | ||
286 | key_src = (struct IBF_Key *) buf; | ||
287 | memcpy (ibf->key_sum + start, key_src, count * sizeof *key_src); | ||
288 | key_src += count; | ||
289 | /* copy key hashes */ | ||
290 | key_hash_src = (struct IBF_KeyHash *) key_src; | ||
291 | memcpy (ibf->key_hash_sum + start, key_hash_src, count * sizeof *key_hash_src); | ||
292 | key_hash_src += count; | ||
293 | /* copy counts */ | ||
294 | count_src = (struct IBF_Count *) key_hash_src; | ||
295 | memcpy (ibf->count + start, count_src, count * sizeof *count_src); | ||
296 | count_src += count; | ||
297 | } | ||
298 | |||
299 | |||
300 | /** | ||
301 | * Subtract ibf2 from ibf1, storing the result in ibf1. | ||
302 | * The two IBF's must have the same parameters size and hash_num. | ||
303 | * | ||
304 | * @param ibf1 IBF that is subtracted from | ||
305 | * @param ibf2 IBF that will be subtracted from ibf1 | ||
306 | */ | ||
307 | void | ||
308 | ibf_subtract (struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFilter *ibf2) | ||
309 | { | ||
310 | int i; | ||
311 | |||
312 | GNUNET_assert (ibf1->size == ibf2->size); | ||
313 | GNUNET_assert (ibf1->hash_num == ibf2->hash_num); | ||
314 | |||
315 | for (i = 0; i < ibf1->size; i++) | ||
316 | { | ||
317 | ibf1->count[i].count_val -= ibf2->count[i].count_val; | ||
318 | ibf1->key_hash_sum[i].key_hash_val ^= ibf2->key_hash_sum[i].key_hash_val; | ||
319 | ibf1->key_sum[i].key_val ^= ibf2->key_sum[i].key_val; | ||
320 | } | ||
321 | } | ||
322 | |||
323 | |||
324 | /** | ||
325 | * Create a copy of an IBF, the copy has to be destroyed properly. | ||
326 | * | ||
327 | * @param ibf the IBF to copy | ||
328 | */ | ||
329 | struct InvertibleBloomFilter * | ||
330 | ibf_dup (const struct InvertibleBloomFilter *ibf) | ||
331 | { | ||
332 | struct InvertibleBloomFilter *copy; | ||
333 | copy = GNUNET_malloc (sizeof *copy); | ||
334 | copy->hash_num = ibf->hash_num; | ||
335 | copy->size = ibf->size; | ||
336 | copy->key_hash_sum = GNUNET_memdup (ibf->key_hash_sum, ibf->size * sizeof (struct IBF_KeyHash)); | ||
337 | copy->key_sum = GNUNET_memdup (ibf->key_sum, ibf->size * sizeof (struct IBF_Key)); | ||
338 | copy->count = GNUNET_memdup (ibf->count, ibf->size * sizeof (struct IBF_Count)); | ||
339 | return copy; | ||
340 | } | ||
341 | |||
342 | |||
343 | /** | ||
344 | * Destroy all resources associated with the invertible bloom filter. | ||
345 | * No more ibf_*-functions may be called on ibf after calling destroy. | ||
346 | * | ||
347 | * @param ibf the intertible bloom filter to destroy | ||
348 | */ | ||
349 | void | ||
350 | ibf_destroy (struct InvertibleBloomFilter *ibf) | ||
351 | { | ||
352 | GNUNET_free (ibf->key_sum); | ||
353 | GNUNET_free (ibf->key_hash_sum); | ||
354 | GNUNET_free (ibf->count); | ||
355 | GNUNET_free (ibf); | ||
356 | } | ||
357 | |||
diff --git a/src/consensus/ibf.h b/src/consensus/ibf.h deleted file mode 100644 index 2bf3ef7c7..000000000 --- a/src/consensus/ibf.h +++ /dev/null | |||
@@ -1,224 +0,0 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet | ||
3 | (C) 2012 Christian Grothoff (and other contributing authors) | ||
4 | |||
5 | GNUnet is free software; you can redistribute it and/or modify | ||
6 | it under the terms of the GNU General Public License as published | ||
7 | by the Free Software Foundation; either version 2, or (at your | ||
8 | option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU General Public License | ||
16 | along with GNUnet; see the file COPYING. If not, write to the | ||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
18 | Boston, MA 02111-1307, USA. | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file consensus/ibf.h | ||
23 | * @brief invertible bloom filter | ||
24 | * @author Florian Dold | ||
25 | */ | ||
26 | |||
27 | #ifndef GNUNET_CONSENSUS_IBF_H | ||
28 | #define GNUNET_CONSENSUS_IBF_H | ||
29 | |||
30 | #include "platform.h" | ||
31 | #include "gnunet_common.h" | ||
32 | #include "gnunet_util_lib.h" | ||
33 | |||
34 | #ifdef __cplusplus | ||
35 | extern "C" | ||
36 | { | ||
37 | #if 0 /* keep Emacsens' auto-indent happy */ | ||
38 | } | ||
39 | #endif | ||
40 | #endif | ||
41 | |||
42 | |||
43 | struct IBF_Key | ||
44 | { | ||
45 | uint64_t key_val; | ||
46 | }; | ||
47 | |||
48 | struct IBF_KeyHash | ||
49 | { | ||
50 | uint32_t key_hash_val; | ||
51 | }; | ||
52 | |||
53 | struct IBF_Count | ||
54 | { | ||
55 | int8_t count_val; | ||
56 | }; | ||
57 | |||
58 | /** | ||
59 | * Size of one ibf bucket in bytes | ||
60 | */ | ||
61 | #define IBF_BUCKET_SIZE (sizeof (struct IBF_Count) + sizeof (struct IBF_Key) + \ | ||
62 | sizeof (struct IBF_KeyHash)) | ||
63 | |||
64 | /** | ||
65 | * Invertible bloom filter (IBF). | ||
66 | * | ||
67 | * An IBF is a counting bloom filter that has the ability to restore | ||
68 | * the hashes of its stored elements with high probability. | ||
69 | */ | ||
70 | struct InvertibleBloomFilter | ||
71 | { | ||
72 | /** | ||
73 | * How many cells does this IBF have? | ||
74 | */ | ||
75 | uint32_t size; | ||
76 | |||
77 | /** | ||
78 | * In how many cells do we hash one element? | ||
79 | * Usually 4 or 3. | ||
80 | */ | ||
81 | uint8_t hash_num; | ||
82 | |||
83 | /** | ||
84 | * Xor sums of the elements' keys, used to identify the elements. | ||
85 | * Array of 'size' elements. | ||
86 | */ | ||
87 | struct IBF_Key *key_sum; | ||
88 | |||
89 | /** | ||
90 | * Xor sums of the hashes of the keys of inserted elements. | ||
91 | * Array of 'size' elements. | ||
92 | */ | ||
93 | struct IBF_KeyHash *key_hash_sum; | ||
94 | |||
95 | /** | ||
96 | * How many times has a bucket been hit? | ||
97 | * Can be negative, as a result of IBF subtraction. | ||
98 | * Array of 'size' elements. | ||
99 | */ | ||
100 | struct IBF_Count *count; | ||
101 | }; | ||
102 | |||
103 | |||
104 | /** | ||
105 | * Write buckets from an ibf to a buffer. | ||
106 | * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf. | ||
107 | * | ||
108 | * @param ibf the ibf to write | ||
109 | * @param start with which bucket to start | ||
110 | * @param count how many buckets to write | ||
111 | * @param buf buffer to write the data to | ||
112 | */ | ||
113 | void | ||
114 | ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void *buf); | ||
115 | |||
116 | |||
117 | /** | ||
118 | * Read buckets from a buffer into an ibf. | ||
119 | * | ||
120 | * @param buf pointer to the buffer to read from | ||
121 | * @param start which bucket to start at | ||
122 | * @param count how many buckets to read | ||
123 | * @param ibf the ibf to read from | ||
124 | */ | ||
125 | void | ||
126 | ibf_read_slice (const void *buf, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf); | ||
127 | |||
128 | |||
129 | /** | ||
130 | * Create a key from a hashcode. | ||
131 | * | ||
132 | * @param hash the hashcode | ||
133 | * @return a key | ||
134 | */ | ||
135 | struct IBF_Key | ||
136 | ibf_key_from_hashcode (const struct GNUNET_HashCode *hash); | ||
137 | |||
138 | |||
139 | /** | ||
140 | * Create a hashcode from a key, by replicating the key | ||
141 | * until the hascode is filled | ||
142 | * | ||
143 | * @param key the key | ||
144 | * @param dst hashcode to store the result in | ||
145 | */ | ||
146 | void | ||
147 | ibf_hashcode_from_key (struct IBF_Key key, struct GNUNET_HashCode *dst); | ||
148 | |||
149 | |||
150 | /** | ||
151 | * Create an invertible bloom filter. | ||
152 | * | ||
153 | * @param size number of IBF buckets | ||
154 | * @param hash_num number of buckets one element is hashed in, usually 3 or 4 | ||
155 | * @return the newly created invertible bloom filter | ||
156 | */ | ||
157 | struct InvertibleBloomFilter * | ||
158 | ibf_create (uint32_t size, uint8_t hash_num); | ||
159 | |||
160 | |||
161 | /** | ||
162 | * Insert an element into an IBF. | ||
163 | * | ||
164 | * @param ibf the IBF | ||
165 | * @param key the element's hash code | ||
166 | */ | ||
167 | void | ||
168 | ibf_insert (struct InvertibleBloomFilter *ibf, struct IBF_Key key); | ||
169 | |||
170 | |||
171 | /** | ||
172 | * Subtract ibf2 from ibf1, storing the result in ibf1. | ||
173 | * The two IBF's must have the same parameters size and hash_num. | ||
174 | * | ||
175 | * @param ibf1 IBF that is subtracted from | ||
176 | * @param ibf2 IBF that will be subtracted from ibf1 | ||
177 | */ | ||
178 | void | ||
179 | ibf_subtract (struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFilter *ibf2); | ||
180 | |||
181 | |||
182 | /** | ||
183 | * Decode and remove an element from the IBF, if possible. | ||
184 | * | ||
185 | * @param ibf the invertible bloom filter to decode | ||
186 | * @param ret_side sign of the cell's count where the decoded element came from. | ||
187 | * A negative sign indicates that the element was recovered | ||
188 | * resides in an IBF that was previously subtracted from. | ||
189 | * @param ret_id receives the hash code of the decoded element, if successful | ||
190 | * @return GNUNET_YES if decoding an element was successful, | ||
191 | * GNUNET_NO if the IBF is empty, | ||
192 | * GNUNET_SYSERR if the decoding has failed | ||
193 | */ | ||
194 | int | ||
195 | ibf_decode (struct InvertibleBloomFilter *ibf, int *ret_side, struct IBF_Key *ret_id); | ||
196 | |||
197 | |||
198 | /** | ||
199 | * Create a copy of an IBF, the copy has to be destroyed properly. | ||
200 | * | ||
201 | * @param ibf the IBF to copy | ||
202 | */ | ||
203 | struct InvertibleBloomFilter * | ||
204 | ibf_dup (const struct InvertibleBloomFilter *ibf); | ||
205 | |||
206 | /** | ||
207 | * Destroy all resources associated with the invertible bloom filter. | ||
208 | * No more ibf_*-functions may be called on ibf after calling destroy. | ||
209 | * | ||
210 | * @param ibf the intertible bloom filter to destroy | ||
211 | */ | ||
212 | void | ||
213 | ibf_destroy (struct InvertibleBloomFilter *ibf); | ||
214 | |||
215 | |||
216 | #if 0 /* keep Emacsens' auto-indent happy */ | ||
217 | { | ||
218 | #endif | ||
219 | #ifdef __cplusplus | ||
220 | } | ||
221 | #endif | ||
222 | |||
223 | #endif | ||
224 | |||
diff --git a/src/consensus/strata_estimator.c b/src/consensus/strata_estimator.c deleted file mode 100644 index 685c50f0f..000000000 --- a/src/consensus/strata_estimator.c +++ /dev/null | |||
@@ -1,145 +0,0 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet | ||
3 | (C) 2012 Christian Grothoff (and other contributing authors) | ||
4 | |||
5 | GNUnet is free software; you can redistribute it and/or modify | ||
6 | it under the terms of the GNU General Public License as published | ||
7 | by the Free Software Foundation; either version 2, or (at your | ||
8 | option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU General Public License | ||
16 | along with GNUnet; see the file COPYING. If not, write to the | ||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
18 | Boston, MA 02111-1307, USA. | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file consensus/ibf.h | ||
23 | * @brief invertible bloom filter | ||
24 | * @author Florian Dold | ||
25 | */ | ||
26 | |||
27 | #include "platform.h" | ||
28 | #include "gnunet_common.h" | ||
29 | #include "ibf.h" | ||
30 | #include "strata_estimator.h" | ||
31 | |||
32 | void | ||
33 | strata_estimator_write (const struct StrataEstimator *se, void *buf) | ||
34 | { | ||
35 | int i; | ||
36 | for (i = 0; i < se->strata_count; i++) | ||
37 | { | ||
38 | ibf_write_slice (se->strata[i], 0, se->ibf_size, buf); | ||
39 | buf += se->ibf_size * IBF_BUCKET_SIZE; | ||
40 | } | ||
41 | } | ||
42 | |||
43 | void | ||
44 | strata_estimator_read (const void *buf, struct StrataEstimator *se) | ||
45 | { | ||
46 | int i; | ||
47 | for (i = 0; i < se->strata_count; i++) | ||
48 | { | ||
49 | ibf_read_slice (buf, 0, se->ibf_size, se->strata[i]); | ||
50 | buf += se->ibf_size * IBF_BUCKET_SIZE; | ||
51 | } | ||
52 | } | ||
53 | |||
54 | |||
55 | void | ||
56 | strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode *key) | ||
57 | { | ||
58 | uint32_t v; | ||
59 | int i; | ||
60 | v = key->bits[0]; | ||
61 | /* count trailing '1'-bits of v */ | ||
62 | for (i = 0; v & 1; v>>=1, i++) | ||
63 | /* empty */; | ||
64 | ibf_insert (se->strata[i], ibf_key_from_hashcode (key)); | ||
65 | } | ||
66 | |||
67 | |||
68 | |||
69 | struct StrataEstimator * | ||
70 | strata_estimator_create (unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum) | ||
71 | { | ||
72 | struct StrataEstimator *se; | ||
73 | int i; | ||
74 | |||
75 | /* fixme: allocate everything in one chunk */ | ||
76 | |||
77 | se = GNUNET_malloc (sizeof (struct StrataEstimator)); | ||
78 | se->strata_count = strata_count; | ||
79 | se->ibf_size = ibf_size; | ||
80 | se->strata = GNUNET_malloc (sizeof (struct InvertibleBloomFilter *) * strata_count); | ||
81 | for (i = 0; i < strata_count; i++) | ||
82 | se->strata[i] = ibf_create (ibf_size, ibf_hashnum); | ||
83 | return se; | ||
84 | } | ||
85 | |||
86 | |||
87 | /** | ||
88 | * Estimate set difference with two strata estimators, | ||
89 | * i.e. arrays of IBFs. | ||
90 | * Does not not modify its arguments. | ||
91 | * | ||
92 | * @param se1 first strata estimator | ||
93 | * @param se2 second strata estimator | ||
94 | * @return the estimated difference | ||
95 | */ | ||
96 | unsigned int | ||
97 | strata_estimator_difference (const struct StrataEstimator *se1, | ||
98 | const struct StrataEstimator *se2) | ||
99 | { | ||
100 | int i; | ||
101 | int count; | ||
102 | |||
103 | GNUNET_assert (se1->strata_count == se2->strata_count); | ||
104 | count = 0; | ||
105 | for (i = se1->strata_count - 1; i >= 0; i--) | ||
106 | { | ||
107 | struct InvertibleBloomFilter *diff; | ||
108 | /* number of keys decoded from the ibf */ | ||
109 | int ibf_count; | ||
110 | int more; | ||
111 | ibf_count = 0; | ||
112 | /* FIXME: implement this without always allocating new IBFs */ | ||
113 | diff = ibf_dup (se1->strata[i]); | ||
114 | ibf_subtract (diff, se2->strata[i]); | ||
115 | for (;;) | ||
116 | { | ||
117 | more = ibf_decode (diff, NULL, NULL); | ||
118 | if (GNUNET_NO == more) | ||
119 | { | ||
120 | count += ibf_count; | ||
121 | break; | ||
122 | } | ||
123 | if (GNUNET_SYSERR == more) | ||
124 | { | ||
125 | ibf_destroy (diff); | ||
126 | return count * (1 << (i + 1)); | ||
127 | } | ||
128 | ibf_count++; | ||
129 | } | ||
130 | ibf_destroy (diff); | ||
131 | } | ||
132 | return count; | ||
133 | } | ||
134 | |||
135 | |||
136 | void | ||
137 | strata_estimator_destroy (struct StrataEstimator *se) | ||
138 | { | ||
139 | int i; | ||
140 | for (i = 0; i < se->strata_count; i++) | ||
141 | ibf_destroy (se->strata[i]); | ||
142 | GNUNET_free (se->strata); | ||
143 | GNUNET_free (se); | ||
144 | } | ||
145 | |||
diff --git a/src/consensus/strata_estimator.h b/src/consensus/strata_estimator.h deleted file mode 100644 index cb5bd3d0a..000000000 --- a/src/consensus/strata_estimator.h +++ /dev/null | |||
@@ -1,84 +0,0 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet | ||
3 | (C) 2012 Christian Grothoff (and other contributing authors) | ||
4 | |||
5 | GNUnet is free software; you can redistribute it and/or modify | ||
6 | it under the terms of the GNU General Public License as published | ||
7 | by the Free Software Foundation; either version 2, or (at your | ||
8 | option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU General Public License | ||
16 | along with GNUnet; see the file COPYING. If not, write to the | ||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
18 | Boston, MA 02111-1307, USA. | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file consensus/strata_estimator.h | ||
23 | * @brief estimator of set difference | ||
24 | * @author Florian Dold | ||
25 | */ | ||
26 | |||
27 | #ifndef GNUNET_CONSENSUS_STRATA_ESTIMATOR_H | ||
28 | #define GNUNET_CONSENSUS_STRATA_ESTIMATOR_H | ||
29 | |||
30 | #include "platform.h" | ||
31 | #include "gnunet_common.h" | ||
32 | #include "gnunet_util_lib.h" | ||
33 | |||
34 | #ifdef __cplusplus | ||
35 | extern "C" | ||
36 | { | ||
37 | #if 0 /* keep Emacsens' auto-indent happy */ | ||
38 | } | ||
39 | #endif | ||
40 | #endif | ||
41 | |||
42 | |||
43 | struct StrataEstimator | ||
44 | { | ||
45 | struct InvertibleBloomFilter **strata; | ||
46 | unsigned int strata_count; | ||
47 | unsigned int ibf_size; | ||
48 | }; | ||
49 | |||
50 | |||
51 | void | ||
52 | strata_estimator_write (const struct StrataEstimator *se, void *buf); | ||
53 | |||
54 | |||
55 | void | ||
56 | strata_estimator_read (const void *buf, struct StrataEstimator *se); | ||
57 | |||
58 | |||
59 | struct StrataEstimator * | ||
60 | strata_estimator_create (unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum); | ||
61 | |||
62 | |||
63 | unsigned int | ||
64 | strata_estimator_difference (const struct StrataEstimator *se1, | ||
65 | const struct StrataEstimator *se2); | ||
66 | |||
67 | |||
68 | void | ||
69 | strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode *key); | ||
70 | |||
71 | |||
72 | void | ||
73 | strata_estimator_destroy (struct StrataEstimator *se); | ||
74 | |||
75 | |||
76 | #if 0 /* keep Emacsens' auto-indent happy */ | ||
77 | { | ||
78 | #endif | ||
79 | #ifdef __cplusplus | ||
80 | } | ||
81 | #endif | ||
82 | |||
83 | #endif | ||
84 | |||
diff --git a/src/set/mq.h b/src/include/gnunet_mq_lib.h index b7a89f6e0..59b692cf0 100644 --- a/src/set/mq.h +++ b/src/include/gnunet_mq_lib.h | |||
@@ -23,15 +23,10 @@ | |||
23 | * @file set/mq.h | 23 | * @file set/mq.h |
24 | * @brief general purpose request queue | 24 | * @brief general purpose request queue |
25 | */ | 25 | */ |
26 | #ifndef MQ_H | 26 | #ifndef GNUNET_MQ_H |
27 | #define MQ_H | 27 | #define GNUNET_MQ_H |
28 | 28 | ||
29 | #include "platform.h" | ||
30 | #include "gnunet_common.h" | 29 | #include "gnunet_common.h" |
31 | #include "gnunet_util_lib.h" | ||
32 | #include "gnunet_connection_lib.h" | ||
33 | #include "gnunet_server_lib.h" | ||
34 | #include "gnunet_stream_lib.h" | ||
35 | 30 | ||
36 | 31 | ||
37 | /** | 32 | /** |
@@ -69,12 +64,12 @@ | |||
69 | * @param mqm MQ message to augment with additional data | 64 | * @param mqm MQ message to augment with additional data |
70 | * @param src source buffer for the additional data | 65 | * @param src source buffer for the additional data |
71 | * @param len length of the additional data | 66 | * @param len length of the additional data |
72 | * @return FIXME | 67 | * @return GNUNET_SYSERR if nesting the message failed, |
68 | * GNUNET_OK on success | ||
73 | */ | 69 | */ |
74 | #define GNUNET_MQ_nest(mqm, src, len) GNUNET_MQ_nest_ (&mqm, src, len) | 70 | #define GNUNET_MQ_nest(mqm, src, len) GNUNET_MQ_nest_ (&mqm, src, len) |
75 | 71 | ||
76 | 72 | ||
77 | |||
78 | /** | 73 | /** |
79 | * Append a message to the end of an existing MQ message. | 74 | * Append a message to the end of an existing MQ message. |
80 | * If the operation is successful, mqm is changed to point to the new MQ message, | 75 | * If the operation is successful, mqm is changed to point to the new MQ message, |
@@ -114,15 +109,18 @@ | |||
114 | */ | 109 | */ |
115 | #define GNUNET_MQ_HANDLERS_END {NULL, 0, 0} | 110 | #define GNUNET_MQ_HANDLERS_END {NULL, 0, 0} |
116 | 111 | ||
117 | /** | 112 | |
118 | * Opaque handle to a message queue | ||
119 | */ | ||
120 | struct GNUNET_MQ_MessageQueue; | 113 | struct GNUNET_MQ_MessageQueue; |
121 | 114 | ||
122 | /** | 115 | struct GNUNET_MQ_Message; |
123 | * Opaque handle to an allocated message | 116 | |
124 | */ | 117 | enum GNUNET_MQ_Error |
125 | struct GNUNET_MQ_Message; // Entry (/ Request) | 118 | { |
119 | GNUNET_MQ_ERROR_READ = 1, | ||
120 | GNUNET_MQ_ERROR_WRITE = 2, | ||
121 | GNUNET_MQ_ERROR_TIMEOUT = 4 | ||
122 | }; | ||
123 | |||
126 | 124 | ||
127 | /** | 125 | /** |
128 | * Called when a message has been received. | 126 | * Called when a message has been received. |
@@ -134,6 +132,135 @@ typedef void (*GNUNET_MQ_MessageCallback) (void *cls, const struct GNUNET_Messag | |||
134 | 132 | ||
135 | 133 | ||
136 | /** | 134 | /** |
135 | * Signature of functions implementing the | ||
136 | * sending part of a message queue | ||
137 | * | ||
138 | * @param q the message queue | ||
139 | * @param m the message | ||
140 | */ | ||
141 | typedef void | ||
142 | (*GNUNET_MQ_SendImpl) (struct GNUNET_MQ_MessageQueue *q, struct GNUNET_MQ_Message *m); | ||
143 | |||
144 | |||
145 | typedef void | ||
146 | (*GNUNET_MQ_DestroyImpl) (struct GNUNET_MQ_MessageQueue *q); | ||
147 | |||
148 | |||
149 | /** | ||
150 | * Callback used for notifications | ||
151 | * | ||
152 | * @param cls closure | ||
153 | */ | ||
154 | typedef void (*GNUNET_MQ_NotifyCallback) (void *cls); | ||
155 | |||
156 | |||
157 | typedef void (*GNUNET_MQ_ErrorHandler) (void *cls, enum GNUNET_MQ_Error error); | ||
158 | |||
159 | |||
160 | struct GNUNET_MQ_Message | ||
161 | { | ||
162 | /** | ||
163 | * Messages are stored in a linked list | ||
164 | */ | ||
165 | struct GNUNET_MQ_Message *next; | ||
166 | |||
167 | /** | ||
168 | * Messages are stored in a linked list | ||
169 | */ | ||
170 | struct GNUNET_MQ_Message *prev; | ||
171 | |||
172 | /** | ||
173 | * Actual allocated message header, | ||
174 | * usually points to the end of the containing GNUNET_MQ_Message | ||
175 | */ | ||
176 | struct GNUNET_MessageHeader *mh; | ||
177 | |||
178 | /** | ||
179 | * Queue the message is queued in, NULL if message is not queued. | ||
180 | */ | ||
181 | struct GNUNET_MQ_MessageQueue *parent_queue; | ||
182 | |||
183 | /** | ||
184 | * Called after the message was sent irrevokably | ||
185 | */ | ||
186 | GNUNET_MQ_NotifyCallback sent_cb; | ||
187 | |||
188 | /** | ||
189 | * Closure for send_cb | ||
190 | */ | ||
191 | void *sent_cls; | ||
192 | }; | ||
193 | |||
194 | |||
195 | /** | ||
196 | * Handle to a message queue. | ||
197 | */ | ||
198 | struct GNUNET_MQ_MessageQueue | ||
199 | { | ||
200 | /** | ||
201 | * Handlers array, or NULL if the queue should not receive messages | ||
202 | */ | ||
203 | const struct GNUNET_MQ_Handler *handlers; | ||
204 | |||
205 | /** | ||
206 | * Closure for the handler callbacks, | ||
207 | * as well as for the error handler. | ||
208 | */ | ||
209 | void *handlers_cls; | ||
210 | |||
211 | /** | ||
212 | * Actual implementation of message sending, | ||
213 | * called when a message is added | ||
214 | */ | ||
215 | GNUNET_MQ_SendImpl send_impl; | ||
216 | |||
217 | /** | ||
218 | * Implementation-dependent queue destruction function | ||
219 | */ | ||
220 | GNUNET_MQ_DestroyImpl destroy_impl; | ||
221 | |||
222 | /** | ||
223 | * Implementation-specific state | ||
224 | */ | ||
225 | void *impl_state; | ||
226 | |||
227 | /** | ||
228 | * Callback will be called when an error occurs. | ||
229 | */ | ||
230 | GNUNET_MQ_ErrorHandler error_handler; | ||
231 | |||
232 | /** | ||
233 | * Linked list of messages pending to be sent | ||
234 | */ | ||
235 | struct GNUNET_MQ_Message *msg_head; | ||
236 | |||
237 | /** | ||
238 | * Linked list of messages pending to be sent | ||
239 | */ | ||
240 | struct GNUNET_MQ_Message *msg_tail; | ||
241 | |||
242 | /** | ||
243 | * Message that is currently scheduled to be | ||
244 | * sent. Not the head of the message queue, as the implementation | ||
245 | * needs to know if sending has been already scheduled or not. | ||
246 | */ | ||
247 | struct GNUNET_MQ_Message *current_msg; | ||
248 | |||
249 | /** | ||
250 | * Map of associations, lazily allocated | ||
251 | */ | ||
252 | struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map; | ||
253 | |||
254 | /** | ||
255 | * Next id that should be used for the assoc_map, | ||
256 | * initialized lazily to a random value together with | ||
257 | * assoc_map | ||
258 | */ | ||
259 | uint32_t assoc_id; | ||
260 | }; | ||
261 | |||
262 | |||
263 | /** | ||
137 | * Message handler for a specific message type. | 264 | * Message handler for a specific message type. |
138 | */ | 265 | */ |
139 | struct GNUNET_MQ_Handler | 266 | struct GNUNET_MQ_Handler |
@@ -159,13 +286,6 @@ struct GNUNET_MQ_Handler | |||
159 | uint16_t expected_size; | 286 | uint16_t expected_size; |
160 | }; | 287 | }; |
161 | 288 | ||
162 | /** | ||
163 | * Callback used for notifications | ||
164 | * | ||
165 | * @param cls closure | ||
166 | */ | ||
167 | typedef void (*GNUNET_MQ_NotifyCallback) (void *cls); | ||
168 | |||
169 | 289 | ||
170 | /** | 290 | /** |
171 | * Create a new message for MQ. | 291 | * Create a new message for MQ. |
@@ -179,6 +299,16 @@ struct GNUNET_MQ_Message * | |||
179 | GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type); | 299 | GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type); |
180 | 300 | ||
181 | 301 | ||
302 | /** | ||
303 | * Resize the the mq message pointed to by mqmp, | ||
304 | * and append the given data to it. | ||
305 | * | ||
306 | * @param mqmp pointer to a mq message pointer | ||
307 | * @param src source of the data to append | ||
308 | * @param len length of the data to append | ||
309 | * @return GNUNET_OK on success, | ||
310 | * GNUNET_SYSERR on error (e.g. if len is too large) | ||
311 | */ | ||
182 | int | 312 | int |
183 | GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp, | 313 | GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp, |
184 | const void *src, uint16_t len); | 314 | const void *src, uint16_t len); |
@@ -277,19 +407,23 @@ GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client); | |||
277 | 407 | ||
278 | 408 | ||
279 | /** | 409 | /** |
280 | * Create a message queue for a GNUNET_STREAM_Socket. | 410 | * Create a message queue for the specified handlers. |
281 | * If handlers are specfied, receive messages from the stream socket. | ||
282 | * | 411 | * |
283 | * @param socket the stream socket | 412 | * @param send function the implements sending messages |
284 | * @param handlers handlers for receiving messages | 413 | * @param destroy function that implements destroying the queue |
285 | * @param cls closure for the handlers | 414 | * @param state for the queue, passed to 'send' and 'destroy' |
286 | * @return the message queue | 415 | * @param handlers array of message handlers |
287 | * @deprecated - GNUNET_MQ_queue_create_with_callbacks | 416 | * @param error_handler handler for read and write errors |
417 | * @return a new message queue | ||
288 | */ | 418 | */ |
289 | struct GNUNET_MQ_MessageQueue * | 419 | struct GNUNET_MQ_MessageQueue * |
290 | GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket, | 420 | GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send, |
291 | const struct GNUNET_MQ_Handler *handlers, | 421 | GNUNET_MQ_DestroyImpl destroy, |
292 | void *cls); | 422 | void *impl_state, |
423 | struct GNUNET_MQ_Handler *handlers, | ||
424 | GNUNET_MQ_ErrorHandler error_handler, | ||
425 | void *cls); | ||
426 | |||
293 | 427 | ||
294 | 428 | ||
295 | /** | 429 | /** |
@@ -323,40 +457,22 @@ GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm, | |||
323 | 457 | ||
324 | 458 | ||
325 | /** | 459 | /** |
326 | * Call a callback once all messages queued have been sent, | 460 | * Destroy the message queue. |
327 | * i.e. the message queue is empty. | ||
328 | * | ||
329 | * @param mqm the message queue to send the notification for | ||
330 | * @param cb the callback to call on an empty queue | ||
331 | * @param cls closure for cb | ||
332 | * @deprecated | ||
333 | */ | ||
334 | void | ||
335 | GNUNET_MQ_notify_empty (struct GNUNET_MQ_MessageQueue *mqm, | ||
336 | GNUNET_MQ_NotifyCallback cb, | ||
337 | void *cls); | ||
338 | |||
339 | |||
340 | /** | ||
341 | * Call a callback if reading encountered an error. | ||
342 | * | 461 | * |
343 | * @param mqm the message queue to send the notification for | 462 | * @param mq message queue to destroy |
344 | * @param cb the callback to call on a read error | ||
345 | * @param cls closure for cb | ||
346 | * @deprecated, integrate with queue creation | ||
347 | */ | 463 | */ |
348 | void | 464 | void |
349 | GNUNET_MQ_notify_read_error (struct GNUNET_MQ_MessageQueue *mqm, | 465 | GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq); |
350 | GNUNET_MQ_NotifyCallback cb, | ||
351 | void *cls); | ||
352 | 466 | ||
353 | 467 | ||
354 | /** | 468 | /** |
355 | * Destroy the message queue. | 469 | * Call the right callback for a message. |
356 | * | 470 | * |
357 | * @param mq message queue to destroy | 471 | * @param mq message queue with the handlers |
472 | * @param mh message to dispatch | ||
358 | */ | 473 | */ |
359 | void | 474 | void |
360 | GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq); | 475 | GNUNET_MQ_dispatch (struct GNUNET_MQ_MessageQueue *mq, |
476 | const struct GNUNET_MessageHeader *mh); | ||
361 | 477 | ||
362 | #endif | 478 | #endif |
diff --git a/src/include/gnunet_set_service.h b/src/include/gnunet_set_service.h index 72d7ee521..7415e75f3 100644 --- a/src/include/gnunet_set_service.h +++ b/src/include/gnunet_set_service.h | |||
@@ -35,7 +35,6 @@ extern "C" | |||
35 | #endif | 35 | #endif |
36 | #endif | 36 | #endif |
37 | 37 | ||
38 | #include "platform.h" | ||
39 | #include "gnunet_common.h" | 38 | #include "gnunet_common.h" |
40 | #include "gnunet_time_lib.h" | 39 | #include "gnunet_time_lib.h" |
41 | #include "gnunet_configuration_lib.h" | 40 | #include "gnunet_configuration_lib.h" |
@@ -176,7 +175,7 @@ typedef void (*GNUNET_SET_ResultIterator) (void *cls, | |||
176 | 175 | ||
177 | /** | 176 | /** |
178 | * Called when another peer wants to do a set operation with the | 177 | * Called when another peer wants to do a set operation with the |
179 | * local peer | 178 | * local peer. |
180 | * | 179 | * |
181 | * @param other_peer the other peer | 180 | * @param other_peer the other peer |
182 | * @param context_msg message with application specific information from | 181 | * @param context_msg message with application specific information from |
@@ -266,8 +265,6 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set); | |||
266 | * @param salt salt used for the set operation; sometimes set operations | 265 | * @param salt salt used for the set operation; sometimes set operations |
267 | * fail due to hash collisions, using a different salt for each operation | 266 | * fail due to hash collisions, using a different salt for each operation |
268 | * makes it harder for an attacker to exploit this | 267 | * makes it harder for an attacker to exploit this |
269 | * @param timeout result_cb will be called with GNUNET_SET_STATUS_TIMEOUT | ||
270 | * if the operation is not done after the specified time; @deprecated | ||
271 | * @param result_mode specified how results will be returned, | 268 | * @param result_mode specified how results will be returned, |
272 | * see 'GNUNET_SET_ResultMode'. | 269 | * see 'GNUNET_SET_ResultMode'. |
273 | * @param result_cb called on error or success | 270 | * @param result_cb called on error or success |
@@ -280,7 +277,6 @@ GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set, | |||
280 | const struct GNUNET_HashCode *app_id, | 277 | const struct GNUNET_HashCode *app_id, |
281 | const struct GNUNET_MessageHeader *context_msg, | 278 | const struct GNUNET_MessageHeader *context_msg, |
282 | uint16_t salt, | 279 | uint16_t salt, |
283 | struct GNUNET_TIME_Relative timeout, | ||
284 | enum GNUNET_SET_ResultMode result_mode, | 280 | enum GNUNET_SET_ResultMode result_mode, |
285 | GNUNET_SET_ResultIterator result_cb, | 281 | GNUNET_SET_ResultIterator result_cb, |
286 | void *result_cls); | 282 | void *result_cls); |
@@ -322,7 +318,6 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh); | |||
322 | * | 318 | * |
323 | * @param request request to accept | 319 | * @param request request to accept |
324 | * @param set set used for the requested operation | 320 | * @param set set used for the requested operation |
325 | * @param timeout timeout for the set operation, @deprecated | ||
326 | * @param result_mode specified how results will be returned, | 321 | * @param result_mode specified how results will be returned, |
327 | * see 'GNUNET_SET_ResultMode'. | 322 | * see 'GNUNET_SET_ResultMode'. |
328 | * @param result_cb callback for the results | 323 | * @param result_cb callback for the results |
@@ -332,7 +327,6 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh); | |||
332 | struct GNUNET_SET_OperationHandle * | 327 | struct GNUNET_SET_OperationHandle * |
333 | GNUNET_SET_accept (struct GNUNET_SET_Request *request, | 328 | GNUNET_SET_accept (struct GNUNET_SET_Request *request, |
334 | struct GNUNET_SET_Handle *set, | 329 | struct GNUNET_SET_Handle *set, |
335 | struct GNUNET_TIME_Relative timeout, | ||
336 | enum GNUNET_SET_ResultMode result_mode, | 330 | enum GNUNET_SET_ResultMode result_mode, |
337 | GNUNET_SET_ResultIterator result_cb, | 331 | GNUNET_SET_ResultIterator result_cb, |
338 | void *cls); | 332 | void *cls); |
diff --git a/src/include/gnunet_stream_lib.h b/src/include/gnunet_stream_lib.h index 056695ba3..ece60c033 100644 --- a/src/include/gnunet_stream_lib.h +++ b/src/include/gnunet_stream_lib.h | |||
@@ -395,6 +395,21 @@ void | |||
395 | GNUNET_STREAM_read_cancel (struct GNUNET_STREAM_ReadHandle *rh); | 395 | GNUNET_STREAM_read_cancel (struct GNUNET_STREAM_ReadHandle *rh); |
396 | 396 | ||
397 | 397 | ||
398 | /** | ||
399 | * Create a message queue for a stream socket. | ||
400 | * | ||
401 | * @param socket the socket to read/write in the message queue | ||
402 | * @param msg_handlers message handler array | ||
403 | * @param error_handler callback for errors | ||
404 | * @return the message queue for the socket | ||
405 | */ | ||
406 | struct GNUNET_MQ_MessageQueue * | ||
407 | GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket, | ||
408 | const struct GNUNET_MQ_Handler *msg_handlers, | ||
409 | GNUNET_MQ_ErrorHandler error_handler, | ||
410 | void *cls); | ||
411 | |||
412 | |||
398 | #if 0 | 413 | #if 0 |
399 | { | 414 | { |
400 | #endif | 415 | #endif |
diff --git a/src/include/gnunet_util_lib.h b/src/include/gnunet_util_lib.h index 30ec7cd3f..439230560 100644 --- a/src/include/gnunet_util_lib.h +++ b/src/include/gnunet_util_lib.h | |||
@@ -47,6 +47,7 @@ extern "C" | |||
47 | #include "gnunet_disk_lib.h" | 47 | #include "gnunet_disk_lib.h" |
48 | #include "gnunet_getopt_lib.h" | 48 | #include "gnunet_getopt_lib.h" |
49 | #include "gnunet_helper_lib.h" | 49 | #include "gnunet_helper_lib.h" |
50 | #include "gnunet_mq_lib.h" | ||
50 | #include "gnunet_network_lib.h" | 51 | #include "gnunet_network_lib.h" |
51 | #include "gnunet_os_lib.h" | 52 | #include "gnunet_os_lib.h" |
52 | #include "gnunet_peer_lib.h" | 53 | #include "gnunet_peer_lib.h" |
diff --git a/src/set/Makefile.am b/src/set/Makefile.am index a609840b1..13278b05c 100644 --- a/src/set/Makefile.am +++ b/src/set/Makefile.am | |||
@@ -16,7 +16,7 @@ if USE_COVERAGE | |||
16 | endif | 16 | endif |
17 | 17 | ||
18 | bin_PROGRAMS = \ | 18 | bin_PROGRAMS = \ |
19 | gnunet-set gnunet-set-bug | 19 | gnunet-set |
20 | 20 | ||
21 | libexec_PROGRAMS = \ | 21 | libexec_PROGRAMS = \ |
22 | gnunet-service-set | 22 | gnunet-service-set |
@@ -35,17 +35,9 @@ gnunet_set_LDADD = \ | |||
35 | gnunet_set_DEPENDENCIES = \ | 35 | gnunet_set_DEPENDENCIES = \ |
36 | libgnunetset.la | 36 | libgnunetset.la |
37 | 37 | ||
38 | gnunet_set_bug_SOURCES = \ | ||
39 | gnunet-set-bug.c | ||
40 | gnunet_set_bug_LDADD = \ | ||
41 | $(top_builddir)/src/util/libgnunetutil.la \ | ||
42 | $(top_builddir)/src/stream/libgnunetstream.la \ | ||
43 | $(GN_LIBINTL) | ||
44 | |||
45 | gnunet_service_set_SOURCES = \ | 38 | gnunet_service_set_SOURCES = \ |
46 | gnunet-service-set.c \ | 39 | gnunet-service-set.c \ |
47 | gnunet-service-set_union.c \ | 40 | gnunet-service-set_union.c \ |
48 | mq.c \ | ||
49 | ibf.c \ | 41 | ibf.c \ |
50 | strata_estimator.c | 42 | strata_estimator.c |
51 | gnunet_service_set_LDADD = \ | 43 | gnunet_service_set_LDADD = \ |
@@ -54,13 +46,9 @@ gnunet_service_set_LDADD = \ | |||
54 | $(top_builddir)/src/stream/libgnunetstream.la \ | 46 | $(top_builddir)/src/stream/libgnunetstream.la \ |
55 | $(top_builddir)/src/mesh/libgnunetmesh.la \ | 47 | $(top_builddir)/src/mesh/libgnunetmesh.la \ |
56 | $(GN_LIBINTL) | 48 | $(GN_LIBINTL) |
57 | # hack for mq.c, see automake Objects ‘created with both libtool and without’ | ||
58 | # remove once GNUNET_MQ is in util/ | ||
59 | gnunet_service_set_CFLAGS = $(AM_CFLAGS) | ||
60 | 49 | ||
61 | libgnunetset_la_SOURCES = \ | 50 | libgnunetset_la_SOURCES = \ |
62 | set_api.c \ | 51 | set_api.c |
63 | mq.c | ||
64 | libgnunetset_la_LIBADD = \ | 52 | libgnunetset_la_LIBADD = \ |
65 | $(top_builddir)/src/util/libgnunetutil.la \ | 53 | $(top_builddir)/src/util/libgnunetutil.la \ |
66 | $(top_builddir)/src/stream/libgnunetstream.la \ | 54 | $(top_builddir)/src/stream/libgnunetstream.la \ |
@@ -84,24 +72,6 @@ test_set_api_LDADD = \ | |||
84 | test_set_api_DEPENDENCIES = \ | 72 | test_set_api_DEPENDENCIES = \ |
85 | libgnunetset.la | 73 | libgnunetset.la |
86 | 74 | ||
87 | |||
88 | test_mq_SOURCES = \ | ||
89 | test_mq.c \ | ||
90 | mq.c | ||
91 | test_mq_LDADD = \ | ||
92 | $(top_builddir)/src/util/libgnunetutil.la \ | ||
93 | $(top_builddir)/src/stream/libgnunetstream.la | ||
94 | test_mq_CFLAGS = $(AM_CFLAGS) | ||
95 | |||
96 | |||
97 | test_mq_client_SOURCES = \ | ||
98 | test_mq_client.c \ | ||
99 | mq.c | ||
100 | test_mq_client_LDADD = \ | ||
101 | $(top_builddir)/src/util/libgnunetutil.la \ | ||
102 | $(top_builddir)/src/stream/libgnunetstream.la | ||
103 | test_mq_client_CFLAGS = $(AM_CFLAGS) | ||
104 | |||
105 | EXTRA_DIST = \ | 75 | EXTRA_DIST = \ |
106 | test_set.conf | 76 | test_set.conf |
107 | 77 | ||
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index 9ac0fbee6..2aea50365 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c | |||
@@ -632,7 +632,7 @@ stream_listen_cb (void *cls, | |||
632 | incoming = GNUNET_new (struct Incoming); | 632 | incoming = GNUNET_new (struct Incoming); |
633 | incoming->peer = *initiator; | 633 | incoming->peer = *initiator; |
634 | incoming->socket = socket; | 634 | incoming->socket = socket; |
635 | incoming->mq = GNUNET_MQ_queue_for_stream_socket (incoming->socket, handlers, incoming); | 635 | incoming->mq = GNUNET_STREAM_mq_create (incoming->socket, handlers, NULL, incoming); |
636 | /* FIXME: timeout for peers that only connect but don't send anything */ | 636 | /* FIXME: timeout for peers that only connect but don't send anything */ |
637 | GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming); | 637 | GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming); |
638 | return GNUNET_OK; | 638 | return GNUNET_OK; |
diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h index cc28e9701..bea77416e 100644 --- a/src/set/gnunet-service-set.h +++ b/src/set/gnunet-service-set.h | |||
@@ -36,7 +36,6 @@ | |||
36 | #include "gnunet_stream_lib.h" | 36 | #include "gnunet_stream_lib.h" |
37 | #include "gnunet_set_service.h" | 37 | #include "gnunet_set_service.h" |
38 | #include "set.h" | 38 | #include "set.h" |
39 | #include "mq.h" | ||
40 | 39 | ||
41 | 40 | ||
42 | /* FIXME: cfuchs */ | 41 | /* FIXME: cfuchs */ |
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index 05b125047..ae7f47266 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c | |||
@@ -1053,8 +1053,8 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) | |||
1053 | } | 1053 | } |
1054 | element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader); | 1054 | element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader); |
1055 | ee = GNUNET_malloc (sizeof *eo + element_size); | 1055 | ee = GNUNET_malloc (sizeof *eo + element_size); |
1056 | memcpy (&ee[1], &mh[1], element_size); | ||
1056 | ee->element.data = &ee[1]; | 1057 | ee->element.data = &ee[1]; |
1057 | memcpy (ee->element.data, &mh[1], element_size); | ||
1058 | ee->remote = GNUNET_YES; | 1058 | ee->remote = GNUNET_YES; |
1059 | 1059 | ||
1060 | insert_element (eo, ee); | 1060 | insert_element (eo, ee); |
@@ -1183,8 +1183,8 @@ stream_open_cb (void *cls, | |||
1183 | 1183 | ||
1184 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "open cb successful\n"); | 1184 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "open cb successful\n"); |
1185 | 1185 | ||
1186 | eo->mq = GNUNET_MQ_queue_for_stream_socket (eo->socket, | 1186 | |
1187 | union_handlers, eo); | 1187 | eo->mq = GNUNET_STREAM_mq_create (eo->socket, union_handlers, NULL, eo); |
1188 | /* we started the operation, thus we have to send the operation request */ | 1188 | /* we started the operation, thus we have to send the operation request */ |
1189 | send_operation_request (eo); | 1189 | send_operation_request (eo); |
1190 | eo->phase = PHASE_EXPECT_SE; | 1190 | eo->phase = PHASE_EXPECT_SE; |
@@ -1312,9 +1312,9 @@ _GSS_union_add (struct ElementMessage *m, struct Set *set) | |||
1312 | element_size = ntohs (m->header.size) - sizeof *m; | 1312 | element_size = ntohs (m->header.size) - sizeof *m; |
1313 | ee = GNUNET_malloc (element_size + sizeof *ee); | 1313 | ee = GNUNET_malloc (element_size + sizeof *ee); |
1314 | ee->element.size = element_size; | 1314 | ee->element.size = element_size; |
1315 | memcpy (&ee[1], &m[1], element_size); | ||
1315 | ee->element.data = &ee[1]; | 1316 | ee->element.data = &ee[1]; |
1316 | ee->generation_added = set->state.u->current_generation; | 1317 | ee->generation_added = set->state.u->current_generation; |
1317 | memcpy (ee->element.data, &m[1], element_size); | ||
1318 | GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash); | 1318 | GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash); |
1319 | ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &ee->element_hash); | 1319 | ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &ee->element_hash); |
1320 | if (NULL != ee_dup) | 1320 | if (NULL != ee_dup) |
diff --git a/src/set/gnunet-set-bug.c b/src/set/gnunet-set-bug.c deleted file mode 100644 index 112def7d7..000000000 --- a/src/set/gnunet-set-bug.c +++ /dev/null | |||
@@ -1,142 +0,0 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet | ||
3 | (C) 2012 Christian Grothoff (and other contributing authors) | ||
4 | |||
5 | GNUnet is free software; you can redistribute it and/or modify | ||
6 | it under the terms of the GNU General Public License as published | ||
7 | by the Free Software Foundation; either version 2, or (at your | ||
8 | option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU General Public License | ||
16 | along with GNUnet; see the file COPYING. If not, write to the | ||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
18 | Boston, MA 02111-1307, USA. | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file set/gnunet-set.c | ||
23 | * @brief profiling tool for the set service | ||
24 | * @author Florian Dold | ||
25 | */ | ||
26 | #include "platform.h" | ||
27 | #include "gnunet_common.h" | ||
28 | #include "gnunet_applications.h" | ||
29 | #include "gnunet_util_lib.h" | ||
30 | #include "gnunet_stream_lib.h" | ||
31 | |||
32 | |||
33 | static struct GNUNET_PeerIdentity local_id; | ||
34 | |||
35 | static struct GNUNET_CONFIGURATION_Handle *cfg; | ||
36 | |||
37 | static struct GNUNET_STREAM_ListenSocket *listen_socket; | ||
38 | |||
39 | static struct GNUNET_STREAM_Socket *s1; | ||
40 | |||
41 | static struct GNUNET_STREAM_Socket *s2; | ||
42 | |||
43 | static void | ||
44 | do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
45 | { | ||
46 | if (NULL != s2) | ||
47 | GNUNET_STREAM_close (s2); | ||
48 | GNUNET_STREAM_close (s1); | ||
49 | GNUNET_STREAM_listen_close (listen_socket); | ||
50 | GNUNET_CONFIGURATION_destroy (cfg); | ||
51 | } | ||
52 | |||
53 | static size_t | ||
54 | stream_data_processor (void *cls, | ||
55 | enum GNUNET_STREAM_Status status, | ||
56 | const void *data, | ||
57 | size_t size) | ||
58 | { | ||
59 | return size; | ||
60 | } | ||
61 | |||
62 | static int | ||
63 | listen_cb (void *cls, | ||
64 | struct GNUNET_STREAM_Socket *socket, | ||
65 | const struct | ||
66 | GNUNET_PeerIdentity *initiator) | ||
67 | { | ||
68 | if (NULL == (s2 = socket)) | ||
69 | { | ||
70 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "socket listen failed\n"); | ||
71 | return GNUNET_NO; | ||
72 | } | ||
73 | |||
74 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "socket listen succesful\n"); | ||
75 | GNUNET_assert (NULL != socket); | ||
76 | GNUNET_assert (0 == memcmp (initiator, &local_id, sizeof (*initiator))); | ||
77 | GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, | ||
78 | &stream_data_processor, NULL); | ||
79 | return GNUNET_YES; | ||
80 | } | ||
81 | |||
82 | static void | ||
83 | open_cb (void *cls, struct GNUNET_STREAM_Socket *socket) | ||
84 | { | ||
85 | |||
86 | } | ||
87 | |||
88 | static void | ||
89 | stream_connect (void) | ||
90 | { | ||
91 | s1 = GNUNET_STREAM_open (cfg, | ||
92 | &local_id, | ||
93 | GNUNET_APPLICATION_TYPE_SET, | ||
94 | &open_cb, | ||
95 | NULL, | ||
96 | GNUNET_STREAM_OPTION_END); | ||
97 | } | ||
98 | |||
99 | /** | ||
100 | * Main function that will be run. | ||
101 | * | ||
102 | * @param cls closure | ||
103 | * @param args remaining command-line arguments | ||
104 | * @param cfgfile name of the configuration file used (for saving, can be NULL!) | ||
105 | * @param cfg configuration | ||
106 | */ | ||
107 | static void | ||
108 | run (void *cls, char *const *args, | ||
109 | const char *cfgfile, | ||
110 | const struct GNUNET_CONFIGURATION_Handle *cfg2) | ||
111 | { | ||
112 | |||
113 | cfg = GNUNET_CONFIGURATION_dup (cfg2); | ||
114 | GNUNET_CRYPTO_get_host_identity (cfg, &local_id); | ||
115 | |||
116 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "I am Peer %s\n", GNUNET_h2s (&local_id.hashPubKey)); | ||
117 | |||
118 | listen_socket = GNUNET_STREAM_listen (cfg, | ||
119 | GNUNET_APPLICATION_TYPE_SET, | ||
120 | &listen_cb, | ||
121 | NULL, | ||
122 | GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS, | ||
123 | &stream_connect, | ||
124 | GNUNET_STREAM_OPTION_END); | ||
125 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, | ||
126 | &do_shutdown, NULL); | ||
127 | } | ||
128 | |||
129 | |||
130 | |||
131 | int | ||
132 | main (int argc, char **argv) | ||
133 | { | ||
134 | static const struct GNUNET_GETOPT_CommandLineOption options[] = { | ||
135 | GNUNET_GETOPT_OPTION_END | ||
136 | }; | ||
137 | GNUNET_PROGRAM_run (argc, argv, "gnunet-set", | ||
138 | "help", | ||
139 | options, &run, NULL); | ||
140 | return 0; | ||
141 | } | ||
142 | |||
diff --git a/src/consensus/gnunet-consensus-ibf.c b/src/set/gnunet-set-ibf.c index d431795f1..d431795f1 100644 --- a/src/consensus/gnunet-consensus-ibf.c +++ b/src/set/gnunet-set-ibf.c | |||
diff --git a/src/set/gnunet-set.c b/src/set/gnunet-set.c index d665fce11..5f2d1c976 100644 --- a/src/set/gnunet-set.c +++ b/src/set/gnunet-set.c | |||
@@ -42,8 +42,8 @@ int num_done; | |||
42 | 42 | ||
43 | 43 | ||
44 | static void | 44 | static void |
45 | result_cb_set1 (void *cls, struct GNUNET_SET_Element *element, | 45 | result_cb_set1 (void *cls, const struct GNUNET_SET_Element *element, |
46 | enum GNUNET_SET_Status status) | 46 | enum GNUNET_SET_Status status) |
47 | { | 47 | { |
48 | switch (status) | 48 | switch (status) |
49 | { | 49 | { |
@@ -64,7 +64,7 @@ result_cb_set1 (void *cls, struct GNUNET_SET_Element *element, | |||
64 | 64 | ||
65 | 65 | ||
66 | static void | 66 | static void |
67 | result_cb_set2 (void *cls, struct GNUNET_SET_Element *element, | 67 | result_cb_set2 (void *cls, const struct GNUNET_SET_Element *element, |
68 | enum GNUNET_SET_Status status) | 68 | enum GNUNET_SET_Status status) |
69 | { | 69 | { |
70 | switch (status) | 70 | switch (status) |
@@ -94,7 +94,7 @@ listen_cb (void *cls, | |||
94 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n"); | 94 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n"); |
95 | GNUNET_SET_listen_cancel (listen_handle); | 95 | GNUNET_SET_listen_cancel (listen_handle); |
96 | 96 | ||
97 | GNUNET_SET_accept (request, set2, GNUNET_TIME_UNIT_FOREVER_REL, | 97 | GNUNET_SET_accept (request, set2, |
98 | GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL); | 98 | GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL); |
99 | } | 99 | } |
100 | 100 | ||
@@ -110,7 +110,7 @@ start (void *cls) | |||
110 | listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, | 110 | listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, |
111 | &app_id, listen_cb, NULL); | 111 | &app_id, listen_cb, NULL); |
112 | GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42, | 112 | GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42, |
113 | GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_SET_RESULT_ADDED, | 113 | GNUNET_SET_RESULT_ADDED, |
114 | result_cb_set1, NULL); | 114 | result_cb_set1, NULL); |
115 | } | 115 | } |
116 | 116 | ||
diff --git a/src/set/set_api.c b/src/set/set_api.c index 775e390de..5838680b9 100644 --- a/src/set/set_api.c +++ b/src/set/set_api.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet. | 2 | This file is part of GNUnet. |
3 | (C) 2012 Christian Grothoff (and other contributing authors) | 3 | (C) 2012, 2013 Christian Grothoff (and other contributing authors) |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -29,8 +29,6 @@ | |||
29 | #include "gnunet_client_lib.h" | 29 | #include "gnunet_client_lib.h" |
30 | #include "gnunet_set_service.h" | 30 | #include "gnunet_set_service.h" |
31 | #include "set.h" | 31 | #include "set.h" |
32 | #include "mq.h" | ||
33 | #include <inttypes.h> | ||
34 | 32 | ||
35 | 33 | ||
36 | #define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__) | 34 | #define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__) |
@@ -61,7 +59,6 @@ struct GNUNET_SET_OperationHandle | |||
61 | void *result_cls; | 59 | void *result_cls; |
62 | struct GNUNET_SET_Handle *set; | 60 | struct GNUNET_SET_Handle *set; |
63 | uint32_t request_id; | 61 | uint32_t request_id; |
64 | GNUNET_SCHEDULER_TaskIdentifier timeout_task; | ||
65 | }; | 62 | }; |
66 | 63 | ||
67 | 64 | ||
@@ -104,11 +101,6 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh) | |||
104 | * and this is the last result message we get */ | 101 | * and this is the last result message we get */ |
105 | if (htons (msg->result_status) != GNUNET_SET_STATUS_OK) | 102 | if (htons (msg->result_status) != GNUNET_SET_STATUS_OK) |
106 | { | 103 | { |
107 | if (GNUNET_SCHEDULER_NO_TASK != oh->timeout_task) | ||
108 | { | ||
109 | GNUNET_SCHEDULER_cancel (oh->timeout_task); | ||
110 | oh->timeout_task = GNUNET_SCHEDULER_NO_TASK; | ||
111 | } | ||
112 | GNUNET_MQ_assoc_remove (set->mq, ntohl (msg->request_id)); | 104 | GNUNET_MQ_assoc_remove (set->mq, ntohl (msg->request_id)); |
113 | if (NULL != oh->result_cb) | 105 | if (NULL != oh->result_cb) |
114 | oh->result_cb (oh->result_cls, NULL, htons (msg->result_status)); | 106 | oh->result_cb (oh->result_cls, NULL, htons (msg->result_status)); |
@@ -264,26 +256,6 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set) | |||
264 | 256 | ||
265 | 257 | ||
266 | /** | 258 | /** |
267 | * Signature of the main function of a task. | ||
268 | * | ||
269 | * @param cls closure | ||
270 | * @param tc context information (why was this task triggered now) | ||
271 | */ | ||
272 | static void | ||
273 | operation_timeout_task (void *cls, | ||
274 | const struct GNUNET_SCHEDULER_TaskContext * tc) | ||
275 | { | ||
276 | struct GNUNET_SET_OperationHandle *oh = cls; | ||
277 | oh->timeout_task = GNUNET_SCHEDULER_NO_TASK; | ||
278 | if (NULL != oh->result_cb) | ||
279 | oh->result_cb (oh->result_cls, NULL, GNUNET_SET_STATUS_TIMEOUT); | ||
280 | oh->result_cb = NULL; | ||
281 | oh->result_cls = NULL; | ||
282 | GNUNET_SET_operation_cancel (oh); | ||
283 | } | ||
284 | |||
285 | |||
286 | /** | ||
287 | * Evaluate a set operation with our set and the set of another peer. | 259 | * Evaluate a set operation with our set and the set of another peer. |
288 | * | 260 | * |
289 | * @param set set to use | 261 | * @param set set to use |
@@ -294,8 +266,6 @@ operation_timeout_task (void *cls, | |||
294 | * @param salt salt used for the set operation; sometimes set operations | 266 | * @param salt salt used for the set operation; sometimes set operations |
295 | * fail due to hash collisions, using a different salt for each operation | 267 | * fail due to hash collisions, using a different salt for each operation |
296 | * makes it harder for an attacker to exploit this | 268 | * makes it harder for an attacker to exploit this |
297 | * @param timeout result_cb will be called with GNUNET_SET_STATUS_TIMEOUT | ||
298 | * if the operation is not done after the specified time | ||
299 | * @param result_mode specified how results will be returned, | 269 | * @param result_mode specified how results will be returned, |
300 | * see 'GNUNET_SET_ResultMode'. | 270 | * see 'GNUNET_SET_ResultMode'. |
301 | * @param result_cb called on error or success | 271 | * @param result_cb called on error or success |
@@ -308,7 +278,6 @@ GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set, | |||
308 | const struct GNUNET_HashCode *app_id, | 278 | const struct GNUNET_HashCode *app_id, |
309 | const struct GNUNET_MessageHeader *context_msg, | 279 | const struct GNUNET_MessageHeader *context_msg, |
310 | uint16_t salt, | 280 | uint16_t salt, |
311 | struct GNUNET_TIME_Relative timeout, | ||
312 | enum GNUNET_SET_ResultMode result_mode, | 281 | enum GNUNET_SET_ResultMode result_mode, |
313 | GNUNET_SET_ResultIterator result_cb, | 282 | GNUNET_SET_ResultIterator result_cb, |
314 | void *result_cls) | 283 | void *result_cls) |
@@ -331,7 +300,6 @@ GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set, | |||
331 | if (GNUNET_OK != GNUNET_MQ_nest (mqm, context_msg, ntohs (context_msg->size))) | 300 | if (GNUNET_OK != GNUNET_MQ_nest (mqm, context_msg, ntohs (context_msg->size))) |
332 | GNUNET_assert (0); | 301 | GNUNET_assert (0); |
333 | 302 | ||
334 | oh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, operation_timeout_task, oh); | ||
335 | GNUNET_MQ_send (set->mq, mqm); | 303 | GNUNET_MQ_send (set->mq, mqm); |
336 | 304 | ||
337 | return oh; | 305 | return oh; |
@@ -399,7 +367,6 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh) | |||
399 | * | 367 | * |
400 | * @param request request to accept | 368 | * @param request request to accept |
401 | * @param set set used for the requested operation | 369 | * @param set set used for the requested operation |
402 | * @param timeout timeout for the set operation | ||
403 | * @param result_mode specified how results will be returned, | 370 | * @param result_mode specified how results will be returned, |
404 | * see 'GNUNET_SET_ResultMode'. | 371 | * see 'GNUNET_SET_ResultMode'. |
405 | * @param result_cb callback for the results | 372 | * @param result_cb callback for the results |
@@ -409,7 +376,6 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh) | |||
409 | struct GNUNET_SET_OperationHandle * | 376 | struct GNUNET_SET_OperationHandle * |
410 | GNUNET_SET_accept (struct GNUNET_SET_Request *request, | 377 | GNUNET_SET_accept (struct GNUNET_SET_Request *request, |
411 | struct GNUNET_SET_Handle *set, | 378 | struct GNUNET_SET_Handle *set, |
412 | struct GNUNET_TIME_Relative timeout, | ||
413 | enum GNUNET_SET_ResultMode result_mode, | 379 | enum GNUNET_SET_ResultMode result_mode, |
414 | GNUNET_SET_ResultIterator result_cb, | 380 | GNUNET_SET_ResultIterator result_cb, |
415 | void *result_cls) | 381 | void *result_cls) |
@@ -432,8 +398,6 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request, | |||
432 | msg->accept_id = htonl (request->accept_id); | 398 | msg->accept_id = htonl (request->accept_id); |
433 | GNUNET_MQ_send (set->mq, mqm); | 399 | GNUNET_MQ_send (set->mq, mqm); |
434 | 400 | ||
435 | oh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, operation_timeout_task, oh); | ||
436 | |||
437 | return oh; | 401 | return oh; |
438 | } | 402 | } |
439 | 403 | ||
diff --git a/src/set/test_set_api.c b/src/set/test_set_api.c index 0ab02cad7..66e7a81d1 100644 --- a/src/set/test_set_api.c +++ b/src/set/test_set_api.c | |||
@@ -29,26 +29,57 @@ | |||
29 | 29 | ||
30 | 30 | ||
31 | static struct GNUNET_PeerIdentity local_id; | 31 | static struct GNUNET_PeerIdentity local_id; |
32 | |||
32 | static struct GNUNET_HashCode app_id; | 33 | static struct GNUNET_HashCode app_id; |
33 | static struct GNUNET_SET_Handle *set1; | 34 | static struct GNUNET_SET_Handle *set1; |
34 | static struct GNUNET_SET_Handle *set2; | 35 | static struct GNUNET_SET_Handle *set2; |
35 | static struct GNUNET_SET_ListenHandle *listen_handle; | 36 | static struct GNUNET_SET_ListenHandle *listen_handle; |
36 | const static struct GNUNET_CONFIGURATION_Handle *config; | 37 | const static struct GNUNET_CONFIGURATION_Handle *config; |
37 | 38 | ||
39 | int num_done; | ||
40 | |||
38 | 41 | ||
39 | static void | 42 | static void |
40 | result_cb_set1 (void *cls, struct GNUNET_SET_Element *element, | 43 | result_cb_set1 (void *cls, const struct GNUNET_SET_Element *element, |
41 | enum GNUNET_SET_Status status) | 44 | enum GNUNET_SET_Status status) |
42 | { | 45 | { |
43 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got result (set 1)\n"); | 46 | switch (status) |
47 | { | ||
48 | case GNUNET_SET_STATUS_OK: | ||
49 | printf ("set 1: got element\n"); | ||
50 | break; | ||
51 | case GNUNET_SET_STATUS_FAILURE: | ||
52 | printf ("set 1: failure\n"); | ||
53 | break; | ||
54 | case GNUNET_SET_STATUS_DONE: | ||
55 | printf ("set 1: done\n"); | ||
56 | GNUNET_SET_destroy (set1); | ||
57 | break; | ||
58 | default: | ||
59 | GNUNET_assert (0); | ||
60 | } | ||
44 | } | 61 | } |
45 | 62 | ||
46 | 63 | ||
47 | static void | 64 | static void |
48 | result_cb_set2 (void *cls, struct GNUNET_SET_Element *element, | 65 | result_cb_set2 (void *cls, const struct GNUNET_SET_Element *element, |
49 | enum GNUNET_SET_Status status) | 66 | enum GNUNET_SET_Status status) |
50 | { | 67 | { |
51 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got result (set 2)\n"); | 68 | switch (status) |
69 | { | ||
70 | case GNUNET_SET_STATUS_OK: | ||
71 | printf ("set 2: got element\n"); | ||
72 | break; | ||
73 | case GNUNET_SET_STATUS_FAILURE: | ||
74 | printf ("set 2: failure\n"); | ||
75 | break; | ||
76 | case GNUNET_SET_STATUS_DONE: | ||
77 | printf ("set 2: done\n"); | ||
78 | GNUNET_SET_destroy (set2); | ||
79 | break; | ||
80 | default: | ||
81 | GNUNET_assert (0); | ||
82 | } | ||
52 | } | 83 | } |
53 | 84 | ||
54 | 85 | ||
@@ -59,7 +90,9 @@ listen_cb (void *cls, | |||
59 | struct GNUNET_SET_Request *request) | 90 | struct GNUNET_SET_Request *request) |
60 | { | 91 | { |
61 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n"); | 92 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n"); |
62 | GNUNET_SET_accept (request, set2, GNUNET_TIME_UNIT_FOREVER_REL, | 93 | GNUNET_SET_listen_cancel (listen_handle); |
94 | |||
95 | GNUNET_SET_accept (request, set2, | ||
63 | GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL); | 96 | GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL); |
64 | } | 97 | } |
65 | 98 | ||
@@ -75,7 +108,7 @@ start (void *cls) | |||
75 | listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, | 108 | listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, |
76 | &app_id, listen_cb, NULL); | 109 | &app_id, listen_cb, NULL); |
77 | GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42, | 110 | GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42, |
78 | GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_SET_RESULT_ADDED, | 111 | GNUNET_SET_RESULT_ADDED, |
79 | result_cb_set1, NULL); | 112 | result_cb_set1, NULL); |
80 | } | 113 | } |
81 | 114 | ||
@@ -120,7 +153,6 @@ init_set1 (void) | |||
120 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n"); | 153 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n"); |
121 | } | 154 | } |
122 | 155 | ||
123 | |||
124 | /** | 156 | /** |
125 | * Signature of the 'main' function for a (single-peer) testcase that | 157 | * Signature of the 'main' function for a (single-peer) testcase that |
126 | * is run using 'GNUNET_TESTING_peer_run'. | 158 | * is run using 'GNUNET_TESTING_peer_run'. |
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 8994afc24..b4a47b53d 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c | |||
@@ -579,6 +579,37 @@ struct GNUNET_STREAM_ShutdownHandle | |||
579 | 579 | ||
580 | 580 | ||
581 | /** | 581 | /** |
582 | * Collection of the state necessary to read and write gnunet messages | ||
583 | * to a stream socket. Should be used as closure for stream_data_processor. | ||
584 | */ | ||
585 | struct MQStreamState | ||
586 | { | ||
587 | /** | ||
588 | * Message stream tokenizer for the data received from the | ||
589 | * stream socket. | ||
590 | */ | ||
591 | struct GNUNET_SERVER_MessageStreamTokenizer *mst; | ||
592 | |||
593 | /** | ||
594 | * The stream socket to use for receiving and transmitting | ||
595 | * messages with the message queue. | ||
596 | */ | ||
597 | struct GNUNET_STREAM_Socket *socket; | ||
598 | |||
599 | /** | ||
600 | * Current read handle, NULL if no read active. | ||
601 | */ | ||
602 | struct GNUNET_STREAM_ReadHandle *rh; | ||
603 | |||
604 | /** | ||
605 | * Current write handle, NULL if no write active. | ||
606 | */ | ||
607 | struct GNUNET_STREAM_WriteHandle *wh; | ||
608 | }; | ||
609 | |||
610 | |||
611 | |||
612 | /** | ||
582 | * Default value in seconds for various timeouts | 613 | * Default value in seconds for various timeouts |
583 | */ | 614 | */ |
584 | static const unsigned int default_timeout = 10; | 615 | static const unsigned int default_timeout = 10; |
@@ -3731,4 +3762,186 @@ GNUNET_STREAM_read_cancel (struct GNUNET_STREAM_ReadHandle *rh) | |||
3731 | cleanup_read_handle (socket); | 3762 | cleanup_read_handle (socket); |
3732 | } | 3763 | } |
3733 | 3764 | ||
3765 | |||
3766 | /** | ||
3767 | * Functions of this signature are called whenever writing operations | ||
3768 | * on a stream are executed | ||
3769 | * | ||
3770 | * @param cls the closure from GNUNET_STREAM_write | ||
3771 | * @param status the status of the stream at the time this function is called; | ||
3772 | * GNUNET_STREAM_OK if writing to stream was completed successfully; | ||
3773 | * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully | ||
3774 | * (this doesn't mean that the data is never sent, the receiver may | ||
3775 | * have read the data but its ACKs may have been lost); | ||
3776 | * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the | ||
3777 | * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot | ||
3778 | * be processed. | ||
3779 | * @param size the number of bytes written | ||
3780 | */ | ||
3781 | static void | ||
3782 | mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size) | ||
3783 | { | ||
3784 | struct GNUNET_MQ_MessageQueue *mq = cls; | ||
3785 | struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; | ||
3786 | struct GNUNET_MQ_Message *mqm; | ||
3787 | |||
3788 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
3789 | |||
3790 | /* call cb for message we finished sending */ | ||
3791 | mqm = mq->current_msg; | ||
3792 | GNUNET_assert (NULL != mq->current_msg); | ||
3793 | if (NULL != mqm->sent_cb) | ||
3794 | mqm->sent_cb (mqm->sent_cls); | ||
3795 | GNUNET_free (mqm); | ||
3796 | |||
3797 | mss->wh = NULL; | ||
3798 | |||
3799 | mqm = mq->msg_head; | ||
3800 | mq->current_msg = mqm; | ||
3801 | if (NULL == mqm) | ||
3802 | return; | ||
3803 | GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm); | ||
3804 | mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size), | ||
3805 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
3806 | mq_stream_write_queued, mq); | ||
3807 | GNUNET_assert (NULL != mss->wh); | ||
3808 | } | ||
3809 | |||
3810 | |||
3811 | static void | ||
3812 | mq_stream_send_impl (struct GNUNET_MQ_MessageQueue *mq, | ||
3813 | struct GNUNET_MQ_Message *mqm) | ||
3814 | { | ||
3815 | struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; | ||
3816 | |||
3817 | if (NULL != mq->current_msg) | ||
3818 | { | ||
3819 | GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); | ||
3820 | return; | ||
3821 | } | ||
3822 | mq->current_msg = mqm; | ||
3823 | mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size), | ||
3824 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
3825 | mq_stream_write_queued, mq); | ||
3826 | } | ||
3827 | |||
3828 | |||
3829 | /** | ||
3830 | * Functions with this signature are called whenever a | ||
3831 | * complete message is received by the tokenizer. | ||
3832 | * | ||
3833 | * Do not call GNUNET_SERVER_mst_destroy in callback | ||
3834 | * | ||
3835 | * @param cls closure | ||
3836 | * @param client identification of the client | ||
3837 | * @param message the actual message | ||
3838 | * | ||
3839 | * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing | ||
3840 | */ | ||
3841 | static int | ||
3842 | mq_stream_mst_callback (void *cls, void *client, | ||
3843 | const struct GNUNET_MessageHeader *message) | ||
3844 | { | ||
3845 | struct GNUNET_MQ_MessageQueue *mq = cls; | ||
3846 | |||
3847 | GNUNET_assert (NULL != message); | ||
3848 | GNUNET_MQ_dispatch (mq, message); | ||
3849 | return GNUNET_OK; | ||
3850 | } | ||
3851 | |||
3852 | |||
3853 | /** | ||
3854 | * Functions of this signature are called whenever data is available from the | ||
3855 | * stream. | ||
3856 | * | ||
3857 | * @param cls the closure from GNUNET_STREAM_read | ||
3858 | * @param status the status of the stream at the time this function is called | ||
3859 | * @param data traffic from the other side | ||
3860 | * @param size the number of bytes available in data read; will be 0 on timeout | ||
3861 | * @return number of bytes of processed from 'data' (any data remaining should be | ||
3862 | * given to the next time the read processor is called). | ||
3863 | */ | ||
3864 | static size_t | ||
3865 | mq_stream_data_processor (void *cls, | ||
3866 | enum GNUNET_STREAM_Status status, | ||
3867 | const void *data, | ||
3868 | size_t size) | ||
3869 | { | ||
3870 | struct GNUNET_MQ_MessageQueue *mq = cls; | ||
3871 | struct MQStreamState *mss; | ||
3872 | int ret; | ||
3873 | |||
3874 | mss = (struct MQStreamState *) mq->impl_state; | ||
3875 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
3876 | ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO); | ||
3877 | GNUNET_assert (GNUNET_OK == ret); | ||
3878 | /* we always read all data */ | ||
3879 | mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, | ||
3880 | mq_stream_data_processor, mq); | ||
3881 | return size; | ||
3882 | } | ||
3883 | |||
3884 | |||
3885 | static void | ||
3886 | mq_stream_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) | ||
3887 | { | ||
3888 | struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; | ||
3889 | |||
3890 | if (NULL != mss->rh) | ||
3891 | { | ||
3892 | GNUNET_STREAM_read_cancel (mss->rh); | ||
3893 | mss->rh = NULL; | ||
3894 | } | ||
3895 | |||
3896 | if (NULL != mss->wh) | ||
3897 | { | ||
3898 | GNUNET_STREAM_write_cancel (mss->wh); | ||
3899 | mss->wh = NULL; | ||
3900 | } | ||
3901 | |||
3902 | if (NULL != mss->mst) | ||
3903 | { | ||
3904 | GNUNET_SERVER_mst_destroy (mss->mst); | ||
3905 | mss->mst = NULL; | ||
3906 | } | ||
3907 | |||
3908 | GNUNET_free (mss); | ||
3909 | } | ||
3910 | |||
3911 | |||
3912 | |||
3913 | /** | ||
3914 | * Create a message queue for a stream socket. | ||
3915 | * | ||
3916 | * @param socket the socket to read/write in the message queue | ||
3917 | * @param msg_handlers message handler array | ||
3918 | * @param error_handler callback for errors | ||
3919 | * @return the message queue for the socket | ||
3920 | */ | ||
3921 | struct GNUNET_MQ_MessageQueue * | ||
3922 | GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket, | ||
3923 | const struct GNUNET_MQ_Handler *msg_handlers, | ||
3924 | GNUNET_MQ_ErrorHandler error_handler, | ||
3925 | void *cls) | ||
3926 | { | ||
3927 | struct GNUNET_MQ_MessageQueue *mq; | ||
3928 | struct MQStreamState *mss; | ||
3929 | |||
3930 | mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); | ||
3931 | mss = GNUNET_new (struct MQStreamState); | ||
3932 | mss->socket = socket; | ||
3933 | mq->impl_state = mss; | ||
3934 | mq->send_impl = mq_stream_send_impl; | ||
3935 | mq->destroy_impl = mq_stream_destroy_impl; | ||
3936 | mq->handlers = msg_handlers; | ||
3937 | mq->handlers_cls = cls; | ||
3938 | if (NULL != msg_handlers) | ||
3939 | { | ||
3940 | mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq); | ||
3941 | mss->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, | ||
3942 | mq_stream_data_processor, mq); | ||
3943 | } | ||
3944 | return mq; | ||
3945 | } | ||
3946 | |||
3734 | /* end of stream_api.c */ | 3947 | /* end of stream_api.c */ |
diff --git a/src/util/Makefile.am b/src/util/Makefile.am index ac855c25e..491006a42 100644 --- a/src/util/Makefile.am +++ b/src/util/Makefile.am | |||
@@ -90,6 +90,7 @@ libgnunetutil_la_SOURCES = \ | |||
90 | getopt_helpers.c \ | 90 | getopt_helpers.c \ |
91 | helper.c \ | 91 | helper.c \ |
92 | load.c \ | 92 | load.c \ |
93 | mq.c \ | ||
93 | network.c \ | 94 | network.c \ |
94 | os_installation.c \ | 95 | os_installation.c \ |
95 | os_network.c \ | 96 | os_network.c \ |
@@ -230,6 +231,8 @@ check_PROGRAMS = \ | |||
230 | test_connection_timeout \ | 231 | test_connection_timeout \ |
231 | test_connection_timeout_no_connect \ | 232 | test_connection_timeout_no_connect \ |
232 | test_connection_transmit_cancel \ | 233 | test_connection_transmit_cancel \ |
234 | test_mq \ | ||
235 | test_mq_client \ | ||
233 | test_os_network \ | 236 | test_os_network \ |
234 | test_os_priority \ | 237 | test_os_priority \ |
235 | test_peer \ | 238 | test_peer \ |
@@ -416,6 +419,16 @@ test_connection_transmit_cancel_SOURCES = \ | |||
416 | test_connection_transmit_cancel_LDADD = \ | 419 | test_connection_transmit_cancel_LDADD = \ |
417 | $(top_builddir)/src/util/libgnunetutil.la | 420 | $(top_builddir)/src/util/libgnunetutil.la |
418 | 421 | ||
422 | test_mq_SOURCES = \ | ||
423 | test_mq.c | ||
424 | test_mq_LDADD = \ | ||
425 | $(top_builddir)/src/util/libgnunetutil.la | ||
426 | |||
427 | test_mq_client_SOURCES = \ | ||
428 | test_mq_client.c | ||
429 | test_mq_client_LDADD = \ | ||
430 | $(top_builddir)/src/util/libgnunetutil.la | ||
431 | |||
419 | test_os_network_SOURCES = \ | 432 | test_os_network_SOURCES = \ |
420 | test_os_network.c | 433 | test_os_network.c |
421 | test_os_network_LDADD = \ | 434 | test_os_network_LDADD = \ |
diff --git a/src/set/mq.c b/src/util/mq.c index 0ced014dd..36cacd30b 100644 --- a/src/set/mq.c +++ b/src/util/mq.c | |||
@@ -20,40 +20,16 @@ | |||
20 | 20 | ||
21 | /** | 21 | /** |
22 | * @author Florian Dold | 22 | * @author Florian Dold |
23 | * @file set/mq.c | 23 | * @file util/mq.c |
24 | * @brief general purpose request queue | 24 | * @brief general purpose request queue |
25 | */ | 25 | */ |
26 | 26 | ||
27 | #include "mq.h" | 27 | #include "platform.h" |
28 | 28 | #include "gnunet_common.h" | |
29 | #include "gnunet_util_lib.h" | ||
29 | 30 | ||
30 | #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__) | 31 | #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__) |
31 | 32 | ||
32 | /** | ||
33 | * Signature of functions implementing the | ||
34 | * sending part of a message queue | ||
35 | * | ||
36 | * @param q the message queue | ||
37 | * @param m the message | ||
38 | */ | ||
39 | typedef void (*SendImpl) (struct GNUNET_MQ_MessageQueue *q, struct GNUNET_MQ_Message *m); | ||
40 | |||
41 | |||
42 | typedef void (*DestroyImpl) (struct GNUNET_MQ_MessageQueue *q); | ||
43 | |||
44 | |||
45 | /** | ||
46 | * Collection of the state necessary to read and write gnunet messages | ||
47 | * to a stream socket. Should be used as closure for stream_data_processor. | ||
48 | */ | ||
49 | struct MessageStreamState | ||
50 | { | ||
51 | struct GNUNET_SERVER_MessageStreamTokenizer *mst; | ||
52 | struct MessageQueue *mq; | ||
53 | struct GNUNET_STREAM_Socket *socket; | ||
54 | struct GNUNET_STREAM_ReadHandle *rh; | ||
55 | struct GNUNET_STREAM_WriteHandle *wh; | ||
56 | }; | ||
57 | 33 | ||
58 | 34 | ||
59 | struct ServerClientSocketState | 35 | struct ServerClientSocketState |
@@ -65,131 +41,25 @@ struct ServerClientSocketState | |||
65 | 41 | ||
66 | struct ClientConnectionState | 42 | struct ClientConnectionState |
67 | { | 43 | { |
68 | struct GNUNET_CLIENT_Connection *connection; | ||
69 | struct GNUNET_CLIENT_TransmitHandle *th; | ||
70 | }; | ||
71 | |||
72 | |||
73 | struct GNUNET_MQ_MessageQueue | ||
74 | { | ||
75 | /** | ||
76 | * Handlers array, or NULL if the queue should not receive messages | ||
77 | */ | ||
78 | const struct GNUNET_MQ_Handler *handlers; | ||
79 | |||
80 | /** | ||
81 | * Closure for the handler callbacks | ||
82 | */ | ||
83 | void *handlers_cls; | ||
84 | |||
85 | /** | ||
86 | * Actual implementation of message sending, | ||
87 | * called when a message is added | ||
88 | */ | ||
89 | SendImpl send_impl; | ||
90 | |||
91 | /** | ||
92 | * Implementation-dependent queue destruction function | ||
93 | */ | ||
94 | DestroyImpl destroy_impl; | ||
95 | |||
96 | /** | 44 | /** |
97 | * Implementation-specific state | 45 | * Did we call receive? |
98 | */ | 46 | */ |
99 | void *impl_state; | 47 | int receive_active; |
100 | 48 | struct GNUNET_CLIENT_Connection *connection; | |
101 | /** | 49 | struct GNUNET_CLIENT_TransmitHandle *th; |
102 | * Callback will be called when the message queue is empty | ||
103 | */ | ||
104 | GNUNET_MQ_NotifyCallback empty_cb; | ||
105 | |||
106 | /** | ||
107 | * Closure for empty_cb | ||
108 | */ | ||
109 | void *empty_cls; | ||
110 | |||
111 | /** | ||
112 | * Callback will be called when a read error occurs. | ||
113 | */ | ||
114 | GNUNET_MQ_NotifyCallback read_error_cb; | ||
115 | |||
116 | /** | ||
117 | * Closure for read_error_cb | ||
118 | */ | ||
119 | void *read_error_cls; | ||
120 | |||
121 | /** | ||
122 | * Linked list of messages pending to be sent | ||
123 | */ | ||
124 | struct GNUNET_MQ_Message *msg_head; | ||
125 | |||
126 | /** | ||
127 | * Linked list of messages pending to be sent | ||
128 | */ | ||
129 | struct GNUNET_MQ_Message *msg_tail; | ||
130 | |||
131 | /** | ||
132 | * Message that is currently scheduled to be | ||
133 | * sent. Not the head of the message queue, as the implementation | ||
134 | * needs to know if sending has been already scheduled or not. | ||
135 | */ | ||
136 | struct GNUNET_MQ_Message *current_msg; | ||
137 | |||
138 | /** | ||
139 | * Map of associations, lazily allocated | ||
140 | */ | ||
141 | struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map; | ||
142 | |||
143 | /** | ||
144 | * Next id that should be used for the assoc_map, | ||
145 | * initialized lazily to a random value together with | ||
146 | * assoc_map | ||
147 | */ | ||
148 | uint32_t assoc_id; | ||
149 | }; | 50 | }; |
150 | 51 | ||
151 | 52 | ||
152 | struct GNUNET_MQ_Message | ||
153 | { | ||
154 | /** | ||
155 | * Messages are stored in a linked list | ||
156 | */ | ||
157 | struct GNUNET_MQ_Message *next; | ||
158 | |||
159 | /** | ||
160 | * Messages are stored in a linked list | ||
161 | */ | ||
162 | struct GNUNET_MQ_Message *prev; | ||
163 | |||
164 | /** | ||
165 | * Actual allocated message header, | ||
166 | * usually points to the end of the containing GNUNET_MQ_Message | ||
167 | */ | ||
168 | struct GNUNET_MessageHeader *mh; | ||
169 | |||
170 | /** | ||
171 | * Queue the message is queued in, NULL if message is not queued. | ||
172 | */ | ||
173 | struct GNUNET_MQ_MessageQueue *parent_queue; | ||
174 | |||
175 | /** | ||
176 | * Called after the message was sent irrevokably | ||
177 | */ | ||
178 | GNUNET_MQ_NotifyCallback sent_cb; | ||
179 | |||
180 | /** | ||
181 | * Closure for send_cb | ||
182 | */ | ||
183 | void *sent_cls; | ||
184 | }; | ||
185 | 53 | ||
186 | 54 | ||
187 | /** | 55 | /** |
188 | * Call the right callback for a message received | 56 | * Call the right callback for a message. |
189 | * by a queue | 57 | * |
58 | * @param mq message queue with the handlers | ||
59 | * @param mh message to dispatch | ||
190 | */ | 60 | */ |
191 | static void | 61 | void |
192 | dispatch_message (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh) | 62 | GNUNET_MQ_dispatch (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh) |
193 | { | 63 | { |
194 | const struct GNUNET_MQ_Handler *handler; | 64 | const struct GNUNET_MQ_Handler *handler; |
195 | int handled = GNUNET_NO; | 65 | int handled = GNUNET_NO; |
@@ -273,181 +143,6 @@ GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp, | |||
273 | } | 143 | } |
274 | 144 | ||
275 | 145 | ||
276 | /** | ||
277 | * Functions of this signature are called whenever writing operations | ||
278 | * on a stream are executed | ||
279 | * | ||
280 | * @param cls the closure from GNUNET_STREAM_write | ||
281 | * @param status the status of the stream at the time this function is called; | ||
282 | * GNUNET_STREAM_OK if writing to stream was completed successfully; | ||
283 | * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully | ||
284 | * (this doesn't mean that the data is never sent, the receiver may | ||
285 | * have read the data but its ACKs may have been lost); | ||
286 | * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the | ||
287 | * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot | ||
288 | * be processed. | ||
289 | * @param size the number of bytes written | ||
290 | */ | ||
291 | static void | ||
292 | stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size) | ||
293 | { | ||
294 | struct GNUNET_MQ_MessageQueue *mq = cls; | ||
295 | struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state; | ||
296 | struct GNUNET_MQ_Message *mqm; | ||
297 | |||
298 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
299 | |||
300 | /* call cb for message we finished sending */ | ||
301 | mqm = mq->current_msg; | ||
302 | GNUNET_assert (NULL != mq->current_msg); | ||
303 | if (NULL != mqm->sent_cb) | ||
304 | mqm->sent_cb (mqm->sent_cls); | ||
305 | GNUNET_free (mqm); | ||
306 | |||
307 | mss->wh = NULL; | ||
308 | |||
309 | mqm = mq->msg_head; | ||
310 | mq->current_msg = mqm; | ||
311 | if (NULL == mqm) | ||
312 | { | ||
313 | if (NULL != mq->empty_cb) | ||
314 | mq->empty_cb (mq->empty_cls); | ||
315 | return; | ||
316 | } | ||
317 | GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm); | ||
318 | mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size), | ||
319 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
320 | stream_write_queued, mq); | ||
321 | GNUNET_assert (NULL != mss->wh); | ||
322 | } | ||
323 | |||
324 | |||
325 | static void | ||
326 | stream_socket_send_impl (struct GNUNET_MQ_MessageQueue *mq, | ||
327 | struct GNUNET_MQ_Message *mqm) | ||
328 | { | ||
329 | struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state; | ||
330 | if (NULL != mq->current_msg) | ||
331 | { | ||
332 | GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); | ||
333 | return; | ||
334 | } | ||
335 | mq->current_msg = mqm; | ||
336 | mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size), | ||
337 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
338 | stream_write_queued, mq); | ||
339 | } | ||
340 | |||
341 | |||
342 | /** | ||
343 | * Functions with this signature are called whenever a | ||
344 | * complete message is received by the tokenizer. | ||
345 | * | ||
346 | * Do not call GNUNET_SERVER_mst_destroy in callback | ||
347 | * | ||
348 | * @param cls closure | ||
349 | * @param client identification of the client | ||
350 | * @param message the actual message | ||
351 | * | ||
352 | * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing | ||
353 | */ | ||
354 | static int | ||
355 | stream_mst_callback (void *cls, void *client, | ||
356 | const struct GNUNET_MessageHeader *message) | ||
357 | { | ||
358 | struct GNUNET_MQ_MessageQueue *mq = cls; | ||
359 | |||
360 | GNUNET_assert (NULL != message); | ||
361 | dispatch_message (mq, message); | ||
362 | return GNUNET_OK; | ||
363 | } | ||
364 | |||
365 | |||
366 | /** | ||
367 | * Functions of this signature are called whenever data is available from the | ||
368 | * stream. | ||
369 | * | ||
370 | * @param cls the closure from GNUNET_STREAM_read | ||
371 | * @param status the status of the stream at the time this function is called | ||
372 | * @param data traffic from the other side | ||
373 | * @param size the number of bytes available in data read; will be 0 on timeout | ||
374 | * @return number of bytes of processed from 'data' (any data remaining should be | ||
375 | * given to the next time the read processor is called). | ||
376 | */ | ||
377 | static size_t | ||
378 | stream_data_processor (void *cls, | ||
379 | enum GNUNET_STREAM_Status status, | ||
380 | const void *data, | ||
381 | size_t size) | ||
382 | { | ||
383 | struct GNUNET_MQ_MessageQueue *mq = cls; | ||
384 | struct MessageStreamState *mss; | ||
385 | int ret; | ||
386 | |||
387 | mss = (struct MessageStreamState *) mq->impl_state; | ||
388 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
389 | ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO); | ||
390 | GNUNET_assert (GNUNET_OK == ret); | ||
391 | /* we always read all data */ | ||
392 | mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, | ||
393 | stream_data_processor, mq); | ||
394 | return size; | ||
395 | } | ||
396 | |||
397 | |||
398 | static void | ||
399 | stream_socket_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) | ||
400 | { | ||
401 | struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state; | ||
402 | |||
403 | if (NULL != mss->rh) | ||
404 | { | ||
405 | GNUNET_STREAM_read_cancel (mss->rh); | ||
406 | mss->rh = NULL; | ||
407 | } | ||
408 | |||
409 | if (NULL != mss->wh) | ||
410 | { | ||
411 | GNUNET_STREAM_write_cancel (mss->wh); | ||
412 | mss->wh = NULL; | ||
413 | } | ||
414 | |||
415 | if (NULL != mss->mst) | ||
416 | { | ||
417 | GNUNET_SERVER_mst_destroy (mss->mst); | ||
418 | mss->mst = NULL; | ||
419 | } | ||
420 | |||
421 | GNUNET_free (mss); | ||
422 | } | ||
423 | |||
424 | |||
425 | |||
426 | |||
427 | struct GNUNET_MQ_MessageQueue * | ||
428 | GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket, | ||
429 | const struct GNUNET_MQ_Handler *handlers, | ||
430 | void *cls) | ||
431 | { | ||
432 | struct GNUNET_MQ_MessageQueue *mq; | ||
433 | struct MessageStreamState *mss; | ||
434 | |||
435 | mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); | ||
436 | mss = GNUNET_new (struct MessageStreamState); | ||
437 | mss->socket = socket; | ||
438 | mq->impl_state = mss; | ||
439 | mq->send_impl = stream_socket_send_impl; | ||
440 | mq->destroy_impl = &stream_socket_destroy_impl; | ||
441 | mq->handlers = handlers; | ||
442 | mq->handlers_cls = cls; | ||
443 | if (NULL != handlers) | ||
444 | { | ||
445 | mss->mst = GNUNET_SERVER_mst_create (stream_mst_callback, mq); | ||
446 | mss->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, | ||
447 | stream_data_processor, mq); | ||
448 | } | ||
449 | return mq; | ||
450 | } | ||
451 | 146 | ||
452 | 147 | ||
453 | /*** Transmit a queued message to the session's client. | 148 | /*** Transmit a queued message to the session's client. |
@@ -490,8 +185,6 @@ transmit_queued (void *cls, size_t size, | |||
490 | GNUNET_TIME_UNIT_FOREVER_REL, | 185 | GNUNET_TIME_UNIT_FOREVER_REL, |
491 | &transmit_queued, mq); | 186 | &transmit_queued, mq); |
492 | } | 187 | } |
493 | else if (NULL != mq->empty_cb) | ||
494 | mq->empty_cb (mq->empty_cls); | ||
495 | return msg_size; | 188 | return msg_size; |
496 | } | 189 | } |
497 | 190 | ||
@@ -553,6 +246,37 @@ GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client) | |||
553 | 246 | ||
554 | 247 | ||
555 | /** | 248 | /** |
249 | * Type of a function to call when we receive a message | ||
250 | * from the service. | ||
251 | * | ||
252 | * @param cls closure | ||
253 | * @param msg message received, NULL on timeout or fatal error | ||
254 | */ | ||
255 | static void | ||
256 | handle_client_message (void *cls, | ||
257 | const struct GNUNET_MessageHeader *msg) | ||
258 | { | ||
259 | struct GNUNET_MQ_MessageQueue *mq = cls; | ||
260 | struct ClientConnectionState *state; | ||
261 | |||
262 | state = mq->impl_state; | ||
263 | |||
264 | if (NULL == msg) | ||
265 | { | ||
266 | if (NULL == mq->error_handler) | ||
267 | LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler installed)\n"); | ||
268 | mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); | ||
269 | return; | ||
270 | } | ||
271 | |||
272 | GNUNET_CLIENT_receive (state->connection, handle_client_message, mq, | ||
273 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
274 | |||
275 | GNUNET_MQ_dispatch (mq, msg); | ||
276 | } | ||
277 | |||
278 | |||
279 | /** | ||
556 | * Transmit a queued message to the session's client. | 280 | * Transmit a queued message to the session's client. |
557 | * | 281 | * |
558 | * @param cls consensus session | 282 | * @param cls consensus session |
@@ -569,6 +293,24 @@ connection_client_transmit_queued (void *cls, size_t size, | |||
569 | struct ClientConnectionState *state = mq->impl_state; | 293 | struct ClientConnectionState *state = mq->impl_state; |
570 | size_t msg_size; | 294 | size_t msg_size; |
571 | 295 | ||
296 | if (NULL == buf) | ||
297 | { | ||
298 | if (NULL == mq->error_handler) | ||
299 | { | ||
300 | LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler installed\n"); | ||
301 | return 0; | ||
302 | } | ||
303 | mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); | ||
304 | return 0; | ||
305 | } | ||
306 | |||
307 | if ((NULL != mq->handlers) && (GNUNET_NO == state->receive_active)) | ||
308 | { | ||
309 | state->receive_active = GNUNET_YES; | ||
310 | GNUNET_CLIENT_receive (state->connection, handle_client_message, mq, | ||
311 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
312 | } | ||
313 | |||
572 | 314 | ||
573 | GNUNET_assert (NULL != mqm); | 315 | GNUNET_assert (NULL != mqm); |
574 | 316 | ||
@@ -593,8 +335,6 @@ connection_client_transmit_queued (void *cls, size_t size, | |||
593 | GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, | 335 | GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, |
594 | &connection_client_transmit_queued, mq); | 336 | &connection_client_transmit_queued, mq); |
595 | } | 337 | } |
596 | else if (NULL != mq->empty_cb) | ||
597 | mq->empty_cb (mq->empty_cls); | ||
598 | return msg_size; | 338 | return msg_size; |
599 | } | 339 | } |
600 | 340 | ||
@@ -631,35 +371,6 @@ connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, | |||
631 | 371 | ||
632 | 372 | ||
633 | 373 | ||
634 | /** | ||
635 | * Type of a function to call when we receive a message | ||
636 | * from the service. | ||
637 | * | ||
638 | * @param cls closure | ||
639 | * @param msg message received, NULL on timeout or fatal error | ||
640 | */ | ||
641 | static void | ||
642 | handle_client_message (void *cls, | ||
643 | const struct GNUNET_MessageHeader *msg) | ||
644 | { | ||
645 | struct GNUNET_MQ_MessageQueue *mq = cls; | ||
646 | struct ClientConnectionState *state; | ||
647 | |||
648 | state = mq->impl_state; | ||
649 | |||
650 | if (NULL == msg) | ||
651 | { | ||
652 | if (NULL == mq->read_error_cb) | ||
653 | LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler installed)\n"); | ||
654 | mq->read_error_cb (mq->read_error_cls); | ||
655 | return; | ||
656 | } | ||
657 | |||
658 | GNUNET_CLIENT_receive (state->connection, handle_client_message, mq, | ||
659 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
660 | |||
661 | dispatch_message (mq, msg); | ||
662 | } | ||
663 | 374 | ||
664 | 375 | ||
665 | struct GNUNET_MQ_MessageQueue * | 376 | struct GNUNET_MQ_MessageQueue * |
@@ -681,12 +392,6 @@ GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connecti | |||
681 | mq->send_impl = connection_client_send_impl; | 392 | mq->send_impl = connection_client_send_impl; |
682 | mq->destroy_impl = connection_client_destroy_impl; | 393 | mq->destroy_impl = connection_client_destroy_impl; |
683 | 394 | ||
684 | if (NULL != handlers) | ||
685 | { | ||
686 | GNUNET_CLIENT_receive (connection, handle_client_message, mq, | ||
687 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
688 | } | ||
689 | |||
690 | return mq; | 395 | return mq; |
691 | } | 396 | } |
692 | 397 | ||
@@ -774,20 +479,3 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq) | |||
774 | GNUNET_free (mq); | 479 | GNUNET_free (mq); |
775 | } | 480 | } |
776 | 481 | ||
777 | |||
778 | /** | ||
779 | * Call a callback once all messages queued have been sent, | ||
780 | * i.e. the message queue is empty. | ||
781 | * | ||
782 | * @param mqm the message queue to send the notification for | ||
783 | * @param cb the callback to call on an empty queue | ||
784 | * @param cls closure for cb | ||
785 | */ | ||
786 | void | ||
787 | GNUNET_MQ_notify_empty (struct GNUNET_MQ_MessageQueue *mqm, | ||
788 | GNUNET_MQ_NotifyCallback cb, | ||
789 | void *cls) | ||
790 | { | ||
791 | mqm->empty_cb = cb; | ||
792 | mqm->empty_cls = cls; | ||
793 | } | ||
diff --git a/src/set/test_mq.c b/src/util/test_mq.c index d13c63440..161b40a20 100644 --- a/src/set/test_mq.c +++ b/src/util/test_mq.c | |||
@@ -25,7 +25,6 @@ | |||
25 | #include "platform.h" | 25 | #include "platform.h" |
26 | #include "gnunet_util_lib.h" | 26 | #include "gnunet_util_lib.h" |
27 | #include "gnunet_testing_lib.h" | 27 | #include "gnunet_testing_lib.h" |
28 | #include "mq.h" | ||
29 | 28 | ||
30 | 29 | ||
31 | GNUNET_NETWORK_STRUCT_BEGIN | 30 | GNUNET_NETWORK_STRUCT_BEGIN |
diff --git a/src/set/test_mq_client.c b/src/util/test_mq_client.c index ca615d37e..b7eb1516a 100644 --- a/src/set/test_mq_client.c +++ b/src/util/test_mq_client.c | |||
@@ -19,22 +19,12 @@ | |||
19 | */ | 19 | */ |
20 | 20 | ||
21 | /** | 21 | /** |
22 | * @file set/test_mq.c | 22 | * @file util/test_mq_client.c |
23 | * @brief tests for mq with connection client | 23 | * @brief tests for mq with connection client |
24 | */ | 24 | */ |
25 | /** | ||
26 | * @file util/test_server_with_client.c | ||
27 | * @brief tests for server.c and client.c, | ||
28 | * specifically disconnect_notify, | ||
29 | * client_get_address and receive_done (resume processing) | ||
30 | */ | ||
31 | #include "platform.h" | 25 | #include "platform.h" |
32 | #include "gnunet_common.h" | 26 | #include "gnunet_common.h" |
33 | #include "gnunet_scheduler_lib.h" | 27 | #include "gnunet_util_lib.h" |
34 | #include "gnunet_client_lib.h" | ||
35 | #include "gnunet_server_lib.h" | ||
36 | #include "gnunet_time_lib.h" | ||
37 | #include "mq.h" | ||
38 | 28 | ||
39 | #define PORT 23336 | 29 | #define PORT 23336 |
40 | 30 | ||