diff options
Diffstat (limited to 'src/consensus/gnunet-service-consensus.c')
-rw-r--r-- | src/consensus/gnunet-service-consensus.c | 1945 |
1 files changed, 174 insertions, 1771 deletions
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index a7640c51f..44edeb215 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c | |||
@@ -29,14 +29,10 @@ | |||
29 | #include "gnunet_protocols.h" | 29 | #include "gnunet_protocols.h" |
30 | #include "gnunet_applications.h" | 30 | #include "gnunet_applications.h" |
31 | #include "gnunet_util_lib.h" | 31 | #include "gnunet_util_lib.h" |
32 | #include "gnunet_set_service.h" | ||
32 | #include "gnunet_consensus_service.h" | 33 | #include "gnunet_consensus_service.h" |
33 | #include "gnunet_core_service.h" | ||
34 | #include "gnunet_stream_lib.h" | ||
35 | |||
36 | #include "consensus_protocol.h" | 34 | #include "consensus_protocol.h" |
37 | #include "consensus.h" | 35 | #include "consensus.h" |
38 | #include "ibf.h" | ||
39 | #include "strata_estimator.h" | ||
40 | 36 | ||
41 | 37 | ||
42 | /* | 38 | /* |
@@ -47,82 +43,19 @@ | |||
47 | 43 | ||
48 | 44 | ||
49 | /** | 45 | /** |
50 | * Number of IBFs in a strata estimator. | ||
51 | */ | ||
52 | #define SE_STRATA_COUNT 32 | ||
53 | /** | ||
54 | * Size of the IBFs in the strata estimator. | ||
55 | */ | ||
56 | #define SE_IBF_SIZE 80 | ||
57 | /** | ||
58 | * hash num parameter for the difference digests and strata estimators | ||
59 | */ | ||
60 | #define SE_IBF_HASH_NUM 3 | ||
61 | |||
62 | /** | ||
63 | * Number of buckets that can be transmitted in one message. | ||
64 | */ | ||
65 | #define BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE) | ||
66 | |||
67 | /** | ||
68 | * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER). | ||
69 | * Choose this value so that computing the IBF is still cheaper | ||
70 | * than transmitting all values. | ||
71 | */ | ||
72 | #define MAX_IBF_ORDER (16) | ||
73 | |||
74 | /** | ||
75 | * Number of exponential rounds, used in the inventory and completion round. | 46 | * Number of exponential rounds, used in the inventory and completion round. |
76 | */ | 47 | */ |
77 | #define NUM_EXP_ROUNDS (4) | 48 | #define NUM_EXP_ROUNDS (4) |
78 | 49 | ||
79 | |||
80 | /* forward declarations */ | 50 | /* forward declarations */ |
81 | 51 | ||
82 | /* mutual recursion with struct ConsensusSession */ | 52 | /* mutual recursion with struct ConsensusSession */ |
83 | struct ConsensusPeerInformation; | 53 | struct ConsensusPeerInformation; |
84 | 54 | ||
85 | struct MessageQueue; | ||
86 | |||
87 | /* mutual recursion with round_over */ | 55 | /* mutual recursion with round_over */ |
88 | static void | 56 | static void |
89 | subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); | 57 | subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); |
90 | 58 | ||
91 | /* mutial recursion with transmit_queued */ | ||
92 | static void | ||
93 | client_send_next (struct MessageQueue *mq); | ||
94 | |||
95 | /* mutual recursion with mst_session_callback */ | ||
96 | static void | ||
97 | open_cb (void *cls, struct GNUNET_STREAM_Socket *socket); | ||
98 | |||
99 | static int | ||
100 | mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message); | ||
101 | |||
102 | |||
103 | /** | ||
104 | * Additional information about a consensus element. | ||
105 | */ | ||
106 | struct ElementInfo | ||
107 | { | ||
108 | /** | ||
109 | * The element itself. | ||
110 | */ | ||
111 | struct GNUNET_CONSENSUS_Element *element; | ||
112 | /** | ||
113 | * Hash of the element | ||
114 | */ | ||
115 | struct GNUNET_HashCode *element_hash; | ||
116 | /** | ||
117 | * Number of other peers that have the element in the inventory. | ||
118 | */ | ||
119 | unsigned int inventory_count; | ||
120 | /** | ||
121 | * Bitmap of peers that have this element in their inventory | ||
122 | */ | ||
123 | uint8_t *inventory_bitmap; | ||
124 | }; | ||
125 | |||
126 | 59 | ||
127 | /** | 60 | /** |
128 | * Describes the current round a consensus session is in. | 61 | * Describes the current round a consensus session is in. |
@@ -138,7 +71,8 @@ enum ConsensusRound | |||
138 | */ | 71 | */ |
139 | CONSENSUS_ROUND_EXCHANGE, | 72 | CONSENSUS_ROUND_EXCHANGE, |
140 | /** | 73 | /** |
141 | * Exchange which elements each peer has, but not the elements. | 74 | * Exchange which elements each peer has, but don't |
75 | * transmit the element's data, only their SHA-512 hashes. | ||
142 | * This round uses the all-to-all scheme. | 76 | * This round uses the all-to-all scheme. |
143 | */ | 77 | */ |
144 | CONSENSUS_ROUND_INVENTORY, | 78 | CONSENSUS_ROUND_INVENTORY, |
@@ -153,82 +87,6 @@ enum ConsensusRound | |||
153 | CONSENSUS_ROUND_FINISH | 87 | CONSENSUS_ROUND_FINISH |
154 | }; | 88 | }; |
155 | 89 | ||
156 | /* FIXME: review states, ANTICIPATE_DIFF and DECODING in particular */ | ||
157 | |||
158 | /** | ||
159 | * State of another peer with respect to the | ||
160 | * current ibf. | ||
161 | */ | ||
162 | enum ConsensusIBFState { | ||
163 | /** | ||
164 | * There is nothing going on with the IBF. | ||
165 | */ | ||
166 | IBF_STATE_NONE=0, | ||
167 | /** | ||
168 | * We currently receive an ibf. | ||
169 | */ | ||
170 | IBF_STATE_RECEIVING, | ||
171 | /* | ||
172 | * we decode a received ibf | ||
173 | */ | ||
174 | IBF_STATE_DECODING, | ||
175 | /** | ||
176 | * wait for elements and element requests | ||
177 | */ | ||
178 | IBF_STATE_ANTICIPATE_DIFF | ||
179 | }; | ||
180 | |||
181 | |||
182 | typedef void (*AddCallback) (struct MessageQueue *mq); | ||
183 | typedef void (*MessageSentCallback) (void *cls); | ||
184 | |||
185 | |||
186 | /** | ||
187 | * Collection of the state necessary to read and write gnunet messages | ||
188 | * to a stream socket. Should be used as closure for stream_data_processor. | ||
189 | */ | ||
190 | struct MessageStreamState | ||
191 | { | ||
192 | struct GNUNET_SERVER_MessageStreamTokenizer *mst; | ||
193 | struct MessageQueue *mq; | ||
194 | void *mst_cls; | ||
195 | struct GNUNET_STREAM_Socket *socket; | ||
196 | struct GNUNET_STREAM_ReadHandle *rh; | ||
197 | struct GNUNET_STREAM_WriteHandle *wh; | ||
198 | }; | ||
199 | |||
200 | |||
201 | struct ServerClientSocketState | ||
202 | { | ||
203 | struct GNUNET_SERVER_Client *client; | ||
204 | struct GNUNET_SERVER_TransmitHandle* th; | ||
205 | }; | ||
206 | |||
207 | |||
208 | /** | ||
209 | * Generic message queue, for queueing outgoing messages. | ||
210 | */ | ||
211 | struct MessageQueue | ||
212 | { | ||
213 | void *state; | ||
214 | AddCallback add_cb; | ||
215 | struct PendingMessage *pending_head; | ||
216 | struct PendingMessage *pending_tail; | ||
217 | struct PendingMessage *current_pm; | ||
218 | }; | ||
219 | |||
220 | |||
221 | struct PendingMessage | ||
222 | { | ||
223 | struct GNUNET_MessageHeader *msg; | ||
224 | struct MessageQueue *parent_queue; | ||
225 | struct PendingMessage *next; | ||
226 | struct PendingMessage *prev; | ||
227 | MessageSentCallback sent_cb; | ||
228 | void *sent_cb_cls; | ||
229 | }; | ||
230 | |||
231 | |||
232 | /** | 90 | /** |
233 | * A consensus session consists of one local client and the remote authorities. | 91 | * A consensus session consists of one local client and the remote authorities. |
234 | */ | 92 | */ |
@@ -245,58 +103,35 @@ struct ConsensusSession | |||
245 | struct ConsensusSession *prev; | 103 | struct ConsensusSession *prev; |
246 | 104 | ||
247 | /** | 105 | /** |
248 | * Join message. Used to initialize the session later, | ||
249 | * if the identity of the local peer is not yet known. | ||
250 | * NULL if the session has been fully initialized. | ||
251 | */ | ||
252 | struct GNUNET_CONSENSUS_JoinMessage *join_msg; | ||
253 | |||
254 | /** | ||
255 | * Global consensus identification, computed | 106 | * Global consensus identification, computed |
256 | * from the session id and participating authorities. | 107 | * from the session id and participating authorities. |
257 | */ | 108 | */ |
258 | struct GNUNET_HashCode global_id; | 109 | struct GNUNET_HashCode global_id; |
259 | 110 | ||
260 | /** | 111 | /** |
261 | * The server's client and associated local state | 112 | * Client that inhabits the session |
262 | */ | 113 | */ |
263 | struct ServerClientSocketState scss; | 114 | struct GNUNET_SERVER_Client *client; |
264 | 115 | ||
265 | /** | 116 | /** |
266 | * Queued messages to the client. | 117 | * Queued messages to the client. |
267 | */ | 118 | */ |
268 | struct MessageQueue *client_mq; | 119 | struct GNUNET_MQ_MessageQueue *client_mq; |
269 | |||
270 | /** | ||
271 | * IBF_Key -> 2^(HashCode*) | ||
272 | * FIXME: | ||
273 | * should be array of hash maps, mapping replicated struct IBF_Keys to struct HashCode *. | ||
274 | */ | ||
275 | struct GNUNET_CONTAINER_MultiHashMap *ibf_key_map; | ||
276 | |||
277 | /** | ||
278 | * Maps HashCodes to ElementInfos | ||
279 | */ | ||
280 | struct GNUNET_CONTAINER_MultiHashMap *values; | ||
281 | |||
282 | /** | ||
283 | * Currently active transmit handle for sending to the client | ||
284 | */ | ||
285 | struct GNUNET_SERVER_TransmitHandle *client_th; | ||
286 | 120 | ||
287 | /** | 121 | /** |
288 | * Timeout for all rounds together, single rounds will schedule a timeout task | 122 | * Timeout for all rounds together, single rounds will schedule a timeout task |
289 | * with a fraction of the conclude timeout. | 123 | * with a fraction of the conclude timeout. |
124 | * Only valid once the current round is not CONSENSUS_ROUND_BEGIN. | ||
290 | */ | 125 | */ |
291 | struct GNUNET_TIME_Relative conclude_timeout; | 126 | struct GNUNET_TIME_Relative conclude_timeout; |
292 | 127 | ||
293 | /** | 128 | /** |
294 | * Timeout task identifier for the current round | 129 | * Timeout task identifier for the current round. |
295 | */ | 130 | */ |
296 | GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid; | 131 | GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid; |
297 | 132 | ||
298 | /** | 133 | /** |
299 | * Number of other peers in the consensus | 134 | * Number of other peers in the consensus. |
300 | */ | 135 | */ |
301 | unsigned int num_peers; | 136 | unsigned int num_peers; |
302 | 137 | ||
@@ -307,26 +142,11 @@ struct ConsensusSession | |||
307 | struct ConsensusPeerInformation *info; | 142 | struct ConsensusPeerInformation *info; |
308 | 143 | ||
309 | /** | 144 | /** |
310 | * GNUNET_YES if the client has called conclude. | ||
311 | * */ | ||
312 | int conclude; | ||
313 | |||
314 | /** | ||
315 | * Index of the local peer in the peers array | 145 | * Index of the local peer in the peers array |
316 | */ | 146 | */ |
317 | unsigned int local_peer_idx; | 147 | unsigned int local_peer_idx; |
318 | 148 | ||
319 | /** | 149 | /** |
320 | * Strata estimator, computed online | ||
321 | */ | ||
322 | struct StrataEstimator *se; | ||
323 | |||
324 | /** | ||
325 | * Pre-computed IBFs | ||
326 | */ | ||
327 | struct InvertibleBloomFilter **ibfs; | ||
328 | |||
329 | /** | ||
330 | * Current round | 150 | * Current round |
331 | */ | 151 | */ |
332 | enum ConsensusRound current_round; | 152 | enum ConsensusRound current_round; |
@@ -337,19 +157,36 @@ struct ConsensusSession | |||
337 | */ | 157 | */ |
338 | int *shuffle; | 158 | int *shuffle; |
339 | 159 | ||
160 | /** | ||
161 | * Current round of the exponential scheme. | ||
162 | */ | ||
340 | int exp_round; | 163 | int exp_round; |
341 | 164 | ||
165 | /** | ||
166 | * Current sub-round of the exponential scheme. | ||
167 | */ | ||
342 | int exp_subround; | 168 | int exp_subround; |
343 | 169 | ||
344 | /** | 170 | /** |
345 | * The partner for the current exp-round | 171 | * The partner for the current exp-round |
346 | */ | 172 | */ |
347 | struct ConsensusPeerInformation* partner_outgoing; | 173 | struct ConsensusPeerInformation *partner_outgoing; |
348 | 174 | ||
349 | /** | 175 | /** |
350 | * The partner for the current exp-round | 176 | * The partner for the current exp-round |
351 | */ | 177 | */ |
352 | struct ConsensusPeerInformation* partner_incoming; | 178 | struct ConsensusPeerInformation *partner_incoming; |
179 | |||
180 | /** | ||
181 | * The consensus set of this session. | ||
182 | */ | ||
183 | struct GNUNET_SET_Handle *element_set; | ||
184 | |||
185 | /** | ||
186 | * Listener for requests from other peers. | ||
187 | * Uses the session's global id as app id. | ||
188 | */ | ||
189 | struct GNUNET_SET_ListenHandle *set_listener; | ||
353 | }; | 190 | }; |
354 | 191 | ||
355 | 192 | ||
@@ -374,41 +211,6 @@ struct ConsensusPeerInformation | |||
374 | */ | 211 | */ |
375 | int hello; | 212 | int hello; |
376 | 213 | ||
377 | /* | ||
378 | * FIXME | ||
379 | */ | ||
380 | struct MessageStreamState mss; | ||
381 | |||
382 | /** | ||
383 | * Current state | ||
384 | */ | ||
385 | enum ConsensusIBFState ibf_state; | ||
386 | |||
387 | /** | ||
388 | * What is the order (=log2 size) of the ibf | ||
389 | * we're currently dealing with? | ||
390 | * Interpretation depends on ibf_state. | ||
391 | */ | ||
392 | int ibf_order; | ||
393 | |||
394 | /** | ||
395 | * The current IBF for this peer, | ||
396 | * purpose dependent on ibf_state | ||
397 | */ | ||
398 | struct InvertibleBloomFilter *ibf; | ||
399 | |||
400 | /** | ||
401 | * How many buckets have we transmitted/received? | ||
402 | * Interpretatin depends on ibf_state | ||
403 | */ | ||
404 | int ibf_bucket_counter; | ||
405 | |||
406 | /** | ||
407 | * Strata estimator of the peer, NULL if our peer | ||
408 | * initiated the reconciliation. | ||
409 | */ | ||
410 | struct StrataEstimator *se; | ||
411 | |||
412 | /** | 214 | /** |
413 | * Back-reference to the consensus session, | 215 | * Back-reference to the consensus session, |
414 | * to that ConsensusPeerInformation can be used as a closure | 216 | * to that ConsensusPeerInformation can be used as a closure |
@@ -416,18 +218,6 @@ struct ConsensusPeerInformation | |||
416 | struct ConsensusSession *session; | 218 | struct ConsensusSession *session; |
417 | 219 | ||
418 | /** | 220 | /** |
419 | * True if we are actually replaying the strata message, | ||
420 | * e.g. currently handling the premature_strata_message. | ||
421 | */ | ||
422 | int replaying_strata_message; | ||
423 | |||
424 | /** | ||
425 | * A strata message that is not actually for the current round, | ||
426 | * used in the exp-scheme. | ||
427 | */ | ||
428 | struct StrataMessage *premature_strata_message; | ||
429 | |||
430 | /** | ||
431 | * We have finishes the exp-subround with the peer. | 221 | * We have finishes the exp-subround with the peer. |
432 | */ | 222 | */ |
433 | int exp_subround_finished; | 223 | int exp_subround_finished; |
@@ -444,65 +234,15 @@ struct ConsensusPeerInformation | |||
444 | * older round, while we are already in the next round. | 234 | * older round, while we are already in the next round. |
445 | */ | 235 | */ |
446 | enum ConsensusRound apparent_round; | 236 | enum ConsensusRound apparent_round; |
447 | }; | ||
448 | |||
449 | |||
450 | /** | ||
451 | * Sockets from other peers who want to communicate with us. | ||
452 | * It may not be known yet which consensus session they belong to, we have to wait for the | ||
453 | * peer's hello. | ||
454 | * Also, the session might not exist yet locally, we have to wait for a local client to connect. | ||
455 | */ | ||
456 | struct IncomingSocket | ||
457 | { | ||
458 | /** | ||
459 | * Incoming sockets are kept in a double linked list. | ||
460 | */ | ||
461 | struct IncomingSocket *next; | ||
462 | |||
463 | /** | ||
464 | * Incoming sockets are kept in a double linked list. | ||
465 | */ | ||
466 | struct IncomingSocket *prev; | ||
467 | |||
468 | /** | ||
469 | * Peer that connected to us with the socket. | ||
470 | */ | ||
471 | struct GNUNET_PeerIdentity peer_id; | ||
472 | 237 | ||
473 | /** | 238 | /** |
474 | * Peer-in-session this socket belongs to, once known, otherwise NULL. | 239 | * Set operation we are currently executing with this peer. |
475 | */ | 240 | */ |
476 | struct ConsensusPeerInformation *cpi; | 241 | struct GNUNET_SET_OperationHandle *set_op; |
477 | |||
478 | /** | ||
479 | * Set to the global session id, if the peer sent us a hello-message, | ||
480 | * but the session does not exist yet. | ||
481 | */ | ||
482 | struct GNUNET_HashCode *requested_gid; | ||
483 | |||
484 | /* | ||
485 | * Timeout, will disconnect the socket if not yet in a session. | ||
486 | * FIXME: implement | ||
487 | */ | ||
488 | GNUNET_SCHEDULER_TaskIdentifier timeout; | ||
489 | |||
490 | /* FIXME */ | ||
491 | struct MessageStreamState mss; | ||
492 | }; | 242 | }; |
493 | 243 | ||
494 | 244 | ||
495 | /** | 245 | /** |
496 | * Linked list of incoming sockets. | ||
497 | */ | ||
498 | static struct IncomingSocket *incoming_sockets_head; | ||
499 | |||
500 | /** | ||
501 | * Linked list of incoming sockets. | ||
502 | */ | ||
503 | static struct IncomingSocket *incoming_sockets_tail; | ||
504 | |||
505 | /** | ||
506 | * Linked list of sessions this peer participates in. | 246 | * Linked list of sessions this peer participates in. |
507 | */ | 247 | */ |
508 | static struct ConsensusSession *sessions_head; | 248 | static struct ConsensusSession *sessions_head; |
@@ -525,297 +265,10 @@ static struct GNUNET_SERVER_Handle *srv; | |||
525 | /** | 265 | /** |
526 | * Peer that runs this service. | 266 | * Peer that runs this service. |
527 | */ | 267 | */ |
528 | static struct GNUNET_PeerIdentity *my_peer; | 268 | static struct GNUNET_PeerIdentity my_peer; |
529 | |||
530 | /** | ||
531 | * Handle to the core service. Only used during service startup, will be NULL after that. | ||
532 | */ | ||
533 | static struct GNUNET_CORE_Handle *core; | ||
534 | |||
535 | /** | ||
536 | * Listener for sockets from peers that want to reconcile with us. | ||
537 | */ | ||
538 | static struct GNUNET_STREAM_ListenSocket *listener; | ||
539 | |||
540 | |||
541 | /** | ||
542 | * Transmit a queued message to the session's client. | ||
543 | * | ||
544 | * @param cls consensus session | ||
545 | * @param size number of bytes available in buf | ||
546 | * @param buf where the callee should write the message | ||
547 | * @return number of bytes written to buf | ||
548 | */ | ||
549 | static size_t | ||
550 | transmit_queued (void *cls, size_t size, | ||
551 | void *buf) | ||
552 | { | ||
553 | struct MessageQueue *mq = cls; | ||
554 | struct PendingMessage *pm = mq->pending_head; | ||
555 | struct ServerClientSocketState *state = mq->state; | ||
556 | size_t msg_size; | ||
557 | |||
558 | GNUNET_assert (NULL != pm); | ||
559 | GNUNET_assert (NULL != buf); | ||
560 | msg_size = ntohs (pm->msg->size); | ||
561 | GNUNET_assert (size >= msg_size); | ||
562 | memcpy (buf, pm->msg, msg_size); | ||
563 | GNUNET_CONTAINER_DLL_remove (mq->pending_head, mq->pending_tail, pm); | ||
564 | state->th = NULL; | ||
565 | client_send_next (cls); | ||
566 | GNUNET_free (pm); | ||
567 | return msg_size; | ||
568 | } | ||
569 | |||
570 | |||
571 | static void | ||
572 | client_send_next (struct MessageQueue *mq) | ||
573 | { | ||
574 | struct ServerClientSocketState *state = mq->state; | ||
575 | int msize; | ||
576 | |||
577 | GNUNET_assert (NULL != state); | ||
578 | |||
579 | if ( (NULL != state->th) || | ||
580 | (NULL == mq->pending_head) ) | ||
581 | return; | ||
582 | msize = ntohs (mq->pending_head->msg->size); | ||
583 | state->th = | ||
584 | GNUNET_SERVER_notify_transmit_ready (state->client, msize, | ||
585 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
586 | &transmit_queued, mq); | ||
587 | } | ||
588 | |||
589 | |||
590 | struct MessageQueue * | ||
591 | create_message_queue_for_server_client (struct ServerClientSocketState *scss) | ||
592 | { | ||
593 | struct MessageQueue *mq; | ||
594 | mq = GNUNET_new (struct MessageQueue); | ||
595 | mq->add_cb = client_send_next; | ||
596 | mq->state = scss; | ||
597 | return mq; | ||
598 | } | ||
599 | |||
600 | |||
601 | /** | ||
602 | * Functions of this signature are called whenever writing operations | ||
603 | * on a stream are executed | ||
604 | * | ||
605 | * @param cls the closure from GNUNET_STREAM_write | ||
606 | * @param status the status of the stream at the time this function is called; | ||
607 | * GNUNET_STREAM_OK if writing to stream was completed successfully; | ||
608 | * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully | ||
609 | * (this doesn't mean that the data is never sent, the receiver may | ||
610 | * have read the data but its ACKs may have been lost); | ||
611 | * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the | ||
612 | * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot | ||
613 | * be processed. | ||
614 | * @param size the number of bytes written | ||
615 | */ | ||
616 | static void | ||
617 | write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size) | ||
618 | { | ||
619 | struct MessageQueue *mq = cls; | ||
620 | struct MessageStreamState *mss = mq->state; | ||
621 | struct PendingMessage *pm; | ||
622 | |||
623 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
624 | |||
625 | /* call cb for message we finished sending */ | ||
626 | pm = mq->current_pm; | ||
627 | if (NULL != pm) | ||
628 | { | ||
629 | if (NULL != pm->sent_cb) | ||
630 | pm->sent_cb (pm->sent_cb_cls); | ||
631 | GNUNET_free (pm); | ||
632 | } | ||
633 | |||
634 | mss->wh = NULL; | ||
635 | |||
636 | pm = mq->pending_head; | ||
637 | mq->current_pm = pm; | ||
638 | if (NULL == pm) | ||
639 | return; | ||
640 | GNUNET_CONTAINER_DLL_remove (mq->pending_head, mq->pending_tail, pm); | ||
641 | mss->wh = GNUNET_STREAM_write (mss->socket, pm->msg, ntohs (pm->msg->size), | ||
642 | GNUNET_TIME_UNIT_FOREVER_REL, write_queued, cls); | ||
643 | GNUNET_assert (NULL != mss->wh); | ||
644 | } | ||
645 | |||
646 | |||
647 | static void | ||
648 | stream_socket_add_cb (struct MessageQueue *mq) | ||
649 | { | ||
650 | if (NULL != mq->current_pm) | ||
651 | return; | ||
652 | write_queued (mq, GNUNET_STREAM_OK, 0); | ||
653 | } | ||
654 | |||
655 | |||
656 | struct MessageQueue * | ||
657 | create_message_queue_for_stream_socket (struct MessageStreamState *mss) | ||
658 | { | ||
659 | struct MessageQueue *mq; | ||
660 | mq = GNUNET_new (struct MessageQueue); | ||
661 | mq->state = mss; | ||
662 | mq->add_cb = stream_socket_add_cb; | ||
663 | return mq; | ||
664 | } | ||
665 | |||
666 | |||
667 | struct PendingMessage * | ||
668 | new_pending_message (uint16_t size, uint16_t type) | ||
669 | { | ||
670 | struct PendingMessage *pm; | ||
671 | pm = GNUNET_malloc (sizeof *pm + size); | ||
672 | pm->msg = (void *) &pm[1]; | ||
673 | pm->msg->size = htons (size); | ||
674 | pm->msg->type = htons (type); | ||
675 | return pm; | ||
676 | } | ||
677 | |||
678 | |||
679 | /** | ||
680 | * Queue a message in a message queue. | ||
681 | * | ||
682 | * @param queue the message queue | ||
683 | * @param pending message, message with additional information | ||
684 | */ | ||
685 | void | ||
686 | message_queue_add (struct MessageQueue *queue, struct PendingMessage *msg) | ||
687 | { | ||
688 | GNUNET_CONTAINER_DLL_insert_tail (queue->pending_head, queue->pending_tail, msg); | ||
689 | queue->add_cb (queue); | ||
690 | } | ||
691 | |||
692 | |||
693 | /** | ||
694 | * Called when we receive data from a peer via stream. | ||
695 | * | ||
696 | * @param cls the closure from GNUNET_STREAM_read | ||
697 | * @param status the status of the stream at the time this function is called | ||
698 | * @param data traffic from the other side | ||
699 | * @param size the number of bytes available in data read; will be 0 on timeout | ||
700 | * @return number of bytes of processed from 'data' (any data remaining should be | ||
701 | * given to the next time the read processor is called). | ||
702 | */ | ||
703 | static size_t | ||
704 | stream_data_processor (void *cls, enum GNUNET_STREAM_Status status, const void *data, size_t size) | ||
705 | { | ||
706 | struct MessageStreamState *mss = cls; | ||
707 | int ret; | ||
708 | |||
709 | mss->rh = NULL; | ||
710 | |||
711 | if (GNUNET_STREAM_OK != status) | ||
712 | { | ||
713 | /* FIXME: handle this correctly */ | ||
714 | GNUNET_break (0); | ||
715 | return 0; | ||
716 | } | ||
717 | GNUNET_assert (NULL != mss->mst); | ||
718 | ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_YES); | ||
719 | if (GNUNET_SYSERR == ret) | ||
720 | { | ||
721 | /* FIXME: handle this correctly */ | ||
722 | GNUNET_break (0); | ||
723 | return 0; | ||
724 | } | ||
725 | /* read again */ | ||
726 | mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, &stream_data_processor, mss); | ||
727 | /* we always read all data */ | ||
728 | return size; | ||
729 | } | ||
730 | |||
731 | |||
732 | /** | ||
733 | * Send element or element report to the peer specified in cpi. | ||
734 | * | ||
735 | * @param cpi peer to send the elements to | ||
736 | * @param head head of the element list | ||
737 | */ | ||
738 | static void | ||
739 | send_element_or_report (struct ConsensusPeerInformation *cpi, struct ElementInfo *e) | ||
740 | { | ||
741 | struct PendingMessage *pm; | ||
742 | |||
743 | switch (cpi->apparent_round) | ||
744 | { | ||
745 | case CONSENSUS_ROUND_COMPLETION: | ||
746 | case CONSENSUS_ROUND_EXCHANGE: | ||
747 | pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + e->element->size, | ||
748 | GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); | ||
749 | memcpy (&pm->msg[1], e->element->data, e->element->size); | ||
750 | message_queue_add (cpi->mss.mq, pm); | ||
751 | break; | ||
752 | case CONSENSUS_ROUND_INVENTORY: | ||
753 | pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + sizeof (struct GNUNET_HashCode), | ||
754 | GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT); | ||
755 | memcpy (&pm->msg[1], e->element_hash, sizeof (struct GNUNET_HashCode)); | ||
756 | message_queue_add (cpi->mss.mq, pm); | ||
757 | break; | ||
758 | default: | ||
759 | GNUNET_break (0); | ||
760 | } | ||
761 | } | ||
762 | |||
763 | |||
764 | /** | ||
765 | * Iterator to insert values into an ibf. | ||
766 | * | ||
767 | * @param cls closure | ||
768 | * @param key current key code | ||
769 | * @param value value in the hash map | ||
770 | * @return GNUNET_YES if we should continue to | ||
771 | * iterate, | ||
772 | * GNUNET_NO if not. | ||
773 | */ | ||
774 | static int | ||
775 | ibf_values_iterator (void *cls, | ||
776 | const struct GNUNET_HashCode *key, | ||
777 | void *value) | ||
778 | { | ||
779 | struct ConsensusPeerInformation *cpi = cls; | ||
780 | struct ElementInfo *e = value; | ||
781 | struct IBF_Key ibf_key = ibf_key_from_hashcode (e->element_hash); | ||
782 | |||
783 | GNUNET_assert (ibf_key.key_val == ibf_key_from_hashcode (key).key_val); | ||
784 | ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key); | ||
785 | return GNUNET_YES; | ||
786 | } | ||
787 | |||
788 | /** | ||
789 | * Create and populate an IBF for the specified peer, | ||
790 | * if it does not already exist. | ||
791 | * | ||
792 | * @param cpi peer to create the ibf for | ||
793 | */ | ||
794 | static void | ||
795 | prepare_ibf (struct ConsensusPeerInformation *cpi) | ||
796 | { | ||
797 | if (NULL != cpi->session->ibfs[cpi->ibf_order]) | ||
798 | return; | ||
799 | cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, SE_IBF_HASH_NUM); | ||
800 | GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi); | ||
801 | } | ||
802 | |||
803 | |||
804 | /** | ||
805 | * Called when a remote peer wants to inform the local peer | ||
806 | * that the remote peer misses elements. | ||
807 | * Elements are not reconciled. | ||
808 | * | ||
809 | * @param cpi session | ||
810 | * @param msg message | ||
811 | */ | ||
812 | static int | ||
813 | handle_p2p_element_report (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) | ||
814 | { | ||
815 | GNUNET_assert (0); | ||
816 | } | ||
817 | 269 | ||
818 | 270 | ||
271 | /* | ||
819 | static int | 272 | static int |
820 | exp_subround_finished (const struct ConsensusSession *session) | 273 | exp_subround_finished (const struct ConsensusSession *session) |
821 | { | 274 | { |
@@ -831,8 +284,11 @@ exp_subround_finished (const struct ConsensusSession *session) | |||
831 | return GNUNET_YES; | 284 | return GNUNET_YES; |
832 | return GNUNET_NO; | 285 | return GNUNET_NO; |
833 | } | 286 | } |
287 | */ | ||
288 | |||
834 | 289 | ||
835 | 290 | ||
291 | /* | ||
836 | static int | 292 | static int |
837 | inventory_round_finished (struct ConsensusSession *session) | 293 | inventory_round_finished (struct ConsensusSession *session) |
838 | { | 294 | { |
@@ -846,61 +302,7 @@ inventory_round_finished (struct ConsensusSession *session) | |||
846 | return GNUNET_YES; | 302 | return GNUNET_YES; |
847 | return GNUNET_NO; | 303 | return GNUNET_NO; |
848 | } | 304 | } |
849 | 305 | */ | |
850 | |||
851 | static void | ||
852 | clear_message_stream_state (struct MessageStreamState *mss) | ||
853 | { | ||
854 | if (NULL != mss->mst) | ||
855 | { | ||
856 | GNUNET_SERVER_mst_destroy (mss->mst); | ||
857 | mss->mst = NULL; | ||
858 | } | ||
859 | if (NULL != mss->rh) | ||
860 | { | ||
861 | GNUNET_STREAM_read_cancel (mss->rh); | ||
862 | mss->rh = NULL; | ||
863 | } | ||
864 | if (NULL != mss->wh) | ||
865 | { | ||
866 | GNUNET_STREAM_write_cancel (mss->wh); | ||
867 | mss->wh = NULL; | ||
868 | } | ||
869 | if (NULL != mss->socket) | ||
870 | { | ||
871 | GNUNET_STREAM_close (mss->socket); | ||
872 | mss->socket = NULL; | ||
873 | } | ||
874 | if (NULL != mss->mq) | ||
875 | { | ||
876 | GNUNET_free (mss->mq); | ||
877 | mss->mq = NULL; | ||
878 | } | ||
879 | } | ||
880 | |||
881 | |||
882 | /** | ||
883 | * Iterator over hash map entries. | ||
884 | * | ||
885 | * @param cls closure | ||
886 | * @param key current key code | ||
887 | * @param value value in the hash map | ||
888 | * @return GNUNET_YES if we should continue to | ||
889 | * iterate, | ||
890 | * GNUNET_NO if not. | ||
891 | */ | ||
892 | static int | ||
893 | destroy_element_info_iter (void *cls, | ||
894 | const struct GNUNET_HashCode * key, | ||
895 | void *value) | ||
896 | { | ||
897 | struct ElementInfo *ei = value; | ||
898 | GNUNET_free (ei->element); | ||
899 | GNUNET_free (ei->element_hash); | ||
900 | GNUNET_free (ei); | ||
901 | return GNUNET_YES; | ||
902 | } | ||
903 | |||
904 | 306 | ||
905 | /** | 307 | /** |
906 | * Destroy a session, free all resources associated with it. | 308 | * Destroy a session, free all resources associated with it. |
@@ -913,11 +315,9 @@ destroy_session (struct ConsensusSession *session) | |||
913 | int i; | 315 | int i; |
914 | 316 | ||
915 | GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session); | 317 | GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session); |
916 | GNUNET_SERVER_client_drop (session->scss.client); | ||
917 | session->scss.client = NULL; | ||
918 | if (NULL != session->client_mq) | 318 | if (NULL != session->client_mq) |
919 | { | 319 | { |
920 | GNUNET_free (session->client_mq); | 320 | GNUNET_MQ_destroy (session->client_mq); |
921 | session->client_mq = NULL; | 321 | session->client_mq = NULL; |
922 | } | 322 | } |
923 | if (NULL != session->shuffle) | 323 | if (NULL != session->shuffle) |
@@ -925,617 +325,21 @@ destroy_session (struct ConsensusSession *session) | |||
925 | GNUNET_free (session->shuffle); | 325 | GNUNET_free (session->shuffle); |
926 | session->shuffle = NULL; | 326 | session->shuffle = NULL; |
927 | } | 327 | } |
928 | if (NULL != session->se) | ||
929 | { | ||
930 | strata_estimator_destroy (session->se); | ||
931 | session->se = NULL; | ||
932 | } | ||
933 | if (NULL != session->info) | 328 | if (NULL != session->info) |
934 | { | 329 | { |
935 | for (i = 0; i < session->num_peers; i++) | 330 | for (i = 0; i < session->num_peers; i++) |
936 | { | 331 | { |
937 | struct ConsensusPeerInformation *cpi; | 332 | struct ConsensusPeerInformation *cpi; |
938 | cpi = &session->info[i]; | 333 | cpi = &session->info[i]; |
939 | clear_message_stream_state (&cpi->mss); | 334 | GNUNET_free (cpi); |
940 | if (NULL != cpi->se) | ||
941 | { | ||
942 | strata_estimator_destroy (cpi->se); | ||
943 | cpi->se = NULL; | ||
944 | } | ||
945 | if (NULL != cpi->ibf) | ||
946 | { | ||
947 | ibf_destroy (cpi->ibf); | ||
948 | cpi->ibf = NULL; | ||
949 | } | ||
950 | } | 335 | } |
951 | GNUNET_free (session->info); | 336 | GNUNET_free (session->info); |
952 | session->info = NULL; | 337 | session->info = NULL; |
953 | } | 338 | } |
954 | if (NULL != session->ibfs) | ||
955 | { | ||
956 | for (i = 0; i <= MAX_IBF_ORDER; i++) | ||
957 | { | ||
958 | if (NULL != session->ibfs[i]) | ||
959 | { | ||
960 | ibf_destroy (session->ibfs[i]); | ||
961 | session->ibfs[i] = NULL; | ||
962 | } | ||
963 | } | ||
964 | GNUNET_free (session->ibfs); | ||
965 | session->ibfs = NULL; | ||
966 | } | ||
967 | if (NULL != session->values) | ||
968 | { | ||
969 | GNUNET_CONTAINER_multihashmap_iterate (session->values, destroy_element_info_iter, NULL); | ||
970 | GNUNET_CONTAINER_multihashmap_destroy (session->values); | ||
971 | session->values = NULL; | ||
972 | } | ||
973 | |||
974 | if (NULL != session->ibf_key_map) | ||
975 | { | ||
976 | GNUNET_CONTAINER_multihashmap_destroy (session->ibf_key_map); | ||
977 | session->ibf_key_map = NULL; | ||
978 | } | ||
979 | GNUNET_free (session); | 339 | GNUNET_free (session); |
980 | } | 340 | } |
981 | 341 | ||
982 | 342 | ||
983 | static void | ||
984 | send_client_conclude_done (struct ConsensusSession *session) | ||
985 | { | ||
986 | struct PendingMessage *pm; | ||
987 | |||
988 | /* check if client is even there anymore */ | ||
989 | if (NULL == session->scss.client) | ||
990 | return; | ||
991 | pm = new_pending_message (sizeof (struct GNUNET_MessageHeader), | ||
992 | GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE); | ||
993 | message_queue_add (session->client_mq, pm); | ||
994 | } | ||
995 | |||
996 | |||
997 | /** | ||
998 | * Check if a strata message is for the current round or not | ||
999 | * | ||
1000 | * @param session session we are in | ||
1001 | * @param strata_msg the strata message to check | ||
1002 | * @return GNUNET_YES if the strata_msg is premature, GNUNET_NO otherwise | ||
1003 | */ | ||
1004 | static int | ||
1005 | is_premature_strata_message (const struct ConsensusSession *session, const struct StrataMessage *strata_msg) | ||
1006 | { | ||
1007 | switch (strata_msg->round) | ||
1008 | { | ||
1009 | case CONSENSUS_ROUND_COMPLETION: | ||
1010 | case CONSENSUS_ROUND_EXCHANGE: | ||
1011 | /* here, we also have to compare subrounds */ | ||
1012 | if ( (strata_msg->round != session->current_round) || | ||
1013 | (strata_msg->exp_round != session->exp_round) || | ||
1014 | (strata_msg->exp_subround != session->exp_subround) ) | ||
1015 | return GNUNET_YES; | ||
1016 | break; | ||
1017 | default: | ||
1018 | if (session->current_round != strata_msg->round) | ||
1019 | return GNUNET_YES; | ||
1020 | break; | ||
1021 | } | ||
1022 | return GNUNET_NO; | ||
1023 | } | ||
1024 | |||
1025 | |||
1026 | /** | ||
1027 | * Send a strata estimator. | ||
1028 | * | ||
1029 | * @param cpi the peer | ||
1030 | */ | ||
1031 | static void | ||
1032 | send_strata_estimator (struct ConsensusPeerInformation *cpi) | ||
1033 | { | ||
1034 | struct PendingMessage *pm; | ||
1035 | struct StrataMessage *strata_msg; | ||
1036 | |||
1037 | /* FIXME: why is this correct? */ | ||
1038 | cpi->apparent_round = cpi->session->current_round; | ||
1039 | cpi->ibf_state = IBF_STATE_NONE; | ||
1040 | cpi->ibf_bucket_counter = 0; | ||
1041 | |||
1042 | LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "sending SE (in round: %d)\n", cpi->session->current_round); | ||
1043 | |||
1044 | pm = new_pending_message ((sizeof *strata_msg) + (SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE), | ||
1045 | GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE); | ||
1046 | strata_msg = (struct StrataMessage *) pm->msg; | ||
1047 | strata_msg->round = cpi->session->current_round; | ||
1048 | strata_msg->exp_round = cpi->session->exp_round; | ||
1049 | strata_msg->exp_subround = cpi->session->exp_subround; | ||
1050 | strata_estimator_write (cpi->session->se, &strata_msg[1]); | ||
1051 | message_queue_add (cpi->mss.mq, pm); | ||
1052 | } | ||
1053 | |||
1054 | |||
1055 | /** | ||
1056 | * Send an IBF of the order specified in cpi. | ||
1057 | * | ||
1058 | * @param cpi the peer | ||
1059 | */ | ||
1060 | static void | ||
1061 | send_ibf (struct ConsensusPeerInformation *cpi) | ||
1062 | { | ||
1063 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending IBF to P%d\n", | ||
1064 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1065 | |||
1066 | cpi->ibf_bucket_counter = 0; | ||
1067 | while (cpi->ibf_bucket_counter < (1 << cpi->ibf_order)) | ||
1068 | { | ||
1069 | unsigned int num_buckets; | ||
1070 | struct PendingMessage *pm; | ||
1071 | struct DifferenceDigest *digest; | ||
1072 | |||
1073 | num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter; | ||
1074 | /* limit to maximum */ | ||
1075 | if (num_buckets > BUCKETS_PER_MESSAGE) | ||
1076 | num_buckets = BUCKETS_PER_MESSAGE; | ||
1077 | |||
1078 | pm = new_pending_message ((sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE), | ||
1079 | GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST); | ||
1080 | digest = (struct DifferenceDigest *) pm->msg; | ||
1081 | digest->order = cpi->ibf_order; | ||
1082 | digest->round = cpi->apparent_round; | ||
1083 | ibf_write_slice (cpi->ibf, cpi->ibf_bucket_counter, num_buckets, &digest[1]); | ||
1084 | cpi->ibf_bucket_counter += num_buckets; | ||
1085 | message_queue_add (cpi->mss.mq, pm); | ||
1086 | } | ||
1087 | cpi->ibf_bucket_counter = 0; | ||
1088 | cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF; | ||
1089 | } | ||
1090 | |||
1091 | |||
1092 | /** | ||
1093 | * Called when a peer sends us its strata estimator. | ||
1094 | * In response, we sent out IBF of appropriate size back. | ||
1095 | * | ||
1096 | * @param cpi session | ||
1097 | * @param strata_msg message | ||
1098 | */ | ||
1099 | static int | ||
1100 | handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg) | ||
1101 | { | ||
1102 | unsigned int diff; | ||
1103 | |||
1104 | if ( (cpi->session->current_round == CONSENSUS_ROUND_COMPLETION) && | ||
1105 | (strata_msg->round == CONSENSUS_ROUND_INVENTORY) ) | ||
1106 | { | ||
1107 | /* we still have to handle this request appropriately */ | ||
1108 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got inventory SE from P%d, we are already further alog\n", | ||
1109 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1110 | } | ||
1111 | else if (is_premature_strata_message (cpi->session, strata_msg)) | ||
1112 | { | ||
1113 | if (GNUNET_NO == cpi->replaying_strata_message) | ||
1114 | { | ||
1115 | LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got probably premature SE (%d,%d)\n", | ||
1116 | strata_msg->exp_round, strata_msg->exp_subround); | ||
1117 | cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message (&strata_msg->header); | ||
1118 | } | ||
1119 | return GNUNET_YES; | ||
1120 | } | ||
1121 | |||
1122 | if (NULL == cpi->se) | ||
1123 | cpi->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM); | ||
1124 | |||
1125 | cpi->apparent_round = strata_msg->round; | ||
1126 | |||
1127 | if (htons (strata_msg->header.size) != ((sizeof *strata_msg) + SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE)) | ||
1128 | { | ||
1129 | LOG_PP (GNUNET_ERROR_TYPE_WARNING, cpi, "got SE of wrong size\n"); | ||
1130 | return GNUNET_NO; | ||
1131 | } | ||
1132 | strata_estimator_read (&strata_msg[1], cpi->se); | ||
1133 | GNUNET_assert (NULL != cpi->session->se); | ||
1134 | diff = strata_estimator_difference (cpi->session->se, cpi->se); | ||
1135 | |||
1136 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SE from P%d, diff=%d\n", | ||
1137 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), diff); | ||
1138 | |||
1139 | switch (cpi->session->current_round) | ||
1140 | { | ||
1141 | case CONSENSUS_ROUND_EXCHANGE: | ||
1142 | case CONSENSUS_ROUND_INVENTORY: | ||
1143 | case CONSENSUS_ROUND_COMPLETION: | ||
1144 | /* send IBF of the right size */ | ||
1145 | cpi->ibf_order = 0; | ||
1146 | while (((1 << cpi->ibf_order) < diff) || (SE_IBF_HASH_NUM > (1 << cpi->ibf_order)) ) | ||
1147 | cpi->ibf_order++; | ||
1148 | if (cpi->ibf_order > MAX_IBF_ORDER) | ||
1149 | cpi->ibf_order = MAX_IBF_ORDER; | ||
1150 | cpi->ibf_order += 1; | ||
1151 | /* create ibf if not already pre-computed */ | ||
1152 | prepare_ibf (cpi); | ||
1153 | if (NULL != cpi->ibf) | ||
1154 | ibf_destroy (cpi->ibf); | ||
1155 | cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); | ||
1156 | cpi->ibf_bucket_counter = 0; | ||
1157 | send_ibf (cpi); | ||
1158 | break; | ||
1159 | default: | ||
1160 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got unexpected SE from P%d\n", | ||
1161 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1162 | break; | ||
1163 | } | ||
1164 | return GNUNET_YES; | ||
1165 | } | ||
1166 | |||
1167 | |||
1168 | |||
1169 | static int | ||
1170 | send_elements_iterator (void *cls, | ||
1171 | const struct GNUNET_HashCode * key, | ||
1172 | void *value) | ||
1173 | { | ||
1174 | struct ConsensusPeerInformation *cpi = cls; | ||
1175 | struct ElementInfo *ei; | ||
1176 | ei = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, value); | ||
1177 | if (NULL == ei) | ||
1178 | { | ||
1179 | LOG_PP (GNUNET_ERROR_TYPE_WARNING, cpi, "peer's ibf contained non-existing element %s\n", | ||
1180 | GNUNET_h2s((struct GNUNET_HashCode *) value)); | ||
1181 | return GNUNET_YES; | ||
1182 | } | ||
1183 | LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "sending element\n"); | ||
1184 | send_element_or_report (cpi, ei); | ||
1185 | return GNUNET_YES; | ||
1186 | } | ||
1187 | |||
1188 | |||
1189 | /** | ||
1190 | * Decode the current diff ibf, and send elements/requests/reports/ | ||
1191 | * | ||
1192 | * @param cpi partner peer | ||
1193 | */ | ||
1194 | static void | ||
1195 | decode (struct ConsensusPeerInformation *cpi) | ||
1196 | { | ||
1197 | struct IBF_Key key; | ||
1198 | int side; | ||
1199 | |||
1200 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: decoding ibf from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1201 | |||
1202 | while (1) | ||
1203 | { | ||
1204 | int res; | ||
1205 | |||
1206 | res = ibf_decode (cpi->ibf, &side, &key); | ||
1207 | if (GNUNET_SYSERR == res) | ||
1208 | { | ||
1209 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding failed, transmitting larger IBF\n"); | ||
1210 | /* decoding failed, we tell the other peer by sending our ibf with a larger order */ | ||
1211 | cpi->ibf_order++; | ||
1212 | prepare_ibf (cpi); | ||
1213 | cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); | ||
1214 | cpi->ibf_bucket_counter = 0; | ||
1215 | send_ibf (cpi); | ||
1216 | return; | ||
1217 | } | ||
1218 | if (GNUNET_NO == res) | ||
1219 | { | ||
1220 | struct PendingMessage *pm; | ||
1221 | struct ConsensusRoundMessage *rmsg; | ||
1222 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: transmitted all values, sending SYNC\n", cpi->session->local_peer_idx); | ||
1223 | |||
1224 | pm = new_pending_message (sizeof *rmsg, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED); | ||
1225 | rmsg = (struct ConsensusRoundMessage *) pm->msg; | ||
1226 | rmsg->round = cpi->apparent_round; | ||
1227 | message_queue_add (cpi->mss.mq, pm); | ||
1228 | return; | ||
1229 | } | ||
1230 | if (-1 == side) | ||
1231 | { | ||
1232 | struct GNUNET_HashCode hashcode; | ||
1233 | /* we have the element(s), send it to the other peer */ | ||
1234 | ibf_hashcode_from_key (key, &hashcode); | ||
1235 | GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->ibf_key_map, &hashcode, send_elements_iterator, cpi); | ||
1236 | } | ||
1237 | else | ||
1238 | { | ||
1239 | struct PendingMessage *pm; | ||
1240 | uint16_t type; | ||
1241 | |||
1242 | switch (cpi->apparent_round) | ||
1243 | { | ||
1244 | case CONSENSUS_ROUND_COMPLETION: | ||
1245 | /* FIXME: check if we really want to request the element */ | ||
1246 | case CONSENSUS_ROUND_EXCHANGE: | ||
1247 | case CONSENSUS_ROUND_INVENTORY: | ||
1248 | type = GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST; | ||
1249 | break; | ||
1250 | default: | ||
1251 | GNUNET_assert (0); | ||
1252 | } | ||
1253 | pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + sizeof (struct IBF_Key), | ||
1254 | type); | ||
1255 | *(struct IBF_Key *) &pm->msg[1] = key; | ||
1256 | message_queue_add (cpi->mss.mq, pm); | ||
1257 | } | ||
1258 | } | ||
1259 | } | ||
1260 | |||
1261 | |||
1262 | static int | ||
1263 | handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest) | ||
1264 | { | ||
1265 | int num_buckets; | ||
1266 | |||
1267 | /* FIXME: find out if we're still expecting the same ibf! */ | ||
1268 | |||
1269 | cpi->apparent_round = cpi->session->current_round; | ||
1270 | // FIXME: check header.size >= sizeof (DD) | ||
1271 | num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE; | ||
1272 | switch (cpi->ibf_state) | ||
1273 | { | ||
1274 | case IBF_STATE_NONE: | ||
1275 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1276 | cpi->ibf_state = IBF_STATE_RECEIVING; | ||
1277 | cpi->ibf_order = digest->order; | ||
1278 | cpi->ibf_bucket_counter = 0; | ||
1279 | if (NULL != cpi->ibf) | ||
1280 | { | ||
1281 | ibf_destroy (cpi->ibf); | ||
1282 | cpi->ibf = NULL; | ||
1283 | } | ||
1284 | break; | ||
1285 | case IBF_STATE_ANTICIPATE_DIFF: | ||
1286 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d (probably out IBF did not decode)\n", | ||
1287 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1288 | cpi->ibf_state = IBF_STATE_RECEIVING; | ||
1289 | cpi->ibf_order = digest->order; | ||
1290 | cpi->ibf_bucket_counter = 0; | ||
1291 | if (NULL != cpi->ibf) | ||
1292 | { | ||
1293 | ibf_destroy (cpi->ibf); | ||
1294 | cpi->ibf = NULL; | ||
1295 | } | ||
1296 | break; | ||
1297 | case IBF_STATE_RECEIVING: | ||
1298 | break; | ||
1299 | default: | ||
1300 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: unexpected IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1301 | return GNUNET_YES; | ||
1302 | } | ||
1303 | |||
1304 | if (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order)) | ||
1305 | { | ||
1306 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: overfull IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1307 | return GNUNET_YES; | ||
1308 | } | ||
1309 | |||
1310 | if (NULL == cpi->ibf) | ||
1311 | cpi->ibf = ibf_create (1 << cpi->ibf_order, SE_IBF_HASH_NUM); | ||
1312 | |||
1313 | ibf_read_slice (&digest[1], cpi->ibf_bucket_counter, num_buckets, cpi->ibf); | ||
1314 | cpi->ibf_bucket_counter += num_buckets; | ||
1315 | |||
1316 | if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) | ||
1317 | { | ||
1318 | cpi->ibf_state = IBF_STATE_DECODING; | ||
1319 | cpi->ibf_bucket_counter = 0; | ||
1320 | prepare_ibf (cpi); | ||
1321 | ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]); | ||
1322 | decode (cpi); | ||
1323 | } | ||
1324 | return GNUNET_YES; | ||
1325 | } | ||
1326 | |||
1327 | |||
1328 | /** | ||
1329 | * Insert an element into the consensus set of the specified session. | ||
1330 | * The element will not be copied, and freed when destroying the session. | ||
1331 | * | ||
1332 | * @param session session for new element | ||
1333 | * @param element element to insert | ||
1334 | */ | ||
1335 | static void | ||
1336 | insert_element (struct ConsensusSession *session, struct GNUNET_CONSENSUS_Element *element) | ||
1337 | { | ||
1338 | struct GNUNET_HashCode hash; | ||
1339 | struct ElementInfo *e; | ||
1340 | struct IBF_Key ibf_key; | ||
1341 | int i; | ||
1342 | |||
1343 | e = GNUNET_new (struct ElementInfo); | ||
1344 | e->element = element; | ||
1345 | e->element_hash = GNUNET_new (struct GNUNET_HashCode); | ||
1346 | GNUNET_CRYPTO_hash (e->element->data, e->element->size, e->element_hash); | ||
1347 | ibf_key = ibf_key_from_hashcode (e->element_hash); | ||
1348 | ibf_hashcode_from_key (ibf_key, &hash); | ||
1349 | strata_estimator_insert (session->se, &hash); | ||
1350 | GNUNET_CONTAINER_multihashmap_put (session->values, e->element_hash, e, | ||
1351 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | ||
1352 | GNUNET_CONTAINER_multihashmap_put (session->ibf_key_map, &hash, e->element_hash, | ||
1353 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
1354 | |||
1355 | for (i = 0; i <= MAX_IBF_ORDER; i++) | ||
1356 | { | ||
1357 | if (NULL == session->ibfs[i]) | ||
1358 | continue; | ||
1359 | ibf_insert (session->ibfs[i], ibf_key); | ||
1360 | } | ||
1361 | } | ||
1362 | |||
1363 | |||
1364 | /** | ||
1365 | * Handle an element that another peer sent us | ||
1366 | */ | ||
1367 | static int | ||
1368 | handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg) | ||
1369 | { | ||
1370 | struct GNUNET_CONSENSUS_Element *element; | ||
1371 | size_t size; | ||
1372 | |||
1373 | switch (cpi->session->current_round) | ||
1374 | { | ||
1375 | case CONSENSUS_ROUND_COMPLETION: | ||
1376 | /* FIXME: check if we really expect the element */ | ||
1377 | case CONSENSUS_ROUND_EXCHANGE: | ||
1378 | break; | ||
1379 | default: | ||
1380 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "got unexpected element, ignoring\n"); | ||
1381 | return GNUNET_YES; | ||
1382 | } | ||
1383 | |||
1384 | size = ntohs (element_msg->size) - sizeof *element_msg; | ||
1385 | |||
1386 | element = GNUNET_malloc (size + sizeof *element); | ||
1387 | element->size = size; | ||
1388 | memcpy (&element[1], &element_msg[1], size); | ||
1389 | element->data = &element[1]; | ||
1390 | |||
1391 | LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got element\n"); | ||
1392 | |||
1393 | insert_element (cpi->session, element); | ||
1394 | |||
1395 | return GNUNET_YES; | ||
1396 | } | ||
1397 | |||
1398 | |||
1399 | /** | ||
1400 | * Handle a request for elements. | ||
1401 | * | ||
1402 | * @param cpi peer that is requesting the element | ||
1403 | * @param msg the element request message | ||
1404 | */ | ||
1405 | static int | ||
1406 | handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg) | ||
1407 | { | ||
1408 | struct GNUNET_HashCode hashcode; | ||
1409 | struct IBF_Key *ibf_key; | ||
1410 | unsigned int num; | ||
1411 | |||
1412 | /* element requests are allowed in every round */ | ||
1413 | |||
1414 | num = ntohs (msg->header.size) / sizeof (struct IBF_Key); | ||
1415 | |||
1416 | ibf_key = (struct IBF_Key *) &msg[1]; | ||
1417 | while (num--) | ||
1418 | { | ||
1419 | ibf_hashcode_from_key (*ibf_key, &hashcode); | ||
1420 | GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->ibf_key_map, &hashcode, send_elements_iterator, cpi); | ||
1421 | ibf_key++; | ||
1422 | } | ||
1423 | return GNUNET_YES; | ||
1424 | } | ||
1425 | |||
1426 | static int | ||
1427 | is_peer_connected (struct ConsensusPeerInformation *cpi) | ||
1428 | { | ||
1429 | if (NULL == cpi->mss.socket) | ||
1430 | return GNUNET_NO; | ||
1431 | return GNUNET_YES; | ||
1432 | } | ||
1433 | |||
1434 | |||
1435 | static void | ||
1436 | ensure_peer_connected (struct ConsensusPeerInformation *cpi) | ||
1437 | { | ||
1438 | if (NULL != cpi->mss.socket) | ||
1439 | return; | ||
1440 | cpi->mss.socket = GNUNET_STREAM_open (cfg, &cpi->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS, | ||
1441 | open_cb, cpi, GNUNET_STREAM_OPTION_END); | ||
1442 | } | ||
1443 | |||
1444 | |||
1445 | /** | ||
1446 | * If necessary, send a message to the peer, depending on the current | ||
1447 | * round. | ||
1448 | */ | ||
1449 | static void | ||
1450 | embrace_peer (struct ConsensusPeerInformation *cpi) | ||
1451 | { | ||
1452 | if (GNUNET_NO == is_peer_connected (cpi)) | ||
1453 | { | ||
1454 | ensure_peer_connected (cpi); | ||
1455 | return; | ||
1456 | } | ||
1457 | if (GNUNET_NO == cpi->hello) | ||
1458 | return; | ||
1459 | /* FIXME: correctness of switch */ | ||
1460 | switch (cpi->session->current_round) | ||
1461 | { | ||
1462 | case CONSENSUS_ROUND_EXCHANGE: | ||
1463 | case CONSENSUS_ROUND_INVENTORY: | ||
1464 | if (cpi->session->partner_outgoing != cpi) | ||
1465 | break; | ||
1466 | /* fallthrough */ | ||
1467 | case CONSENSUS_ROUND_COMPLETION: | ||
1468 | send_strata_estimator (cpi); | ||
1469 | default: | ||
1470 | break; | ||
1471 | } | ||
1472 | } | ||
1473 | |||
1474 | |||
1475 | /** | ||
1476 | * Called when stream has finishes writing the hello message | ||
1477 | */ | ||
1478 | static void | ||
1479 | hello_cont (void *cls) | ||
1480 | { | ||
1481 | struct ConsensusPeerInformation *cpi = cls; | ||
1482 | |||
1483 | cpi->hello = GNUNET_YES; | ||
1484 | embrace_peer (cpi); | ||
1485 | } | ||
1486 | |||
1487 | |||
1488 | /** | ||
1489 | * Called when we established a stream connection to another peer | ||
1490 | * | ||
1491 | * @param cls cpi of the peer we just connected to | ||
1492 | * @param socket socket to use to communicate with the other side (read/write) | ||
1493 | */ | ||
1494 | static void | ||
1495 | open_cb (void *cls, struct GNUNET_STREAM_Socket *socket) | ||
1496 | { | ||
1497 | struct ConsensusPeerInformation *cpi = cls; | ||
1498 | struct PendingMessage *pm; | ||
1499 | struct ConsensusHello *hello; | ||
1500 | |||
1501 | GNUNET_assert (NULL == cpi->mss.mst); | ||
1502 | GNUNET_assert (NULL == cpi->mss.mq); | ||
1503 | |||
1504 | cpi->mss.mq = create_message_queue_for_stream_socket (&cpi->mss); | ||
1505 | cpi->mss.mst = GNUNET_SERVER_mst_create (mst_session_callback, cpi); | ||
1506 | cpi->mss.mst_cls = cpi; | ||
1507 | |||
1508 | pm = new_pending_message (sizeof *hello, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO); | ||
1509 | hello = (struct ConsensusHello *) pm->msg; | ||
1510 | memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode)); | ||
1511 | pm->sent_cb = hello_cont; | ||
1512 | pm->sent_cb_cls = cpi; | ||
1513 | message_queue_add (cpi->mss.mq, pm); | ||
1514 | cpi->mss.rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, | ||
1515 | &stream_data_processor, &cpi->mss); | ||
1516 | } | ||
1517 | |||
1518 | |||
1519 | static void | ||
1520 | replay_premature_message (struct ConsensusPeerInformation *cpi) | ||
1521 | { | ||
1522 | if (NULL != cpi->premature_strata_message) | ||
1523 | { | ||
1524 | struct StrataMessage *sm; | ||
1525 | |||
1526 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n"); | ||
1527 | sm = cpi->premature_strata_message; | ||
1528 | cpi->premature_strata_message = NULL; | ||
1529 | |||
1530 | cpi->replaying_strata_message = GNUNET_YES; | ||
1531 | handle_p2p_strata (cpi, sm); | ||
1532 | cpi->replaying_strata_message = GNUNET_NO; | ||
1533 | |||
1534 | GNUNET_free (sm); | ||
1535 | } | ||
1536 | } | ||
1537 | |||
1538 | |||
1539 | /** | 343 | /** |
1540 | * Start the inventory round, contact all peers we are supposed to contact. | 344 | * Start the inventory round, contact all peers we are supposed to contact. |
1541 | * | 345 | * |
@@ -1548,11 +352,7 @@ start_inventory (struct ConsensusSession *session) | |||
1548 | int last; | 352 | int last; |
1549 | 353 | ||
1550 | for (i = 0; i < session->num_peers; i++) | 354 | for (i = 0; i < session->num_peers; i++) |
1551 | { | ||
1552 | session->info[i].ibf_bucket_counter = 0; | ||
1553 | session->info[i].ibf_state = IBF_STATE_NONE; | ||
1554 | session->info[i].is_outgoing = GNUNET_NO; | 355 | session->info[i].is_outgoing = GNUNET_NO; |
1555 | } | ||
1556 | 356 | ||
1557 | last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; | 357 | last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; |
1558 | i = (session->local_peer_idx + 1) % session->num_peers; | 358 | i = (session->local_peer_idx + 1) % session->num_peers; |
@@ -1560,7 +360,7 @@ start_inventory (struct ConsensusSession *session) | |||
1560 | { | 360 | { |
1561 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i); | 361 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i); |
1562 | session->info[i].is_outgoing = GNUNET_YES; | 362 | session->info[i].is_outgoing = GNUNET_YES; |
1563 | embrace_peer (&session->info[i]); | 363 | // embrace_peer (&session->info[i]); |
1564 | i = (i + 1) % session->num_peers; | 364 | i = (i + 1) % session->num_peers; |
1565 | } | 365 | } |
1566 | // tie-breaker for even number of peers | 366 | // tie-breaker for even number of peers |
@@ -1568,49 +368,12 @@ start_inventory (struct ConsensusSession *session) | |||
1568 | { | 368 | { |
1569 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i); | 369 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i); |
1570 | session->info[last].is_outgoing = GNUNET_YES; | 370 | session->info[last].is_outgoing = GNUNET_YES; |
1571 | embrace_peer (&session->info[last]); | 371 | // embrace_peer (&session->info[last]); |
1572 | } | ||
1573 | |||
1574 | for (i = 0; i < session->num_peers; i++) | ||
1575 | { | ||
1576 | if (GNUNET_NO == session->info[i].is_outgoing) | ||
1577 | replay_premature_message (&session->info[i]); | ||
1578 | } | 372 | } |
1579 | } | 373 | } |
1580 | 374 | ||
1581 | 375 | ||
1582 | /** | 376 | /** |
1583 | * Iterator over hash map entries. | ||
1584 | * | ||
1585 | * @param cls closure | ||
1586 | * @param key current key code | ||
1587 | * @param value value in the hash map | ||
1588 | * @return GNUNET_YES if we should continue to | ||
1589 | * iterate, | ||
1590 | * GNUNET_NO if not. | ||
1591 | */ | ||
1592 | static int | ||
1593 | send_client_elements_iter (void *cls, | ||
1594 | const struct GNUNET_HashCode * key, | ||
1595 | void *value) | ||
1596 | { | ||
1597 | struct ConsensusSession *session = cls; | ||
1598 | struct ElementInfo *ei = value; | ||
1599 | struct PendingMessage *pm; | ||
1600 | |||
1601 | /* is the client still there? */ | ||
1602 | if (NULL == session->scss.client) | ||
1603 | return GNUNET_NO; | ||
1604 | |||
1605 | pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + ei->element->size, | ||
1606 | GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT); | ||
1607 | message_queue_add (session->client_mq, pm); | ||
1608 | return GNUNET_YES; | ||
1609 | } | ||
1610 | |||
1611 | |||
1612 | |||
1613 | /** | ||
1614 | * Start the next round. | 377 | * Start the next round. |
1615 | * This function can be invoked as a timeout task, or called manually (tc will be NULL then). | 378 | * This function can be invoked as a timeout task, or called manually (tc will be NULL then). |
1616 | * | 379 | * |
@@ -1630,7 +393,7 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
1630 | session = cls; | 393 | session = cls; |
1631 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: round over\n", session->local_peer_idx); | 394 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: round over\n", session->local_peer_idx); |
1632 | 395 | ||
1633 | if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) | 396 | if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK) |
1634 | { | 397 | { |
1635 | GNUNET_SCHEDULER_cancel (session->round_timeout_tid); | 398 | GNUNET_SCHEDULER_cancel (session->round_timeout_tid); |
1636 | session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; | 399 | session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; |
@@ -1648,8 +411,8 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
1648 | if (session->num_peers <= 2) | 411 | if (session->num_peers <= 2) |
1649 | { | 412 | { |
1650 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: 2-peer consensus done\n", session->local_peer_idx); | 413 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: 2-peer consensus done\n", session->local_peer_idx); |
1651 | GNUNET_CONTAINER_multihashmap_iterate (session->values, send_client_elements_iter, session); | 414 | //GNUNET_CONTAINER_multihashmap_iterate (session->values, send_client_elements_iter, session); |
1652 | send_client_conclude_done (session); | 415 | //send_client_conclude_done (session); |
1653 | session->current_round = CONSENSUS_ROUND_FINISH; | 416 | session->current_round = CONSENSUS_ROUND_FINISH; |
1654 | return; | 417 | return; |
1655 | } | 418 | } |
@@ -1663,7 +426,7 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
1663 | break; | 426 | break; |
1664 | case CONSENSUS_ROUND_COMPLETION: | 427 | case CONSENSUS_ROUND_COMPLETION: |
1665 | session->current_round = CONSENSUS_ROUND_FINISH; | 428 | session->current_round = CONSENSUS_ROUND_FINISH; |
1666 | send_client_conclude_done (session); | 429 | //send_client_conclude_done (session); |
1667 | break; | 430 | break; |
1668 | default: | 431 | default: |
1669 | GNUNET_assert (0); | 432 | GNUNET_assert (0); |
@@ -1671,159 +434,9 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
1671 | } | 434 | } |
1672 | 435 | ||
1673 | 436 | ||
1674 | static void | ||
1675 | fin_sent_cb (void *cls) | ||
1676 | { | ||
1677 | struct ConsensusPeerInformation *cpi; | ||
1678 | cpi = cls; | ||
1679 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n", cpi->session->local_peer_idx); | ||
1680 | switch (cpi->session->current_round) | ||
1681 | { | ||
1682 | case CONSENSUS_ROUND_EXCHANGE: | ||
1683 | case CONSENSUS_ROUND_COMPLETION: | ||
1684 | if (cpi->session->current_round != cpi->apparent_round) | ||
1685 | { | ||
1686 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: FIN to SYNC from the past\n", cpi->session->local_peer_idx); | ||
1687 | break; | ||
1688 | } | ||
1689 | cpi->exp_subround_finished = GNUNET_YES; | ||
1690 | /* the subround is only really over if *both* partners are done */ | ||
1691 | if (GNUNET_YES == exp_subround_finished (cpi->session)) | ||
1692 | subround_over (cpi->session, NULL); | ||
1693 | else | ||
1694 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after FIN sent\n", cpi->session->local_peer_idx); | ||
1695 | break; | ||
1696 | case CONSENSUS_ROUND_INVENTORY: | ||
1697 | cpi->inventory_synced = GNUNET_YES; | ||
1698 | if (inventory_round_finished (cpi->session) && cpi->session->current_round == cpi->apparent_round) | ||
1699 | round_over (cpi->session, NULL); | ||
1700 | /* FIXME: maybe go to next round */ | ||
1701 | break; | ||
1702 | default: | ||
1703 | GNUNET_break (0); | ||
1704 | } | ||
1705 | } | ||
1706 | |||
1707 | |||
1708 | /** | 437 | /** |
1709 | * The other peer wants us to inform that he sent us all the elements we requested. | 438 | * Adapt the shuffle of the session for the current round. |
1710 | */ | 439 | */ |
1711 | static int | ||
1712 | handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) | ||
1713 | { | ||
1714 | struct ConsensusRoundMessage *round_msg; | ||
1715 | round_msg = (struct ConsensusRoundMessage *) msg; | ||
1716 | /* FIXME: only call subround_over if round is the current one! */ | ||
1717 | switch (cpi->session->current_round) | ||
1718 | { | ||
1719 | case CONSENSUS_ROUND_EXCHANGE: | ||
1720 | case CONSENSUS_ROUND_COMPLETION: | ||
1721 | if (cpi->session->current_round != round_msg->round) | ||
1722 | { | ||
1723 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (past round)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1724 | cpi->ibf_state = IBF_STATE_NONE; | ||
1725 | cpi->ibf_bucket_counter = 0; | ||
1726 | break; | ||
1727 | } | ||
1728 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (exp)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1729 | cpi->exp_subround_finished = GNUNET_YES; | ||
1730 | if (GNUNET_YES == exp_subround_finished (cpi->session)) | ||
1731 | subround_over (cpi->session, NULL); | ||
1732 | else | ||
1733 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after got FIN\n", cpi->session->local_peer_idx); | ||
1734 | break; | ||
1735 | case CONSENSUS_ROUND_INVENTORY: | ||
1736 | cpi->inventory_synced = GNUNET_YES; | ||
1737 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (a2a)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1738 | if (inventory_round_finished (cpi->session)) | ||
1739 | round_over (cpi->session, NULL); | ||
1740 | break; | ||
1741 | default: | ||
1742 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n"); | ||
1743 | break; | ||
1744 | } | ||
1745 | return GNUNET_YES; | ||
1746 | } | ||
1747 | |||
1748 | |||
1749 | /** | ||
1750 | * Gets called when the other peer wants us to inform that | ||
1751 | * it has decoded our ibf and sent us all elements / requests | ||
1752 | */ | ||
1753 | static int | ||
1754 | handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) | ||
1755 | { | ||
1756 | struct PendingMessage *pm; | ||
1757 | struct ConsensusRoundMessage *fin_msg; | ||
1758 | |||
1759 | /* FIXME: why handle current round?? */ | ||
1760 | switch (cpi->session->current_round) | ||
1761 | { | ||
1762 | case CONSENSUS_ROUND_INVENTORY: | ||
1763 | cpi->inventory_synced = GNUNET_YES; | ||
1764 | case CONSENSUS_ROUND_COMPLETION: | ||
1765 | case CONSENSUS_ROUND_EXCHANGE: | ||
1766 | LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "received SYNC\n"); | ||
1767 | pm = new_pending_message (sizeof *fin_msg, | ||
1768 | GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN); | ||
1769 | fin_msg = (struct ConsensusRoundMessage *) pm->msg; | ||
1770 | fin_msg->round = cpi->apparent_round; | ||
1771 | /* the subround is over once we kicked off sending the fin msg */ | ||
1772 | /* FIXME: assert we are talking to the right peer! */ | ||
1773 | /* FIXME: mark peer as synced */ | ||
1774 | pm->sent_cb = fin_sent_cb; | ||
1775 | pm->sent_cb_cls = cpi; | ||
1776 | message_queue_add (cpi->mss.mq, pm); | ||
1777 | break; | ||
1778 | default: | ||
1779 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n"); | ||
1780 | break; | ||
1781 | } | ||
1782 | return GNUNET_YES; | ||
1783 | } | ||
1784 | |||
1785 | |||
1786 | /** | ||
1787 | * Functions with this signature are called whenever a | ||
1788 | * complete message is received by the tokenizer. | ||
1789 | * | ||
1790 | * Do not call GNUNET_SERVER_mst_destroy in callback | ||
1791 | * | ||
1792 | * @param cls closure | ||
1793 | * @param client identification of the client | ||
1794 | * @param message the actual message | ||
1795 | * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing | ||
1796 | */ | ||
1797 | static int | ||
1798 | mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) | ||
1799 | { | ||
1800 | struct ConsensusPeerInformation *cpi = cls; | ||
1801 | GNUNET_assert (NULL == client); | ||
1802 | GNUNET_assert (NULL != cls); | ||
1803 | switch (ntohs (message->type)) | ||
1804 | { | ||
1805 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE: | ||
1806 | return handle_p2p_strata (cpi, (struct StrataMessage *) message); | ||
1807 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST: | ||
1808 | return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message); | ||
1809 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS: | ||
1810 | return handle_p2p_element (cpi, message); | ||
1811 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT: | ||
1812 | return handle_p2p_element_report (cpi, message); | ||
1813 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST: | ||
1814 | return handle_p2p_element_request (cpi, (struct ElementRequest *) message); | ||
1815 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED: | ||
1816 | return handle_p2p_synced (cpi, message); | ||
1817 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN: | ||
1818 | return handle_p2p_fin (cpi, message); | ||
1819 | default: | ||
1820 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s\n", | ||
1821 | ntohs (message->type), GNUNET_h2s (&cpi->peer_id.hashPubKey)); | ||
1822 | } | ||
1823 | return GNUNET_OK; | ||
1824 | } | ||
1825 | |||
1826 | |||
1827 | static void | 440 | static void |
1828 | shuffle (struct ConsensusSession *session) | 441 | shuffle (struct ConsensusSession *session) |
1829 | { | 442 | { |
@@ -1860,6 +473,7 @@ find_partners (struct ConsensusSession *session) | |||
1860 | { | 473 | { |
1861 | int mark[session->num_peers]; | 474 | int mark[session->num_peers]; |
1862 | int i; | 475 | int i; |
476 | |||
1863 | memset (mark, 0, session->num_peers * sizeof (int)); | 477 | memset (mark, 0, session->num_peers * sizeof (int)); |
1864 | session->partner_incoming = session->partner_outgoing = NULL; | 478 | session->partner_incoming = session->partner_outgoing = NULL; |
1865 | for (i = 0; i < session->num_peers; i++) | 479 | for (i = 0; i < session->num_peers; i++) |
@@ -1887,6 +501,22 @@ find_partners (struct ConsensusSession *session) | |||
1887 | 501 | ||
1888 | 502 | ||
1889 | /** | 503 | /** |
504 | * Callback for set operation results. Called for each element | ||
505 | * in the result set. | ||
506 | * | ||
507 | * @param cls closure | ||
508 | * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK | ||
509 | * @param status see enum GNUNET_SET_Status | ||
510 | */ | ||
511 | static void set_result_cb (void *cls, | ||
512 | const struct GNUNET_SET_Element *element, | ||
513 | enum GNUNET_SET_Status status) | ||
514 | { | ||
515 | /* FIXME */ | ||
516 | } | ||
517 | |||
518 | |||
519 | /** | ||
1890 | * Do the next subround in the exp-scheme. | 520 | * Do the next subround in the exp-scheme. |
1891 | * This function can be invoked as a timeout task, or called manually (tc will be NULL then). | 521 | * This function can be invoked as a timeout task, or called manually (tc will be NULL then). |
1892 | * | 522 | * |
@@ -1905,9 +535,11 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
1905 | return; | 535 | return; |
1906 | session = cls; | 536 | session = cls; |
1907 | /* cancel timeout */ | 537 | /* cancel timeout */ |
1908 | if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) | 538 | if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK) |
539 | { | ||
1909 | GNUNET_SCHEDULER_cancel (session->round_timeout_tid); | 540 | GNUNET_SCHEDULER_cancel (session->round_timeout_tid); |
1910 | session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; | 541 | session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; |
542 | } | ||
1911 | /* check if we are done with the log phase, 2-peer consensus only does one log round */ | 543 | /* check if we are done with the log phase, 2-peer consensus only does one log round */ |
1912 | if ( (session->exp_round == NUM_EXP_ROUNDS) || | 544 | if ( (session->exp_round == NUM_EXP_ROUNDS) || |
1913 | ((session->num_peers == 2) && (session->exp_round == 1))) | 545 | ((session->num_peers == 2) && (session->exp_round == 1))) |
@@ -1938,8 +570,25 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
1938 | session->exp_subround++; | 570 | session->exp_subround++; |
1939 | } | 571 | } |
1940 | 572 | ||
573 | /* determine the incoming and outgoing partner */ | ||
1941 | find_partners (session); | 574 | find_partners (session); |
1942 | 575 | ||
576 | if (NULL != session->partner_outgoing) | ||
577 | { | ||
578 | if (NULL != session->partner_outgoing->set_op) | ||
579 | GNUNET_SET_operation_cancel (session->partner_outgoing->set_op); | ||
580 | session->partner_outgoing->set_op = | ||
581 | GNUNET_SET_evaluate (session->element_set, | ||
582 | &session->partner_outgoing->peer_id, | ||
583 | &session->global_id, | ||
584 | NULL, /* FIXME */ | ||
585 | 0, /* FIXME */ | ||
586 | GNUNET_SET_RESULT_ADDED, | ||
587 | set_result_cb, session); | ||
588 | |||
589 | |||
590 | } | ||
591 | |||
1943 | #ifdef GNUNET_EXTRA_LOGGING | 592 | #ifdef GNUNET_EXTRA_LOGGING |
1944 | { | 593 | { |
1945 | int in; | 594 | int in; |
@@ -1957,29 +606,6 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
1957 | } | 606 | } |
1958 | #endif /* GNUNET_EXTRA_LOGGING */ | 607 | #endif /* GNUNET_EXTRA_LOGGING */ |
1959 | 608 | ||
1960 | if (NULL != session->partner_incoming) | ||
1961 | { | ||
1962 | session->partner_incoming->ibf_state = IBF_STATE_NONE; | ||
1963 | session->partner_incoming->exp_subround_finished = GNUNET_NO; | ||
1964 | session->partner_incoming->ibf_bucket_counter = 0; | ||
1965 | |||
1966 | /* maybe there's an early strata estimator? */ | ||
1967 | replay_premature_message (session->partner_incoming); | ||
1968 | } | ||
1969 | |||
1970 | if (NULL != session->partner_outgoing) | ||
1971 | { | ||
1972 | session->partner_outgoing->ibf_state = IBF_STATE_NONE; | ||
1973 | session->partner_outgoing->ibf_bucket_counter = 0; | ||
1974 | session->partner_outgoing->exp_subround_finished = GNUNET_NO; | ||
1975 | /* make sure peer is connected and send the SE */ | ||
1976 | embrace_peer (session->partner_outgoing); | ||
1977 | } | ||
1978 | |||
1979 | /* | ||
1980 | session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 3 * NUM_EXP_ROUNDS), | ||
1981 | subround_over, session); | ||
1982 | */ | ||
1983 | } | 609 | } |
1984 | 610 | ||
1985 | 611 | ||
@@ -2002,146 +628,6 @@ get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSess | |||
2002 | 628 | ||
2003 | 629 | ||
2004 | /** | 630 | /** |
2005 | * Handle a HELLO-message, send when another peer wants to join a session where | ||
2006 | * our peer is a member. The session may or may not be inhabited yet. | ||
2007 | */ | ||
2008 | static int | ||
2009 | handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello) | ||
2010 | { | ||
2011 | struct ConsensusSession *session; | ||
2012 | |||
2013 | if (NULL != inc->requested_gid) | ||
2014 | { | ||
2015 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session more than once, ignoring\n"); | ||
2016 | return GNUNET_YES; | ||
2017 | } | ||
2018 | if (NULL != inc->cpi) | ||
2019 | { | ||
2020 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer with active session sent HELLO again, ignoring\n"); | ||
2021 | return GNUNET_YES; | ||
2022 | } | ||
2023 | |||
2024 | for (session = sessions_head; NULL != session; session = session->next) | ||
2025 | { | ||
2026 | int idx; | ||
2027 | struct ConsensusPeerInformation *cpi; | ||
2028 | if (0 != GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id)) | ||
2029 | continue; | ||
2030 | idx = get_peer_idx (&inc->peer_id, session); | ||
2031 | GNUNET_assert (-1 != idx); | ||
2032 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d hello'ed session %d\n", idx); | ||
2033 | cpi = &session->info[idx]; | ||
2034 | inc->cpi = cpi; | ||
2035 | cpi->mss = inc->mss; | ||
2036 | cpi = &session->info[idx]; | ||
2037 | cpi->hello = GNUNET_YES; | ||
2038 | cpi->mss.mq = create_message_queue_for_stream_socket (&cpi->mss); | ||
2039 | embrace_peer (cpi); | ||
2040 | return GNUNET_YES; | ||
2041 | } | ||
2042 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session\n"); | ||
2043 | inc->requested_gid = GNUNET_memdup (&hello->global_id, sizeof (struct GNUNET_HashCode)); | ||
2044 | return GNUNET_YES; | ||
2045 | } | ||
2046 | |||
2047 | |||
2048 | |||
2049 | /** | ||
2050 | * Handle tokenized messages from stream sockets. | ||
2051 | * Delegate them if the socket belongs to a session, | ||
2052 | * handle hello messages otherwise. | ||
2053 | * | ||
2054 | * Do not call GNUNET_SERVER_mst_destroy in callback | ||
2055 | * | ||
2056 | * @param cls closure, unused | ||
2057 | * @param client incoming socket this message comes from | ||
2058 | * @param message the actual message | ||
2059 | * | ||
2060 | * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing | ||
2061 | */ | ||
2062 | static int | ||
2063 | mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) | ||
2064 | { | ||
2065 | struct IncomingSocket *inc; | ||
2066 | GNUNET_assert (NULL == client); | ||
2067 | GNUNET_assert (NULL != cls); | ||
2068 | inc = (struct IncomingSocket *) cls; | ||
2069 | switch (ntohs( message->type)) | ||
2070 | { | ||
2071 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO: | ||
2072 | return handle_p2p_hello (inc, (struct ConsensusHello *) message); | ||
2073 | default: | ||
2074 | if (NULL != inc->cpi) | ||
2075 | return mst_session_callback (inc->cpi, client, message); | ||
2076 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s (not in session)\n", | ||
2077 | ntohs (message->type), GNUNET_h2s (&inc->peer_id.hashPubKey)); | ||
2078 | } | ||
2079 | return GNUNET_OK; | ||
2080 | } | ||
2081 | |||
2082 | |||
2083 | /** | ||
2084 | * Functions of this type are called upon new stream connection from other peers | ||
2085 | * or upon binding error which happen when the app_port given in | ||
2086 | * GNUNET_STREAM_listen() is already taken. | ||
2087 | * | ||
2088 | * @param cls the closure from GNUNET_STREAM_listen | ||
2089 | * @param socket the socket representing the stream; NULL on binding error | ||
2090 | * @param initiator the identity of the peer who wants to establish a stream | ||
2091 | * with us; NULL on binding error | ||
2092 | * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the | ||
2093 | * stream (the socket will be invalid after the call) | ||
2094 | */ | ||
2095 | static int | ||
2096 | listen_cb (void *cls, | ||
2097 | struct GNUNET_STREAM_Socket *socket, | ||
2098 | const struct GNUNET_PeerIdentity *initiator) | ||
2099 | { | ||
2100 | struct IncomingSocket *incoming; | ||
2101 | |||
2102 | if (NULL == socket) | ||
2103 | { | ||
2104 | GNUNET_break (0); | ||
2105 | return GNUNET_SYSERR; | ||
2106 | } | ||
2107 | incoming = GNUNET_malloc (sizeof *incoming); | ||
2108 | incoming->peer_id = *initiator; | ||
2109 | incoming->mss.socket = socket; | ||
2110 | incoming->mss.rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, | ||
2111 | &stream_data_processor, &incoming->mss); | ||
2112 | incoming->mss.mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming); | ||
2113 | incoming->mss.mst_cls = incoming; | ||
2114 | GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming); | ||
2115 | return GNUNET_OK; | ||
2116 | } | ||
2117 | |||
2118 | |||
2119 | /** | ||
2120 | * Disconnect a client, and destroy all sessions associated with it. | ||
2121 | * | ||
2122 | * @param client the client to disconnect | ||
2123 | */ | ||
2124 | static void | ||
2125 | disconnect_client (struct GNUNET_SERVER_Client *client) | ||
2126 | { | ||
2127 | struct ConsensusSession *session; | ||
2128 | GNUNET_SERVER_client_disconnect (client); | ||
2129 | |||
2130 | /* if the client owns a session, remove it */ | ||
2131 | session = sessions_head; | ||
2132 | while (NULL != session) | ||
2133 | { | ||
2134 | if (client == session->scss.client) | ||
2135 | { | ||
2136 | destroy_session (session); | ||
2137 | break; | ||
2138 | } | ||
2139 | session = session->next; | ||
2140 | } | ||
2141 | } | ||
2142 | |||
2143 | |||
2144 | /** | ||
2145 | * Compute a global, (hopefully) unique consensus session id, | 631 | * Compute a global, (hopefully) unique consensus session id, |
2146 | * from the local id of the consensus session, and the identities of all participants. | 632 | * from the local id of the consensus session, and the identities of all participants. |
2147 | * Thus, if the local id of two consensus sessions coincide, but are not comprised of | 633 | * Thus, if the local id of two consensus sessions coincide, but are not comprised of |
@@ -2188,7 +674,8 @@ hash_cmp (const void *h1, const void *h2) | |||
2188 | * add the local peer if not in the join message. | 674 | * add the local peer if not in the join message. |
2189 | */ | 675 | */ |
2190 | static void | 676 | static void |
2191 | initialize_session_peer_list (struct ConsensusSession *session) | 677 | initialize_session_peer_list (struct ConsensusSession *session, |
678 | struct GNUNET_CONSENSUS_JoinMessage *join_msg) | ||
2192 | { | 679 | { |
2193 | unsigned int local_peer_in_list; | 680 | unsigned int local_peer_in_list; |
2194 | uint32_t listed_peers; | 681 | uint32_t listed_peers; |
@@ -2196,19 +683,19 @@ initialize_session_peer_list (struct ConsensusSession *session) | |||
2196 | struct GNUNET_PeerIdentity *peers; | 683 | struct GNUNET_PeerIdentity *peers; |
2197 | unsigned int i; | 684 | unsigned int i; |
2198 | 685 | ||
2199 | GNUNET_assert (NULL != session->join_msg); | 686 | GNUNET_assert (NULL != join_msg); |
2200 | 687 | ||
2201 | /* peers in the join message, may or may not include the local peer */ | 688 | /* peers in the join message, may or may not include the local peer */ |
2202 | listed_peers = ntohl (session->join_msg->num_peers); | 689 | listed_peers = ntohl (join_msg->num_peers); |
2203 | 690 | ||
2204 | session->num_peers = listed_peers; | 691 | session->num_peers = listed_peers; |
2205 | 692 | ||
2206 | msg_peers = (struct GNUNET_PeerIdentity *) &session->join_msg[1]; | 693 | msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1]; |
2207 | 694 | ||
2208 | local_peer_in_list = GNUNET_NO; | 695 | local_peer_in_list = GNUNET_NO; |
2209 | for (i = 0; i < listed_peers; i++) | 696 | for (i = 0; i < listed_peers; i++) |
2210 | { | 697 | { |
2211 | if (0 == memcmp (&msg_peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity))) | 698 | if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity))) |
2212 | { | 699 | { |
2213 | local_peer_in_list = GNUNET_YES; | 700 | local_peer_in_list = GNUNET_YES; |
2214 | break; | 701 | break; |
@@ -2221,7 +708,7 @@ initialize_session_peer_list (struct ConsensusSession *session) | |||
2221 | peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity)); | 708 | peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity)); |
2222 | 709 | ||
2223 | if (GNUNET_NO == local_peer_in_list) | 710 | if (GNUNET_NO == local_peer_in_list) |
2224 | peers[session->num_peers - 1] = *my_peer; | 711 | peers[session->num_peers - 1] = my_peer; |
2225 | 712 | ||
2226 | memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity)); | 713 | memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity)); |
2227 | qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp); | 714 | qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp); |
@@ -2236,38 +723,34 @@ initialize_session_peer_list (struct ConsensusSession *session) | |||
2236 | session->info[i].peer_id = peers[i]; | 723 | session->info[i].peer_id = peers[i]; |
2237 | } | 724 | } |
2238 | 725 | ||
2239 | free (peers); | 726 | GNUNET_free (peers); |
2240 | } | 727 | } |
2241 | 728 | ||
2242 | 729 | ||
730 | |||
731 | |||
732 | |||
2243 | /** | 733 | /** |
2244 | * Add incoming peer connections to the session, | 734 | * Called when another peer wants to do a set operation with the |
2245 | * for peers who have connected to us before the local session has been established | 735 | * local peer. |
2246 | * | 736 | * |
2247 | * @param session ... | 737 | * @param other_peer the other peer |
738 | * @param context_msg message with application specific information from | ||
739 | * the other peer | ||
740 | * @param request request from the other peer, use GNUNET_SET_accept | ||
741 | * to accept it, otherwise the request will be refused | ||
742 | * Note that we don't use a return value here, as it is also | ||
743 | * necessary to specify the set we want to do the operation with, | ||
744 | * whith sometimes can be derived from the context message. | ||
745 | * Also necessary to specify the timeout. | ||
2248 | */ | 746 | */ |
2249 | static void | 747 | static void |
2250 | add_incoming_peers (struct ConsensusSession *session) | 748 | set_listen_cb (void *cls, |
749 | const struct GNUNET_PeerIdentity *other_peer, | ||
750 | const struct GNUNET_MessageHeader *context_msg, | ||
751 | struct GNUNET_SET_Request *request) | ||
2251 | { | 752 | { |
2252 | struct IncomingSocket *inc; | 753 | /* FIXME */ |
2253 | int i; | ||
2254 | struct ConsensusPeerInformation *cpi; | ||
2255 | |||
2256 | for (inc = incoming_sockets_head; NULL != inc; inc = inc->next) | ||
2257 | { | ||
2258 | if ( (NULL == inc->requested_gid) || | ||
2259 | (0 != GNUNET_CRYPTO_hash_cmp (&session->global_id, inc->requested_gid)) ) | ||
2260 | continue; | ||
2261 | for (i = 0; i < session->num_peers; i++) | ||
2262 | { | ||
2263 | cpi = &session->info[i]; | ||
2264 | cpi->peer_id = inc->peer_id; | ||
2265 | cpi->mss = inc->mss; | ||
2266 | cpi->hello = GNUNET_YES; | ||
2267 | inc->cpi = cpi; | ||
2268 | break; | ||
2269 | } | ||
2270 | } | ||
2271 | } | 754 | } |
2272 | 755 | ||
2273 | 756 | ||
@@ -2277,46 +760,59 @@ add_incoming_peers (struct ConsensusSession *session) | |||
2277 | * @param session the session to initialize | 760 | * @param session the session to initialize |
2278 | */ | 761 | */ |
2279 | static void | 762 | static void |
2280 | initialize_session (struct ConsensusSession *session) | 763 | initialize_session (struct ConsensusSession *session, |
764 | struct GNUNET_CONSENSUS_JoinMessage *join_msg) | ||
2281 | { | 765 | { |
2282 | struct ConsensusSession *other_session; | 766 | struct ConsensusSession *other_session; |
2283 | 767 | ||
2284 | GNUNET_assert (NULL != session->join_msg); | 768 | initialize_session_peer_list (session, join_msg); |
2285 | initialize_session_peer_list (session); | ||
2286 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers); | 769 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers); |
2287 | compute_global_id (session, &session->join_msg->session_id); | 770 | compute_global_id (session, &join_msg->session_id); |
2288 | 771 | ||
2289 | /* Check if some local client already owns the session. */ | 772 | /* check if some local client already owns the session. */ |
2290 | other_session = sessions_head; | 773 | other_session = sessions_head; |
2291 | while (NULL != other_session) | 774 | while (NULL != other_session) |
2292 | { | 775 | { |
2293 | if ((other_session != session) && | 776 | if ((other_session != session) && |
2294 | (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id))) | 777 | (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id))) |
2295 | { | 778 | { |
2296 | if (GNUNET_NO == other_session->conclude) | 779 | if (CONSENSUS_ROUND_FINISH != other_session->current_round) |
2297 | { | 780 | { |
2298 | GNUNET_break (0); | 781 | GNUNET_break (0); |
2299 | destroy_session (session); | 782 | destroy_session (session); |
2300 | return; | 783 | return; |
2301 | } | 784 | } |
2302 | GNUNET_SERVER_client_drop (other_session->scss.client); | ||
2303 | other_session->scss.client = NULL; | ||
2304 | break; | 785 | break; |
2305 | } | 786 | } |
2306 | other_session = other_session->next; | 787 | other_session = other_session->next; |
2307 | } | 788 | } |
2308 | 789 | ||
2309 | session->local_peer_idx = get_peer_idx (my_peer, session); | 790 | session->local_peer_idx = get_peer_idx (&my_peer, session); |
2310 | GNUNET_assert (-1 != session->local_peer_idx); | 791 | GNUNET_assert (-1 != session->local_peer_idx); |
792 | session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION, | ||
793 | &session->global_id, | ||
794 | set_listen_cb, session); | ||
2311 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx); | 795 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx); |
2312 | GNUNET_free (session->join_msg); | ||
2313 | session->join_msg = NULL; | ||
2314 | add_incoming_peers (session); | ||
2315 | GNUNET_SERVER_receive_done (session->scss.client, GNUNET_OK); | ||
2316 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id)); | 796 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id)); |
2317 | } | 797 | } |
2318 | 798 | ||
2319 | 799 | ||
800 | static struct ConsensusSession * | ||
801 | get_session_by_client (struct GNUNET_SERVER_Client *client) | ||
802 | { | ||
803 | struct ConsensusSession *session; | ||
804 | |||
805 | session = sessions_head; | ||
806 | while (NULL != session) | ||
807 | { | ||
808 | if (session->client == client) | ||
809 | return session; | ||
810 | session = session->next; | ||
811 | } | ||
812 | return NULL; | ||
813 | } | ||
814 | |||
815 | |||
2320 | /** | 816 | /** |
2321 | * Called when a client wants to join a consensus session. | 817 | * Called when a client wants to join a consensus session. |
2322 | * | 818 | * |
@@ -2331,45 +827,20 @@ client_join (void *cls, | |||
2331 | { | 827 | { |
2332 | struct ConsensusSession *session; | 828 | struct ConsensusSession *session; |
2333 | 829 | ||
2334 | // make sure the client has not already joined a session | 830 | session = get_session_by_client (client); |
2335 | session = sessions_head; | 831 | if (NULL != session) |
2336 | while (NULL != session) | ||
2337 | { | 832 | { |
2338 | if (session->scss.client == client) | 833 | GNUNET_break (0); |
2339 | { | 834 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); |
2340 | GNUNET_break (0); | 835 | return; |
2341 | disconnect_client (client); | ||
2342 | return; | ||
2343 | } | ||
2344 | session = session->next; | ||
2345 | } | 836 | } |
2346 | |||
2347 | session = GNUNET_new (struct ConsensusSession); | 837 | session = GNUNET_new (struct ConsensusSession); |
2348 | session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m); | ||
2349 | /* these have to be initialized here, as the client can already start to give us values */ | ||
2350 | session->ibfs = GNUNET_malloc ((MAX_IBF_ORDER+1) * sizeof (struct InvertibleBloomFilter *)); | ||
2351 | session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO); | ||
2352 | session->ibf_key_map = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO); | ||
2353 | session->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM); | ||
2354 | session->scss.client = client; | ||
2355 | session->client_mq = create_message_queue_for_server_client (&session->scss); | ||
2356 | GNUNET_SERVER_client_keep (client); | 838 | GNUNET_SERVER_client_keep (client); |
2357 | |||
2358 | GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); | 839 | GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); |
2359 | 840 | initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m); | |
2360 | // Initialize session later if local peer identity is not known yet. | ||
2361 | if (NULL == my_peer) | ||
2362 | { | ||
2363 | GNUNET_SERVER_disable_receive_done_warning (client); | ||
2364 | return; | ||
2365 | } | ||
2366 | |||
2367 | initialize_session (session); | ||
2368 | } | 841 | } |
2369 | 842 | ||
2370 | 843 | ||
2371 | |||
2372 | |||
2373 | /** | 844 | /** |
2374 | * Called when a client performs an insert operation. | 845 | * Called when a client performs an insert operation. |
2375 | * | 846 | * |
@@ -2379,38 +850,48 @@ client_join (void *cls, | |||
2379 | */ | 850 | */ |
2380 | void | 851 | void |
2381 | client_insert (void *cls, | 852 | client_insert (void *cls, |
2382 | struct GNUNET_SERVER_Client *client, | 853 | struct GNUNET_SERVER_Client *client, |
2383 | const struct GNUNET_MessageHeader *m) | 854 | const struct GNUNET_MessageHeader *m) |
2384 | { | 855 | { |
2385 | struct ConsensusSession *session; | 856 | struct ConsensusSession *session; |
2386 | struct GNUNET_CONSENSUS_ElementMessage *msg; | 857 | struct GNUNET_CONSENSUS_ElementMessage *msg; |
2387 | struct GNUNET_CONSENSUS_Element *element; | 858 | struct GNUNET_SET_Element *element; |
2388 | int element_size; | 859 | ssize_t element_size; |
2389 | 860 | ||
2390 | session = sessions_head; | 861 | session = sessions_head; |
2391 | while (NULL != session) | 862 | while (NULL != session) |
2392 | { | 863 | { |
2393 | if (session->scss.client == client) | 864 | if (session->client == client) |
2394 | break; | 865 | break; |
2395 | } | 866 | } |
2396 | 867 | ||
2397 | if (NULL == session) | 868 | if (NULL == session) |
2398 | { | 869 | { |
2399 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n"); | 870 | GNUNET_break (0); |
871 | GNUNET_SERVER_client_disconnect (client); | ||
872 | return; | ||
873 | } | ||
874 | |||
875 | if (CONSENSUS_ROUND_BEGIN != session->current_round) | ||
876 | { | ||
877 | GNUNET_break (0); | ||
2400 | GNUNET_SERVER_client_disconnect (client); | 878 | GNUNET_SERVER_client_disconnect (client); |
2401 | return; | 879 | return; |
2402 | } | 880 | } |
2403 | 881 | ||
2404 | msg = (struct GNUNET_CONSENSUS_ElementMessage *) m; | 882 | msg = (struct GNUNET_CONSENSUS_ElementMessage *) m; |
2405 | element_size = ntohs (msg->header.size )- sizeof (struct GNUNET_CONSENSUS_ElementMessage); | 883 | element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage); |
2406 | element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size); | 884 | if (element_size < 0) |
885 | { | ||
886 | GNUNET_break (0); | ||
887 | return; | ||
888 | } | ||
889 | element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size); | ||
2407 | element->type = msg->element_type; | 890 | element->type = msg->element_type; |
2408 | element->size = element_size; | 891 | element->size = element_size; |
2409 | memcpy (&element[1], &msg[1], element_size); | 892 | memcpy (&element[1], &msg[1], element_size); |
2410 | element->data = &element[1]; | 893 | element->data = &element[1]; |
2411 | GNUNET_assert (NULL != element->data); | 894 | GNUNET_SET_add_element (session->element_set, element, NULL, NULL); |
2412 | insert_element (session, element); | ||
2413 | |||
2414 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 895 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
2415 | } | 896 | } |
2416 | 897 | ||
@@ -2432,9 +913,8 @@ client_conclude (void *cls, | |||
2432 | 913 | ||
2433 | cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message; | 914 | cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message; |
2434 | 915 | ||
2435 | session = sessions_head; | 916 | session = get_session_by_client (client); |
2436 | while ((session != NULL) && (session->scss.client != client)) | 917 | |
2437 | session = session->next; | ||
2438 | if (NULL == session) | 918 | if (NULL == session) |
2439 | { | 919 | { |
2440 | /* client not found */ | 920 | /* client not found */ |
@@ -2447,16 +927,12 @@ client_conclude (void *cls, | |||
2447 | { | 927 | { |
2448 | /* client requested conclude twice */ | 928 | /* client requested conclude twice */ |
2449 | GNUNET_break (0); | 929 | GNUNET_break (0); |
2450 | /* client may still own a session, destroy it */ | ||
2451 | disconnect_client (client); | ||
2452 | return; | 930 | return; |
2453 | } | 931 | } |
2454 | 932 | ||
2455 | session->conclude = GNUNET_YES; | ||
2456 | |||
2457 | if (session->num_peers <= 1) | 933 | if (session->num_peers <= 1) |
2458 | { | 934 | { |
2459 | send_client_conclude_done (session); | 935 | //send_client_conclude_done (session); |
2460 | } | 936 | } |
2461 | else | 937 | else |
2462 | { | 938 | { |
@@ -2465,48 +941,12 @@ client_conclude (void *cls, | |||
2465 | round_over (session, NULL); | 941 | round_over (session, NULL); |
2466 | } | 942 | } |
2467 | 943 | ||
944 | GNUNET_assert (CONSENSUS_ROUND_BEGIN != session->current_round); | ||
2468 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 945 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
2469 | } | 946 | } |
2470 | 947 | ||
2471 | 948 | ||
2472 | /** | 949 | /** |
2473 | * Task that disconnects from core. | ||
2474 | * | ||
2475 | * @param cls core handle | ||
2476 | * @param tc context information (why was this task triggered now) | ||
2477 | */ | ||
2478 | static void | ||
2479 | disconnect_core (void *cls, | ||
2480 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
2481 | { | ||
2482 | if (core != NULL) | ||
2483 | { | ||
2484 | GNUNET_CORE_disconnect (core); | ||
2485 | core = NULL; | ||
2486 | } | ||
2487 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n"); | ||
2488 | } | ||
2489 | |||
2490 | |||
2491 | static void | ||
2492 | core_startup (void *cls, | ||
2493 | struct GNUNET_CORE_Handle *core, | ||
2494 | const struct GNUNET_PeerIdentity *peer) | ||
2495 | { | ||
2496 | struct ConsensusSession *session; | ||
2497 | |||
2498 | my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity)); | ||
2499 | /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */ | ||
2500 | GNUNET_SCHEDULER_add_now (&disconnect_core, core); | ||
2501 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n"); | ||
2502 | /* initialize sessions that are waiting for the local peer identity */ | ||
2503 | for (session = sessions_head; NULL != session; session = session->next) | ||
2504 | if (NULL != session->join_msg) | ||
2505 | initialize_session (session); | ||
2506 | } | ||
2507 | |||
2508 | |||
2509 | /** | ||
2510 | * Called to clean up, after a shutdown has been requested. | 950 | * Called to clean up, after a shutdown has been requested. |
2511 | * | 951 | * |
2512 | * @param cls closure | 952 | * @param cls closure |
@@ -2516,35 +956,8 @@ static void | |||
2516 | shutdown_task (void *cls, | 956 | shutdown_task (void *cls, |
2517 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 957 | const struct GNUNET_SCHEDULER_TaskContext *tc) |
2518 | { | 958 | { |
2519 | while (NULL != incoming_sockets_head) | ||
2520 | { | ||
2521 | struct IncomingSocket *socket; | ||
2522 | socket = incoming_sockets_head; | ||
2523 | if (NULL == socket->cpi) | ||
2524 | clear_message_stream_state (&socket->mss); | ||
2525 | incoming_sockets_head = incoming_sockets_head->next; | ||
2526 | GNUNET_free (socket); | ||
2527 | } | ||
2528 | |||
2529 | while (NULL != sessions_head) | 959 | while (NULL != sessions_head) |
2530 | { | ||
2531 | struct ConsensusSession *session; | ||
2532 | session = sessions_head->next; | ||
2533 | destroy_session (sessions_head); | 960 | destroy_session (sessions_head); |
2534 | sessions_head = session; | ||
2535 | } | ||
2536 | |||
2537 | if (NULL != core) | ||
2538 | { | ||
2539 | GNUNET_CORE_disconnect (core); | ||
2540 | core = NULL; | ||
2541 | } | ||
2542 | |||
2543 | if (NULL != listener) | ||
2544 | { | ||
2545 | GNUNET_STREAM_listen_close (listener); | ||
2546 | listener = NULL; | ||
2547 | } | ||
2548 | 961 | ||
2549 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n"); | 962 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n"); |
2550 | } | 963 | } |
@@ -2560,10 +973,6 @@ shutdown_task (void *cls, | |||
2560 | static void | 973 | static void |
2561 | run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) | 974 | run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) |
2562 | { | 975 | { |
2563 | /* core is only used to retrieve the peer identity */ | ||
2564 | static const struct GNUNET_CORE_MessageHandler core_handlers[] = { | ||
2565 | {NULL, 0, 0} | ||
2566 | }; | ||
2567 | static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { | 976 | static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { |
2568 | {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0}, | 977 | {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0}, |
2569 | {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, | 978 | {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, |
@@ -2574,21 +983,15 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU | |||
2574 | 983 | ||
2575 | cfg = c; | 984 | cfg = c; |
2576 | srv = server; | 985 | srv = server; |
2577 | 986 | if (GNUNET_OK != GNUNET_CRYPTO_get_host_identity (cfg, &my_peer)) | |
987 | { | ||
988 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n"); | ||
989 | GNUNET_break (0); | ||
990 | GNUNET_SCHEDULER_shutdown (); | ||
991 | return; | ||
992 | } | ||
2578 | GNUNET_SERVER_add_handlers (server, server_handlers); | 993 | GNUNET_SERVER_add_handlers (server, server_handlers); |
2579 | |||
2580 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); | 994 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); |
2581 | |||
2582 | listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS, | ||
2583 | &listen_cb, NULL, | ||
2584 | GNUNET_STREAM_OPTION_END); | ||
2585 | |||
2586 | /* we have to wait for the core_startup callback before proceeding with the consensus service startup */ | ||
2587 | core = GNUNET_CORE_connect (c, NULL, | ||
2588 | &core_startup, NULL, | ||
2589 | NULL, NULL, GNUNET_NO, NULL, | ||
2590 | GNUNET_NO, core_handlers); | ||
2591 | GNUNET_assert (NULL != core); | ||
2592 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n"); | 995 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n"); |
2593 | } | 996 | } |
2594 | 997 | ||