aboutsummaryrefslogtreecommitdiff
path: root/src/consensus/gnunet-service-consensus.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/consensus/gnunet-service-consensus.c')
-rw-r--r--src/consensus/gnunet-service-consensus.c1945
1 files changed, 174 insertions, 1771 deletions
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
index a7640c51f..44edeb215 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -29,14 +29,10 @@
29#include "gnunet_protocols.h" 29#include "gnunet_protocols.h"
30#include "gnunet_applications.h" 30#include "gnunet_applications.h"
31#include "gnunet_util_lib.h" 31#include "gnunet_util_lib.h"
32#include "gnunet_set_service.h"
32#include "gnunet_consensus_service.h" 33#include "gnunet_consensus_service.h"
33#include "gnunet_core_service.h"
34#include "gnunet_stream_lib.h"
35
36#include "consensus_protocol.h" 34#include "consensus_protocol.h"
37#include "consensus.h" 35#include "consensus.h"
38#include "ibf.h"
39#include "strata_estimator.h"
40 36
41 37
42/* 38/*
@@ -47,82 +43,19 @@
47 43
48 44
49/** 45/**
50 * Number of IBFs in a strata estimator.
51 */
52#define SE_STRATA_COUNT 32
53/**
54 * Size of the IBFs in the strata estimator.
55 */
56#define SE_IBF_SIZE 80
57/**
58 * hash num parameter for the difference digests and strata estimators
59 */
60#define SE_IBF_HASH_NUM 3
61
62/**
63 * Number of buckets that can be transmitted in one message.
64 */
65#define BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
66
67/**
68 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
69 * Choose this value so that computing the IBF is still cheaper
70 * than transmitting all values.
71 */
72#define MAX_IBF_ORDER (16)
73
74/**
75 * Number of exponential rounds, used in the inventory and completion round. 46 * Number of exponential rounds, used in the inventory and completion round.
76 */ 47 */
77#define NUM_EXP_ROUNDS (4) 48#define NUM_EXP_ROUNDS (4)
78 49
79
80/* forward declarations */ 50/* forward declarations */
81 51
82/* mutual recursion with struct ConsensusSession */ 52/* mutual recursion with struct ConsensusSession */
83struct ConsensusPeerInformation; 53struct ConsensusPeerInformation;
84 54
85struct MessageQueue;
86
87/* mutual recursion with round_over */ 55/* mutual recursion with round_over */
88static void 56static void
89subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); 57subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
90 58
91/* mutial recursion with transmit_queued */
92static void
93client_send_next (struct MessageQueue *mq);
94
95/* mutual recursion with mst_session_callback */
96static void
97open_cb (void *cls, struct GNUNET_STREAM_Socket *socket);
98
99static int
100mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message);
101
102
103/**
104 * Additional information about a consensus element.
105 */
106struct ElementInfo
107{
108 /**
109 * The element itself.
110 */
111 struct GNUNET_CONSENSUS_Element *element;
112 /**
113 * Hash of the element
114 */
115 struct GNUNET_HashCode *element_hash;
116 /**
117 * Number of other peers that have the element in the inventory.
118 */
119 unsigned int inventory_count;
120 /**
121 * Bitmap of peers that have this element in their inventory
122 */
123 uint8_t *inventory_bitmap;
124};
125
126 59
127/** 60/**
128 * Describes the current round a consensus session is in. 61 * Describes the current round a consensus session is in.
@@ -138,7 +71,8 @@ enum ConsensusRound
138 */ 71 */
139 CONSENSUS_ROUND_EXCHANGE, 72 CONSENSUS_ROUND_EXCHANGE,
140 /** 73 /**
141 * Exchange which elements each peer has, but not the elements. 74 * Exchange which elements each peer has, but don't
75 * transmit the element's data, only their SHA-512 hashes.
142 * This round uses the all-to-all scheme. 76 * This round uses the all-to-all scheme.
143 */ 77 */
144 CONSENSUS_ROUND_INVENTORY, 78 CONSENSUS_ROUND_INVENTORY,
@@ -153,82 +87,6 @@ enum ConsensusRound
153 CONSENSUS_ROUND_FINISH 87 CONSENSUS_ROUND_FINISH
154}; 88};
155 89
156/* FIXME: review states, ANTICIPATE_DIFF and DECODING in particular */
157
158/**
159 * State of another peer with respect to the
160 * current ibf.
161 */
162enum ConsensusIBFState {
163 /**
164 * There is nothing going on with the IBF.
165 */
166 IBF_STATE_NONE=0,
167 /**
168 * We currently receive an ibf.
169 */
170 IBF_STATE_RECEIVING,
171 /*
172 * we decode a received ibf
173 */
174 IBF_STATE_DECODING,
175 /**
176 * wait for elements and element requests
177 */
178 IBF_STATE_ANTICIPATE_DIFF
179};
180
181
182typedef void (*AddCallback) (struct MessageQueue *mq);
183typedef void (*MessageSentCallback) (void *cls);
184
185
186/**
187 * Collection of the state necessary to read and write gnunet messages
188 * to a stream socket. Should be used as closure for stream_data_processor.
189 */
190struct MessageStreamState
191{
192 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
193 struct MessageQueue *mq;
194 void *mst_cls;
195 struct GNUNET_STREAM_Socket *socket;
196 struct GNUNET_STREAM_ReadHandle *rh;
197 struct GNUNET_STREAM_WriteHandle *wh;
198};
199
200
201struct ServerClientSocketState
202{
203 struct GNUNET_SERVER_Client *client;
204 struct GNUNET_SERVER_TransmitHandle* th;
205};
206
207
208/**
209 * Generic message queue, for queueing outgoing messages.
210 */
211struct MessageQueue
212{
213 void *state;
214 AddCallback add_cb;
215 struct PendingMessage *pending_head;
216 struct PendingMessage *pending_tail;
217 struct PendingMessage *current_pm;
218};
219
220
221struct PendingMessage
222{
223 struct GNUNET_MessageHeader *msg;
224 struct MessageQueue *parent_queue;
225 struct PendingMessage *next;
226 struct PendingMessage *prev;
227 MessageSentCallback sent_cb;
228 void *sent_cb_cls;
229};
230
231
232/** 90/**
233 * A consensus session consists of one local client and the remote authorities. 91 * A consensus session consists of one local client and the remote authorities.
234 */ 92 */
@@ -245,58 +103,35 @@ struct ConsensusSession
245 struct ConsensusSession *prev; 103 struct ConsensusSession *prev;
246 104
247 /** 105 /**
248 * Join message. Used to initialize the session later,
249 * if the identity of the local peer is not yet known.
250 * NULL if the session has been fully initialized.
251 */
252 struct GNUNET_CONSENSUS_JoinMessage *join_msg;
253
254 /**
255 * Global consensus identification, computed 106 * Global consensus identification, computed
256 * from the session id and participating authorities. 107 * from the session id and participating authorities.
257 */ 108 */
258 struct GNUNET_HashCode global_id; 109 struct GNUNET_HashCode global_id;
259 110
260 /** 111 /**
261 * The server's client and associated local state 112 * Client that inhabits the session
262 */ 113 */
263 struct ServerClientSocketState scss; 114 struct GNUNET_SERVER_Client *client;
264 115
265 /** 116 /**
266 * Queued messages to the client. 117 * Queued messages to the client.
267 */ 118 */
268 struct MessageQueue *client_mq; 119 struct GNUNET_MQ_MessageQueue *client_mq;
269
270 /**
271 * IBF_Key -> 2^(HashCode*)
272 * FIXME:
273 * should be array of hash maps, mapping replicated struct IBF_Keys to struct HashCode *.
274 */
275 struct GNUNET_CONTAINER_MultiHashMap *ibf_key_map;
276
277 /**
278 * Maps HashCodes to ElementInfos
279 */
280 struct GNUNET_CONTAINER_MultiHashMap *values;
281
282 /**
283 * Currently active transmit handle for sending to the client
284 */
285 struct GNUNET_SERVER_TransmitHandle *client_th;
286 120
287 /** 121 /**
288 * Timeout for all rounds together, single rounds will schedule a timeout task 122 * Timeout for all rounds together, single rounds will schedule a timeout task
289 * with a fraction of the conclude timeout. 123 * with a fraction of the conclude timeout.
124 * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
290 */ 125 */
291 struct GNUNET_TIME_Relative conclude_timeout; 126 struct GNUNET_TIME_Relative conclude_timeout;
292 127
293 /** 128 /**
294 * Timeout task identifier for the current round 129 * Timeout task identifier for the current round.
295 */ 130 */
296 GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid; 131 GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
297 132
298 /** 133 /**
299 * Number of other peers in the consensus 134 * Number of other peers in the consensus.
300 */ 135 */
301 unsigned int num_peers; 136 unsigned int num_peers;
302 137
@@ -307,26 +142,11 @@ struct ConsensusSession
307 struct ConsensusPeerInformation *info; 142 struct ConsensusPeerInformation *info;
308 143
309 /** 144 /**
310 * GNUNET_YES if the client has called conclude.
311 * */
312 int conclude;
313
314 /**
315 * Index of the local peer in the peers array 145 * Index of the local peer in the peers array
316 */ 146 */
317 unsigned int local_peer_idx; 147 unsigned int local_peer_idx;
318 148
319 /** 149 /**
320 * Strata estimator, computed online
321 */
322 struct StrataEstimator *se;
323
324 /**
325 * Pre-computed IBFs
326 */
327 struct InvertibleBloomFilter **ibfs;
328
329 /**
330 * Current round 150 * Current round
331 */ 151 */
332 enum ConsensusRound current_round; 152 enum ConsensusRound current_round;
@@ -337,19 +157,36 @@ struct ConsensusSession
337 */ 157 */
338 int *shuffle; 158 int *shuffle;
339 159
160 /**
161 * Current round of the exponential scheme.
162 */
340 int exp_round; 163 int exp_round;
341 164
165 /**
166 * Current sub-round of the exponential scheme.
167 */
342 int exp_subround; 168 int exp_subround;
343 169
344 /** 170 /**
345 * The partner for the current exp-round 171 * The partner for the current exp-round
346 */ 172 */
347 struct ConsensusPeerInformation* partner_outgoing; 173 struct ConsensusPeerInformation *partner_outgoing;
348 174
349 /** 175 /**
350 * The partner for the current exp-round 176 * The partner for the current exp-round
351 */ 177 */
352 struct ConsensusPeerInformation* partner_incoming; 178 struct ConsensusPeerInformation *partner_incoming;
179
180 /**
181 * The consensus set of this session.
182 */
183 struct GNUNET_SET_Handle *element_set;
184
185 /**
186 * Listener for requests from other peers.
187 * Uses the session's global id as app id.
188 */
189 struct GNUNET_SET_ListenHandle *set_listener;
353}; 190};
354 191
355 192
@@ -374,41 +211,6 @@ struct ConsensusPeerInformation
374 */ 211 */
375 int hello; 212 int hello;
376 213
377 /*
378 * FIXME
379 */
380 struct MessageStreamState mss;
381
382 /**
383 * Current state
384 */
385 enum ConsensusIBFState ibf_state;
386
387 /**
388 * What is the order (=log2 size) of the ibf
389 * we're currently dealing with?
390 * Interpretation depends on ibf_state.
391 */
392 int ibf_order;
393
394 /**
395 * The current IBF for this peer,
396 * purpose dependent on ibf_state
397 */
398 struct InvertibleBloomFilter *ibf;
399
400 /**
401 * How many buckets have we transmitted/received?
402 * Interpretatin depends on ibf_state
403 */
404 int ibf_bucket_counter;
405
406 /**
407 * Strata estimator of the peer, NULL if our peer
408 * initiated the reconciliation.
409 */
410 struct StrataEstimator *se;
411
412 /** 214 /**
413 * Back-reference to the consensus session, 215 * Back-reference to the consensus session,
414 * to that ConsensusPeerInformation can be used as a closure 216 * to that ConsensusPeerInformation can be used as a closure
@@ -416,18 +218,6 @@ struct ConsensusPeerInformation
416 struct ConsensusSession *session; 218 struct ConsensusSession *session;
417 219
418 /** 220 /**
419 * True if we are actually replaying the strata message,
420 * e.g. currently handling the premature_strata_message.
421 */
422 int replaying_strata_message;
423
424 /**
425 * A strata message that is not actually for the current round,
426 * used in the exp-scheme.
427 */
428 struct StrataMessage *premature_strata_message;
429
430 /**
431 * We have finishes the exp-subround with the peer. 221 * We have finishes the exp-subround with the peer.
432 */ 222 */
433 int exp_subround_finished; 223 int exp_subround_finished;
@@ -444,65 +234,15 @@ struct ConsensusPeerInformation
444 * older round, while we are already in the next round. 234 * older round, while we are already in the next round.
445 */ 235 */
446 enum ConsensusRound apparent_round; 236 enum ConsensusRound apparent_round;
447};
448
449
450/**
451 * Sockets from other peers who want to communicate with us.
452 * It may not be known yet which consensus session they belong to, we have to wait for the
453 * peer's hello.
454 * Also, the session might not exist yet locally, we have to wait for a local client to connect.
455 */
456struct IncomingSocket
457{
458 /**
459 * Incoming sockets are kept in a double linked list.
460 */
461 struct IncomingSocket *next;
462
463 /**
464 * Incoming sockets are kept in a double linked list.
465 */
466 struct IncomingSocket *prev;
467
468 /**
469 * Peer that connected to us with the socket.
470 */
471 struct GNUNET_PeerIdentity peer_id;
472 237
473 /** 238 /**
474 * Peer-in-session this socket belongs to, once known, otherwise NULL. 239 * Set operation we are currently executing with this peer.
475 */ 240 */
476 struct ConsensusPeerInformation *cpi; 241 struct GNUNET_SET_OperationHandle *set_op;
477
478 /**
479 * Set to the global session id, if the peer sent us a hello-message,
480 * but the session does not exist yet.
481 */
482 struct GNUNET_HashCode *requested_gid;
483
484 /*
485 * Timeout, will disconnect the socket if not yet in a session.
486 * FIXME: implement
487 */
488 GNUNET_SCHEDULER_TaskIdentifier timeout;
489
490 /* FIXME */
491 struct MessageStreamState mss;
492}; 242};
493 243
494 244
495/** 245/**
496 * Linked list of incoming sockets.
497 */
498static struct IncomingSocket *incoming_sockets_head;
499
500/**
501 * Linked list of incoming sockets.
502 */
503static struct IncomingSocket *incoming_sockets_tail;
504
505/**
506 * Linked list of sessions this peer participates in. 246 * Linked list of sessions this peer participates in.
507 */ 247 */
508static struct ConsensusSession *sessions_head; 248static struct ConsensusSession *sessions_head;
@@ -525,297 +265,10 @@ static struct GNUNET_SERVER_Handle *srv;
525/** 265/**
526 * Peer that runs this service. 266 * Peer that runs this service.
527 */ 267 */
528static struct GNUNET_PeerIdentity *my_peer; 268static struct GNUNET_PeerIdentity my_peer;
529
530/**
531 * Handle to the core service. Only used during service startup, will be NULL after that.
532 */
533static struct GNUNET_CORE_Handle *core;
534
535/**
536 * Listener for sockets from peers that want to reconcile with us.
537 */
538static struct GNUNET_STREAM_ListenSocket *listener;
539
540
541/**
542 * Transmit a queued message to the session's client.
543 *
544 * @param cls consensus session
545 * @param size number of bytes available in buf
546 * @param buf where the callee should write the message
547 * @return number of bytes written to buf
548 */
549static size_t
550transmit_queued (void *cls, size_t size,
551 void *buf)
552{
553 struct MessageQueue *mq = cls;
554 struct PendingMessage *pm = mq->pending_head;
555 struct ServerClientSocketState *state = mq->state;
556 size_t msg_size;
557
558 GNUNET_assert (NULL != pm);
559 GNUNET_assert (NULL != buf);
560 msg_size = ntohs (pm->msg->size);
561 GNUNET_assert (size >= msg_size);
562 memcpy (buf, pm->msg, msg_size);
563 GNUNET_CONTAINER_DLL_remove (mq->pending_head, mq->pending_tail, pm);
564 state->th = NULL;
565 client_send_next (cls);
566 GNUNET_free (pm);
567 return msg_size;
568}
569
570
571static void
572client_send_next (struct MessageQueue *mq)
573{
574 struct ServerClientSocketState *state = mq->state;
575 int msize;
576
577 GNUNET_assert (NULL != state);
578
579 if ( (NULL != state->th) ||
580 (NULL == mq->pending_head) )
581 return;
582 msize = ntohs (mq->pending_head->msg->size);
583 state->th =
584 GNUNET_SERVER_notify_transmit_ready (state->client, msize,
585 GNUNET_TIME_UNIT_FOREVER_REL,
586 &transmit_queued, mq);
587}
588
589
590struct MessageQueue *
591create_message_queue_for_server_client (struct ServerClientSocketState *scss)
592{
593 struct MessageQueue *mq;
594 mq = GNUNET_new (struct MessageQueue);
595 mq->add_cb = client_send_next;
596 mq->state = scss;
597 return mq;
598}
599
600
601/**
602 * Functions of this signature are called whenever writing operations
603 * on a stream are executed
604 *
605 * @param cls the closure from GNUNET_STREAM_write
606 * @param status the status of the stream at the time this function is called;
607 * GNUNET_STREAM_OK if writing to stream was completed successfully;
608 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
609 * (this doesn't mean that the data is never sent, the receiver may
610 * have read the data but its ACKs may have been lost);
611 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
612 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
613 * be processed.
614 * @param size the number of bytes written
615 */
616static void
617write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
618{
619 struct MessageQueue *mq = cls;
620 struct MessageStreamState *mss = mq->state;
621 struct PendingMessage *pm;
622
623 GNUNET_assert (GNUNET_STREAM_OK == status);
624
625 /* call cb for message we finished sending */
626 pm = mq->current_pm;
627 if (NULL != pm)
628 {
629 if (NULL != pm->sent_cb)
630 pm->sent_cb (pm->sent_cb_cls);
631 GNUNET_free (pm);
632 }
633
634 mss->wh = NULL;
635
636 pm = mq->pending_head;
637 mq->current_pm = pm;
638 if (NULL == pm)
639 return;
640 GNUNET_CONTAINER_DLL_remove (mq->pending_head, mq->pending_tail, pm);
641 mss->wh = GNUNET_STREAM_write (mss->socket, pm->msg, ntohs (pm->msg->size),
642 GNUNET_TIME_UNIT_FOREVER_REL, write_queued, cls);
643 GNUNET_assert (NULL != mss->wh);
644}
645
646
647static void
648stream_socket_add_cb (struct MessageQueue *mq)
649{
650 if (NULL != mq->current_pm)
651 return;
652 write_queued (mq, GNUNET_STREAM_OK, 0);
653}
654
655
656struct MessageQueue *
657create_message_queue_for_stream_socket (struct MessageStreamState *mss)
658{
659 struct MessageQueue *mq;
660 mq = GNUNET_new (struct MessageQueue);
661 mq->state = mss;
662 mq->add_cb = stream_socket_add_cb;
663 return mq;
664}
665
666
667struct PendingMessage *
668new_pending_message (uint16_t size, uint16_t type)
669{
670 struct PendingMessage *pm;
671 pm = GNUNET_malloc (sizeof *pm + size);
672 pm->msg = (void *) &pm[1];
673 pm->msg->size = htons (size);
674 pm->msg->type = htons (type);
675 return pm;
676}
677
678
679/**
680 * Queue a message in a message queue.
681 *
682 * @param queue the message queue
683 * @param pending message, message with additional information
684 */
685void
686message_queue_add (struct MessageQueue *queue, struct PendingMessage *msg)
687{
688 GNUNET_CONTAINER_DLL_insert_tail (queue->pending_head, queue->pending_tail, msg);
689 queue->add_cb (queue);
690}
691
692
693/**
694 * Called when we receive data from a peer via stream.
695 *
696 * @param cls the closure from GNUNET_STREAM_read
697 * @param status the status of the stream at the time this function is called
698 * @param data traffic from the other side
699 * @param size the number of bytes available in data read; will be 0 on timeout
700 * @return number of bytes of processed from 'data' (any data remaining should be
701 * given to the next time the read processor is called).
702 */
703static size_t
704stream_data_processor (void *cls, enum GNUNET_STREAM_Status status, const void *data, size_t size)
705{
706 struct MessageStreamState *mss = cls;
707 int ret;
708
709 mss->rh = NULL;
710
711 if (GNUNET_STREAM_OK != status)
712 {
713 /* FIXME: handle this correctly */
714 GNUNET_break (0);
715 return 0;
716 }
717 GNUNET_assert (NULL != mss->mst);
718 ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_YES);
719 if (GNUNET_SYSERR == ret)
720 {
721 /* FIXME: handle this correctly */
722 GNUNET_break (0);
723 return 0;
724 }
725 /* read again */
726 mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, &stream_data_processor, mss);
727 /* we always read all data */
728 return size;
729}
730
731
732/**
733 * Send element or element report to the peer specified in cpi.
734 *
735 * @param cpi peer to send the elements to
736 * @param head head of the element list
737 */
738static void
739send_element_or_report (struct ConsensusPeerInformation *cpi, struct ElementInfo *e)
740{
741 struct PendingMessage *pm;
742
743 switch (cpi->apparent_round)
744 {
745 case CONSENSUS_ROUND_COMPLETION:
746 case CONSENSUS_ROUND_EXCHANGE:
747 pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + e->element->size,
748 GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS);
749 memcpy (&pm->msg[1], e->element->data, e->element->size);
750 message_queue_add (cpi->mss.mq, pm);
751 break;
752 case CONSENSUS_ROUND_INVENTORY:
753 pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + sizeof (struct GNUNET_HashCode),
754 GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT);
755 memcpy (&pm->msg[1], e->element_hash, sizeof (struct GNUNET_HashCode));
756 message_queue_add (cpi->mss.mq, pm);
757 break;
758 default:
759 GNUNET_break (0);
760 }
761}
762
763
764/**
765 * Iterator to insert values into an ibf.
766 *
767 * @param cls closure
768 * @param key current key code
769 * @param value value in the hash map
770 * @return GNUNET_YES if we should continue to
771 * iterate,
772 * GNUNET_NO if not.
773 */
774static int
775ibf_values_iterator (void *cls,
776 const struct GNUNET_HashCode *key,
777 void *value)
778{
779 struct ConsensusPeerInformation *cpi = cls;
780 struct ElementInfo *e = value;
781 struct IBF_Key ibf_key = ibf_key_from_hashcode (e->element_hash);
782
783 GNUNET_assert (ibf_key.key_val == ibf_key_from_hashcode (key).key_val);
784 ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key);
785 return GNUNET_YES;
786}
787
788/**
789 * Create and populate an IBF for the specified peer,
790 * if it does not already exist.
791 *
792 * @param cpi peer to create the ibf for
793 */
794static void
795prepare_ibf (struct ConsensusPeerInformation *cpi)
796{
797 if (NULL != cpi->session->ibfs[cpi->ibf_order])
798 return;
799 cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, SE_IBF_HASH_NUM);
800 GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi);
801}
802
803
804/**
805 * Called when a remote peer wants to inform the local peer
806 * that the remote peer misses elements.
807 * Elements are not reconciled.
808 *
809 * @param cpi session
810 * @param msg message
811 */
812static int
813handle_p2p_element_report (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
814{
815 GNUNET_assert (0);
816}
817 269
818 270
271/*
819static int 272static int
820exp_subround_finished (const struct ConsensusSession *session) 273exp_subround_finished (const struct ConsensusSession *session)
821{ 274{
@@ -831,8 +284,11 @@ exp_subround_finished (const struct ConsensusSession *session)
831 return GNUNET_YES; 284 return GNUNET_YES;
832 return GNUNET_NO; 285 return GNUNET_NO;
833} 286}
287*/
288
834 289
835 290
291/*
836static int 292static int
837inventory_round_finished (struct ConsensusSession *session) 293inventory_round_finished (struct ConsensusSession *session)
838{ 294{
@@ -846,61 +302,7 @@ inventory_round_finished (struct ConsensusSession *session)
846 return GNUNET_YES; 302 return GNUNET_YES;
847 return GNUNET_NO; 303 return GNUNET_NO;
848} 304}
849 305*/
850
851static void
852clear_message_stream_state (struct MessageStreamState *mss)
853{
854 if (NULL != mss->mst)
855 {
856 GNUNET_SERVER_mst_destroy (mss->mst);
857 mss->mst = NULL;
858 }
859 if (NULL != mss->rh)
860 {
861 GNUNET_STREAM_read_cancel (mss->rh);
862 mss->rh = NULL;
863 }
864 if (NULL != mss->wh)
865 {
866 GNUNET_STREAM_write_cancel (mss->wh);
867 mss->wh = NULL;
868 }
869 if (NULL != mss->socket)
870 {
871 GNUNET_STREAM_close (mss->socket);
872 mss->socket = NULL;
873 }
874 if (NULL != mss->mq)
875 {
876 GNUNET_free (mss->mq);
877 mss->mq = NULL;
878 }
879}
880
881
882/**
883 * Iterator over hash map entries.
884 *
885 * @param cls closure
886 * @param key current key code
887 * @param value value in the hash map
888 * @return GNUNET_YES if we should continue to
889 * iterate,
890 * GNUNET_NO if not.
891 */
892static int
893destroy_element_info_iter (void *cls,
894 const struct GNUNET_HashCode * key,
895 void *value)
896{
897 struct ElementInfo *ei = value;
898 GNUNET_free (ei->element);
899 GNUNET_free (ei->element_hash);
900 GNUNET_free (ei);
901 return GNUNET_YES;
902}
903
904 306
905/** 307/**
906 * Destroy a session, free all resources associated with it. 308 * Destroy a session, free all resources associated with it.
@@ -913,11 +315,9 @@ destroy_session (struct ConsensusSession *session)
913 int i; 315 int i;
914 316
915 GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session); 317 GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
916 GNUNET_SERVER_client_drop (session->scss.client);
917 session->scss.client = NULL;
918 if (NULL != session->client_mq) 318 if (NULL != session->client_mq)
919 { 319 {
920 GNUNET_free (session->client_mq); 320 GNUNET_MQ_destroy (session->client_mq);
921 session->client_mq = NULL; 321 session->client_mq = NULL;
922 } 322 }
923 if (NULL != session->shuffle) 323 if (NULL != session->shuffle)
@@ -925,617 +325,21 @@ destroy_session (struct ConsensusSession *session)
925 GNUNET_free (session->shuffle); 325 GNUNET_free (session->shuffle);
926 session->shuffle = NULL; 326 session->shuffle = NULL;
927 } 327 }
928 if (NULL != session->se)
929 {
930 strata_estimator_destroy (session->se);
931 session->se = NULL;
932 }
933 if (NULL != session->info) 328 if (NULL != session->info)
934 { 329 {
935 for (i = 0; i < session->num_peers; i++) 330 for (i = 0; i < session->num_peers; i++)
936 { 331 {
937 struct ConsensusPeerInformation *cpi; 332 struct ConsensusPeerInformation *cpi;
938 cpi = &session->info[i]; 333 cpi = &session->info[i];
939 clear_message_stream_state (&cpi->mss); 334 GNUNET_free (cpi);
940 if (NULL != cpi->se)
941 {
942 strata_estimator_destroy (cpi->se);
943 cpi->se = NULL;
944 }
945 if (NULL != cpi->ibf)
946 {
947 ibf_destroy (cpi->ibf);
948 cpi->ibf = NULL;
949 }
950 } 335 }
951 GNUNET_free (session->info); 336 GNUNET_free (session->info);
952 session->info = NULL; 337 session->info = NULL;
953 } 338 }
954 if (NULL != session->ibfs)
955 {
956 for (i = 0; i <= MAX_IBF_ORDER; i++)
957 {
958 if (NULL != session->ibfs[i])
959 {
960 ibf_destroy (session->ibfs[i]);
961 session->ibfs[i] = NULL;
962 }
963 }
964 GNUNET_free (session->ibfs);
965 session->ibfs = NULL;
966 }
967 if (NULL != session->values)
968 {
969 GNUNET_CONTAINER_multihashmap_iterate (session->values, destroy_element_info_iter, NULL);
970 GNUNET_CONTAINER_multihashmap_destroy (session->values);
971 session->values = NULL;
972 }
973
974 if (NULL != session->ibf_key_map)
975 {
976 GNUNET_CONTAINER_multihashmap_destroy (session->ibf_key_map);
977 session->ibf_key_map = NULL;
978 }
979 GNUNET_free (session); 339 GNUNET_free (session);
980} 340}
981 341
982 342
983static void
984send_client_conclude_done (struct ConsensusSession *session)
985{
986 struct PendingMessage *pm;
987
988 /* check if client is even there anymore */
989 if (NULL == session->scss.client)
990 return;
991 pm = new_pending_message (sizeof (struct GNUNET_MessageHeader),
992 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
993 message_queue_add (session->client_mq, pm);
994}
995
996
997/**
998 * Check if a strata message is for the current round or not
999 *
1000 * @param session session we are in
1001 * @param strata_msg the strata message to check
1002 * @return GNUNET_YES if the strata_msg is premature, GNUNET_NO otherwise
1003 */
1004static int
1005is_premature_strata_message (const struct ConsensusSession *session, const struct StrataMessage *strata_msg)
1006{
1007 switch (strata_msg->round)
1008 {
1009 case CONSENSUS_ROUND_COMPLETION:
1010 case CONSENSUS_ROUND_EXCHANGE:
1011 /* here, we also have to compare subrounds */
1012 if ( (strata_msg->round != session->current_round) ||
1013 (strata_msg->exp_round != session->exp_round) ||
1014 (strata_msg->exp_subround != session->exp_subround) )
1015 return GNUNET_YES;
1016 break;
1017 default:
1018 if (session->current_round != strata_msg->round)
1019 return GNUNET_YES;
1020 break;
1021 }
1022 return GNUNET_NO;
1023}
1024
1025
1026/**
1027 * Send a strata estimator.
1028 *
1029 * @param cpi the peer
1030 */
1031static void
1032send_strata_estimator (struct ConsensusPeerInformation *cpi)
1033{
1034 struct PendingMessage *pm;
1035 struct StrataMessage *strata_msg;
1036
1037 /* FIXME: why is this correct? */
1038 cpi->apparent_round = cpi->session->current_round;
1039 cpi->ibf_state = IBF_STATE_NONE;
1040 cpi->ibf_bucket_counter = 0;
1041
1042 LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "sending SE (in round: %d)\n", cpi->session->current_round);
1043
1044 pm = new_pending_message ((sizeof *strata_msg) + (SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE),
1045 GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE);
1046 strata_msg = (struct StrataMessage *) pm->msg;
1047 strata_msg->round = cpi->session->current_round;
1048 strata_msg->exp_round = cpi->session->exp_round;
1049 strata_msg->exp_subround = cpi->session->exp_subround;
1050 strata_estimator_write (cpi->session->se, &strata_msg[1]);
1051 message_queue_add (cpi->mss.mq, pm);
1052}
1053
1054
1055/**
1056 * Send an IBF of the order specified in cpi.
1057 *
1058 * @param cpi the peer
1059 */
1060static void
1061send_ibf (struct ConsensusPeerInformation *cpi)
1062{
1063 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending IBF to P%d\n",
1064 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1065
1066 cpi->ibf_bucket_counter = 0;
1067 while (cpi->ibf_bucket_counter < (1 << cpi->ibf_order))
1068 {
1069 unsigned int num_buckets;
1070 struct PendingMessage *pm;
1071 struct DifferenceDigest *digest;
1072
1073 num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter;
1074 /* limit to maximum */
1075 if (num_buckets > BUCKETS_PER_MESSAGE)
1076 num_buckets = BUCKETS_PER_MESSAGE;
1077
1078 pm = new_pending_message ((sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE),
1079 GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST);
1080 digest = (struct DifferenceDigest *) pm->msg;
1081 digest->order = cpi->ibf_order;
1082 digest->round = cpi->apparent_round;
1083 ibf_write_slice (cpi->ibf, cpi->ibf_bucket_counter, num_buckets, &digest[1]);
1084 cpi->ibf_bucket_counter += num_buckets;
1085 message_queue_add (cpi->mss.mq, pm);
1086 }
1087 cpi->ibf_bucket_counter = 0;
1088 cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF;
1089}
1090
1091
1092/**
1093 * Called when a peer sends us its strata estimator.
1094 * In response, we sent out IBF of appropriate size back.
1095 *
1096 * @param cpi session
1097 * @param strata_msg message
1098 */
1099static int
1100handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg)
1101{
1102 unsigned int diff;
1103
1104 if ( (cpi->session->current_round == CONSENSUS_ROUND_COMPLETION) &&
1105 (strata_msg->round == CONSENSUS_ROUND_INVENTORY) )
1106 {
1107 /* we still have to handle this request appropriately */
1108 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got inventory SE from P%d, we are already further alog\n",
1109 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1110 }
1111 else if (is_premature_strata_message (cpi->session, strata_msg))
1112 {
1113 if (GNUNET_NO == cpi->replaying_strata_message)
1114 {
1115 LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got probably premature SE (%d,%d)\n",
1116 strata_msg->exp_round, strata_msg->exp_subround);
1117 cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message (&strata_msg->header);
1118 }
1119 return GNUNET_YES;
1120 }
1121
1122 if (NULL == cpi->se)
1123 cpi->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM);
1124
1125 cpi->apparent_round = strata_msg->round;
1126
1127 if (htons (strata_msg->header.size) != ((sizeof *strata_msg) + SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE))
1128 {
1129 LOG_PP (GNUNET_ERROR_TYPE_WARNING, cpi, "got SE of wrong size\n");
1130 return GNUNET_NO;
1131 }
1132 strata_estimator_read (&strata_msg[1], cpi->se);
1133 GNUNET_assert (NULL != cpi->session->se);
1134 diff = strata_estimator_difference (cpi->session->se, cpi->se);
1135
1136 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SE from P%d, diff=%d\n",
1137 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), diff);
1138
1139 switch (cpi->session->current_round)
1140 {
1141 case CONSENSUS_ROUND_EXCHANGE:
1142 case CONSENSUS_ROUND_INVENTORY:
1143 case CONSENSUS_ROUND_COMPLETION:
1144 /* send IBF of the right size */
1145 cpi->ibf_order = 0;
1146 while (((1 << cpi->ibf_order) < diff) || (SE_IBF_HASH_NUM > (1 << cpi->ibf_order)) )
1147 cpi->ibf_order++;
1148 if (cpi->ibf_order > MAX_IBF_ORDER)
1149 cpi->ibf_order = MAX_IBF_ORDER;
1150 cpi->ibf_order += 1;
1151 /* create ibf if not already pre-computed */
1152 prepare_ibf (cpi);
1153 if (NULL != cpi->ibf)
1154 ibf_destroy (cpi->ibf);
1155 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
1156 cpi->ibf_bucket_counter = 0;
1157 send_ibf (cpi);
1158 break;
1159 default:
1160 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got unexpected SE from P%d\n",
1161 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1162 break;
1163 }
1164 return GNUNET_YES;
1165}
1166
1167
1168
1169static int
1170send_elements_iterator (void *cls,
1171 const struct GNUNET_HashCode * key,
1172 void *value)
1173{
1174 struct ConsensusPeerInformation *cpi = cls;
1175 struct ElementInfo *ei;
1176 ei = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, value);
1177 if (NULL == ei)
1178 {
1179 LOG_PP (GNUNET_ERROR_TYPE_WARNING, cpi, "peer's ibf contained non-existing element %s\n",
1180 GNUNET_h2s((struct GNUNET_HashCode *) value));
1181 return GNUNET_YES;
1182 }
1183 LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "sending element\n");
1184 send_element_or_report (cpi, ei);
1185 return GNUNET_YES;
1186}
1187
1188
1189/**
1190 * Decode the current diff ibf, and send elements/requests/reports/
1191 *
1192 * @param cpi partner peer
1193 */
1194static void
1195decode (struct ConsensusPeerInformation *cpi)
1196{
1197 struct IBF_Key key;
1198 int side;
1199
1200 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: decoding ibf from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1201
1202 while (1)
1203 {
1204 int res;
1205
1206 res = ibf_decode (cpi->ibf, &side, &key);
1207 if (GNUNET_SYSERR == res)
1208 {
1209 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding failed, transmitting larger IBF\n");
1210 /* decoding failed, we tell the other peer by sending our ibf with a larger order */
1211 cpi->ibf_order++;
1212 prepare_ibf (cpi);
1213 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
1214 cpi->ibf_bucket_counter = 0;
1215 send_ibf (cpi);
1216 return;
1217 }
1218 if (GNUNET_NO == res)
1219 {
1220 struct PendingMessage *pm;
1221 struct ConsensusRoundMessage *rmsg;
1222 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: transmitted all values, sending SYNC\n", cpi->session->local_peer_idx);
1223
1224 pm = new_pending_message (sizeof *rmsg, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED);
1225 rmsg = (struct ConsensusRoundMessage *) pm->msg;
1226 rmsg->round = cpi->apparent_round;
1227 message_queue_add (cpi->mss.mq, pm);
1228 return;
1229 }
1230 if (-1 == side)
1231 {
1232 struct GNUNET_HashCode hashcode;
1233 /* we have the element(s), send it to the other peer */
1234 ibf_hashcode_from_key (key, &hashcode);
1235 GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->ibf_key_map, &hashcode, send_elements_iterator, cpi);
1236 }
1237 else
1238 {
1239 struct PendingMessage *pm;
1240 uint16_t type;
1241
1242 switch (cpi->apparent_round)
1243 {
1244 case CONSENSUS_ROUND_COMPLETION:
1245 /* FIXME: check if we really want to request the element */
1246 case CONSENSUS_ROUND_EXCHANGE:
1247 case CONSENSUS_ROUND_INVENTORY:
1248 type = GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST;
1249 break;
1250 default:
1251 GNUNET_assert (0);
1252 }
1253 pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + sizeof (struct IBF_Key),
1254 type);
1255 *(struct IBF_Key *) &pm->msg[1] = key;
1256 message_queue_add (cpi->mss.mq, pm);
1257 }
1258 }
1259}
1260
1261
1262static int
1263handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest)
1264{
1265 int num_buckets;
1266
1267 /* FIXME: find out if we're still expecting the same ibf! */
1268
1269 cpi->apparent_round = cpi->session->current_round;
1270 // FIXME: check header.size >= sizeof (DD)
1271 num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE;
1272 switch (cpi->ibf_state)
1273 {
1274 case IBF_STATE_NONE:
1275 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1276 cpi->ibf_state = IBF_STATE_RECEIVING;
1277 cpi->ibf_order = digest->order;
1278 cpi->ibf_bucket_counter = 0;
1279 if (NULL != cpi->ibf)
1280 {
1281 ibf_destroy (cpi->ibf);
1282 cpi->ibf = NULL;
1283 }
1284 break;
1285 case IBF_STATE_ANTICIPATE_DIFF:
1286 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d (probably out IBF did not decode)\n",
1287 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1288 cpi->ibf_state = IBF_STATE_RECEIVING;
1289 cpi->ibf_order = digest->order;
1290 cpi->ibf_bucket_counter = 0;
1291 if (NULL != cpi->ibf)
1292 {
1293 ibf_destroy (cpi->ibf);
1294 cpi->ibf = NULL;
1295 }
1296 break;
1297 case IBF_STATE_RECEIVING:
1298 break;
1299 default:
1300 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: unexpected IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1301 return GNUNET_YES;
1302 }
1303
1304 if (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order))
1305 {
1306 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: overfull IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1307 return GNUNET_YES;
1308 }
1309
1310 if (NULL == cpi->ibf)
1311 cpi->ibf = ibf_create (1 << cpi->ibf_order, SE_IBF_HASH_NUM);
1312
1313 ibf_read_slice (&digest[1], cpi->ibf_bucket_counter, num_buckets, cpi->ibf);
1314 cpi->ibf_bucket_counter += num_buckets;
1315
1316 if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order))
1317 {
1318 cpi->ibf_state = IBF_STATE_DECODING;
1319 cpi->ibf_bucket_counter = 0;
1320 prepare_ibf (cpi);
1321 ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]);
1322 decode (cpi);
1323 }
1324 return GNUNET_YES;
1325}
1326
1327
1328/**
1329 * Insert an element into the consensus set of the specified session.
1330 * The element will not be copied, and freed when destroying the session.
1331 *
1332 * @param session session for new element
1333 * @param element element to insert
1334 */
1335static void
1336insert_element (struct ConsensusSession *session, struct GNUNET_CONSENSUS_Element *element)
1337{
1338 struct GNUNET_HashCode hash;
1339 struct ElementInfo *e;
1340 struct IBF_Key ibf_key;
1341 int i;
1342
1343 e = GNUNET_new (struct ElementInfo);
1344 e->element = element;
1345 e->element_hash = GNUNET_new (struct GNUNET_HashCode);
1346 GNUNET_CRYPTO_hash (e->element->data, e->element->size, e->element_hash);
1347 ibf_key = ibf_key_from_hashcode (e->element_hash);
1348 ibf_hashcode_from_key (ibf_key, &hash);
1349 strata_estimator_insert (session->se, &hash);
1350 GNUNET_CONTAINER_multihashmap_put (session->values, e->element_hash, e,
1351 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1352 GNUNET_CONTAINER_multihashmap_put (session->ibf_key_map, &hash, e->element_hash,
1353 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1354
1355 for (i = 0; i <= MAX_IBF_ORDER; i++)
1356 {
1357 if (NULL == session->ibfs[i])
1358 continue;
1359 ibf_insert (session->ibfs[i], ibf_key);
1360 }
1361}
1362
1363
1364/**
1365 * Handle an element that another peer sent us
1366 */
1367static int
1368handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg)
1369{
1370 struct GNUNET_CONSENSUS_Element *element;
1371 size_t size;
1372
1373 switch (cpi->session->current_round)
1374 {
1375 case CONSENSUS_ROUND_COMPLETION:
1376 /* FIXME: check if we really expect the element */
1377 case CONSENSUS_ROUND_EXCHANGE:
1378 break;
1379 default:
1380 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "got unexpected element, ignoring\n");
1381 return GNUNET_YES;
1382 }
1383
1384 size = ntohs (element_msg->size) - sizeof *element_msg;
1385
1386 element = GNUNET_malloc (size + sizeof *element);
1387 element->size = size;
1388 memcpy (&element[1], &element_msg[1], size);
1389 element->data = &element[1];
1390
1391 LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got element\n");
1392
1393 insert_element (cpi->session, element);
1394
1395 return GNUNET_YES;
1396}
1397
1398
1399/**
1400 * Handle a request for elements.
1401 *
1402 * @param cpi peer that is requesting the element
1403 * @param msg the element request message
1404 */
1405static int
1406handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg)
1407{
1408 struct GNUNET_HashCode hashcode;
1409 struct IBF_Key *ibf_key;
1410 unsigned int num;
1411
1412 /* element requests are allowed in every round */
1413
1414 num = ntohs (msg->header.size) / sizeof (struct IBF_Key);
1415
1416 ibf_key = (struct IBF_Key *) &msg[1];
1417 while (num--)
1418 {
1419 ibf_hashcode_from_key (*ibf_key, &hashcode);
1420 GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->ibf_key_map, &hashcode, send_elements_iterator, cpi);
1421 ibf_key++;
1422 }
1423 return GNUNET_YES;
1424}
1425
1426static int
1427is_peer_connected (struct ConsensusPeerInformation *cpi)
1428{
1429 if (NULL == cpi->mss.socket)
1430 return GNUNET_NO;
1431 return GNUNET_YES;
1432}
1433
1434
1435static void
1436ensure_peer_connected (struct ConsensusPeerInformation *cpi)
1437{
1438 if (NULL != cpi->mss.socket)
1439 return;
1440 cpi->mss.socket = GNUNET_STREAM_open (cfg, &cpi->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS,
1441 open_cb, cpi, GNUNET_STREAM_OPTION_END);
1442}
1443
1444
1445/**
1446 * If necessary, send a message to the peer, depending on the current
1447 * round.
1448 */
1449static void
1450embrace_peer (struct ConsensusPeerInformation *cpi)
1451{
1452 if (GNUNET_NO == is_peer_connected (cpi))
1453 {
1454 ensure_peer_connected (cpi);
1455 return;
1456 }
1457 if (GNUNET_NO == cpi->hello)
1458 return;
1459 /* FIXME: correctness of switch */
1460 switch (cpi->session->current_round)
1461 {
1462 case CONSENSUS_ROUND_EXCHANGE:
1463 case CONSENSUS_ROUND_INVENTORY:
1464 if (cpi->session->partner_outgoing != cpi)
1465 break;
1466 /* fallthrough */
1467 case CONSENSUS_ROUND_COMPLETION:
1468 send_strata_estimator (cpi);
1469 default:
1470 break;
1471 }
1472}
1473
1474
1475/**
1476 * Called when stream has finishes writing the hello message
1477 */
1478static void
1479hello_cont (void *cls)
1480{
1481 struct ConsensusPeerInformation *cpi = cls;
1482
1483 cpi->hello = GNUNET_YES;
1484 embrace_peer (cpi);
1485}
1486
1487
1488/**
1489 * Called when we established a stream connection to another peer
1490 *
1491 * @param cls cpi of the peer we just connected to
1492 * @param socket socket to use to communicate with the other side (read/write)
1493 */
1494static void
1495open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
1496{
1497 struct ConsensusPeerInformation *cpi = cls;
1498 struct PendingMessage *pm;
1499 struct ConsensusHello *hello;
1500
1501 GNUNET_assert (NULL == cpi->mss.mst);
1502 GNUNET_assert (NULL == cpi->mss.mq);
1503
1504 cpi->mss.mq = create_message_queue_for_stream_socket (&cpi->mss);
1505 cpi->mss.mst = GNUNET_SERVER_mst_create (mst_session_callback, cpi);
1506 cpi->mss.mst_cls = cpi;
1507
1508 pm = new_pending_message (sizeof *hello, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO);
1509 hello = (struct ConsensusHello *) pm->msg;
1510 memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode));
1511 pm->sent_cb = hello_cont;
1512 pm->sent_cb_cls = cpi;
1513 message_queue_add (cpi->mss.mq, pm);
1514 cpi->mss.rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
1515 &stream_data_processor, &cpi->mss);
1516}
1517
1518
1519static void
1520replay_premature_message (struct ConsensusPeerInformation *cpi)
1521{
1522 if (NULL != cpi->premature_strata_message)
1523 {
1524 struct StrataMessage *sm;
1525
1526 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n");
1527 sm = cpi->premature_strata_message;
1528 cpi->premature_strata_message = NULL;
1529
1530 cpi->replaying_strata_message = GNUNET_YES;
1531 handle_p2p_strata (cpi, sm);
1532 cpi->replaying_strata_message = GNUNET_NO;
1533
1534 GNUNET_free (sm);
1535 }
1536}
1537
1538
1539/** 343/**
1540 * Start the inventory round, contact all peers we are supposed to contact. 344 * Start the inventory round, contact all peers we are supposed to contact.
1541 * 345 *
@@ -1548,11 +352,7 @@ start_inventory (struct ConsensusSession *session)
1548 int last; 352 int last;
1549 353
1550 for (i = 0; i < session->num_peers; i++) 354 for (i = 0; i < session->num_peers; i++)
1551 {
1552 session->info[i].ibf_bucket_counter = 0;
1553 session->info[i].ibf_state = IBF_STATE_NONE;
1554 session->info[i].is_outgoing = GNUNET_NO; 355 session->info[i].is_outgoing = GNUNET_NO;
1555 }
1556 356
1557 last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; 357 last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers;
1558 i = (session->local_peer_idx + 1) % session->num_peers; 358 i = (session->local_peer_idx + 1) % session->num_peers;
@@ -1560,7 +360,7 @@ start_inventory (struct ConsensusSession *session)
1560 { 360 {
1561 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i); 361 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i);
1562 session->info[i].is_outgoing = GNUNET_YES; 362 session->info[i].is_outgoing = GNUNET_YES;
1563 embrace_peer (&session->info[i]); 363 // embrace_peer (&session->info[i]);
1564 i = (i + 1) % session->num_peers; 364 i = (i + 1) % session->num_peers;
1565 } 365 }
1566 // tie-breaker for even number of peers 366 // tie-breaker for even number of peers
@@ -1568,49 +368,12 @@ start_inventory (struct ConsensusSession *session)
1568 { 368 {
1569 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i); 369 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i);
1570 session->info[last].is_outgoing = GNUNET_YES; 370 session->info[last].is_outgoing = GNUNET_YES;
1571 embrace_peer (&session->info[last]); 371 // embrace_peer (&session->info[last]);
1572 }
1573
1574 for (i = 0; i < session->num_peers; i++)
1575 {
1576 if (GNUNET_NO == session->info[i].is_outgoing)
1577 replay_premature_message (&session->info[i]);
1578 } 372 }
1579} 373}
1580 374
1581 375
1582/** 376/**
1583 * Iterator over hash map entries.
1584 *
1585 * @param cls closure
1586 * @param key current key code
1587 * @param value value in the hash map
1588 * @return GNUNET_YES if we should continue to
1589 * iterate,
1590 * GNUNET_NO if not.
1591 */
1592static int
1593send_client_elements_iter (void *cls,
1594 const struct GNUNET_HashCode * key,
1595 void *value)
1596{
1597 struct ConsensusSession *session = cls;
1598 struct ElementInfo *ei = value;
1599 struct PendingMessage *pm;
1600
1601 /* is the client still there? */
1602 if (NULL == session->scss.client)
1603 return GNUNET_NO;
1604
1605 pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + ei->element->size,
1606 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
1607 message_queue_add (session->client_mq, pm);
1608 return GNUNET_YES;
1609}
1610
1611
1612
1613/**
1614 * Start the next round. 377 * Start the next round.
1615 * This function can be invoked as a timeout task, or called manually (tc will be NULL then). 378 * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
1616 * 379 *
@@ -1630,7 +393,7 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1630 session = cls; 393 session = cls;
1631 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: round over\n", session->local_peer_idx); 394 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: round over\n", session->local_peer_idx);
1632 395
1633 if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) 396 if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)
1634 { 397 {
1635 GNUNET_SCHEDULER_cancel (session->round_timeout_tid); 398 GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
1636 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; 399 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
@@ -1648,8 +411,8 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1648 if (session->num_peers <= 2) 411 if (session->num_peers <= 2)
1649 { 412 {
1650 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: 2-peer consensus done\n", session->local_peer_idx); 413 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: 2-peer consensus done\n", session->local_peer_idx);
1651 GNUNET_CONTAINER_multihashmap_iterate (session->values, send_client_elements_iter, session); 414 //GNUNET_CONTAINER_multihashmap_iterate (session->values, send_client_elements_iter, session);
1652 send_client_conclude_done (session); 415 //send_client_conclude_done (session);
1653 session->current_round = CONSENSUS_ROUND_FINISH; 416 session->current_round = CONSENSUS_ROUND_FINISH;
1654 return; 417 return;
1655 } 418 }
@@ -1663,7 +426,7 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1663 break; 426 break;
1664 case CONSENSUS_ROUND_COMPLETION: 427 case CONSENSUS_ROUND_COMPLETION:
1665 session->current_round = CONSENSUS_ROUND_FINISH; 428 session->current_round = CONSENSUS_ROUND_FINISH;
1666 send_client_conclude_done (session); 429 //send_client_conclude_done (session);
1667 break; 430 break;
1668 default: 431 default:
1669 GNUNET_assert (0); 432 GNUNET_assert (0);
@@ -1671,159 +434,9 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1671} 434}
1672 435
1673 436
1674static void
1675fin_sent_cb (void *cls)
1676{
1677 struct ConsensusPeerInformation *cpi;
1678 cpi = cls;
1679 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n", cpi->session->local_peer_idx);
1680 switch (cpi->session->current_round)
1681 {
1682 case CONSENSUS_ROUND_EXCHANGE:
1683 case CONSENSUS_ROUND_COMPLETION:
1684 if (cpi->session->current_round != cpi->apparent_round)
1685 {
1686 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: FIN to SYNC from the past\n", cpi->session->local_peer_idx);
1687 break;
1688 }
1689 cpi->exp_subround_finished = GNUNET_YES;
1690 /* the subround is only really over if *both* partners are done */
1691 if (GNUNET_YES == exp_subround_finished (cpi->session))
1692 subround_over (cpi->session, NULL);
1693 else
1694 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after FIN sent\n", cpi->session->local_peer_idx);
1695 break;
1696 case CONSENSUS_ROUND_INVENTORY:
1697 cpi->inventory_synced = GNUNET_YES;
1698 if (inventory_round_finished (cpi->session) && cpi->session->current_round == cpi->apparent_round)
1699 round_over (cpi->session, NULL);
1700 /* FIXME: maybe go to next round */
1701 break;
1702 default:
1703 GNUNET_break (0);
1704 }
1705}
1706
1707
1708/** 437/**
1709 * The other peer wants us to inform that he sent us all the elements we requested. 438 * Adapt the shuffle of the session for the current round.
1710 */ 439 */
1711static int
1712handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
1713{
1714 struct ConsensusRoundMessage *round_msg;
1715 round_msg = (struct ConsensusRoundMessage *) msg;
1716 /* FIXME: only call subround_over if round is the current one! */
1717 switch (cpi->session->current_round)
1718 {
1719 case CONSENSUS_ROUND_EXCHANGE:
1720 case CONSENSUS_ROUND_COMPLETION:
1721 if (cpi->session->current_round != round_msg->round)
1722 {
1723 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (past round)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1724 cpi->ibf_state = IBF_STATE_NONE;
1725 cpi->ibf_bucket_counter = 0;
1726 break;
1727 }
1728 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (exp)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1729 cpi->exp_subround_finished = GNUNET_YES;
1730 if (GNUNET_YES == exp_subround_finished (cpi->session))
1731 subround_over (cpi->session, NULL);
1732 else
1733 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after got FIN\n", cpi->session->local_peer_idx);
1734 break;
1735 case CONSENSUS_ROUND_INVENTORY:
1736 cpi->inventory_synced = GNUNET_YES;
1737 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (a2a)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1738 if (inventory_round_finished (cpi->session))
1739 round_over (cpi->session, NULL);
1740 break;
1741 default:
1742 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n");
1743 break;
1744 }
1745 return GNUNET_YES;
1746}
1747
1748
1749/**
1750 * Gets called when the other peer wants us to inform that
1751 * it has decoded our ibf and sent us all elements / requests
1752 */
1753static int
1754handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
1755{
1756 struct PendingMessage *pm;
1757 struct ConsensusRoundMessage *fin_msg;
1758
1759 /* FIXME: why handle current round?? */
1760 switch (cpi->session->current_round)
1761 {
1762 case CONSENSUS_ROUND_INVENTORY:
1763 cpi->inventory_synced = GNUNET_YES;
1764 case CONSENSUS_ROUND_COMPLETION:
1765 case CONSENSUS_ROUND_EXCHANGE:
1766 LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "received SYNC\n");
1767 pm = new_pending_message (sizeof *fin_msg,
1768 GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN);
1769 fin_msg = (struct ConsensusRoundMessage *) pm->msg;
1770 fin_msg->round = cpi->apparent_round;
1771 /* the subround is over once we kicked off sending the fin msg */
1772 /* FIXME: assert we are talking to the right peer! */
1773 /* FIXME: mark peer as synced */
1774 pm->sent_cb = fin_sent_cb;
1775 pm->sent_cb_cls = cpi;
1776 message_queue_add (cpi->mss.mq, pm);
1777 break;
1778 default:
1779 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n");
1780 break;
1781 }
1782 return GNUNET_YES;
1783}
1784
1785
1786/**
1787 * Functions with this signature are called whenever a
1788 * complete message is received by the tokenizer.
1789 *
1790 * Do not call GNUNET_SERVER_mst_destroy in callback
1791 *
1792 * @param cls closure
1793 * @param client identification of the client
1794 * @param message the actual message
1795 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
1796 */
1797static int
1798mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
1799{
1800 struct ConsensusPeerInformation *cpi = cls;
1801 GNUNET_assert (NULL == client);
1802 GNUNET_assert (NULL != cls);
1803 switch (ntohs (message->type))
1804 {
1805 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE:
1806 return handle_p2p_strata (cpi, (struct StrataMessage *) message);
1807 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST:
1808 return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message);
1809 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS:
1810 return handle_p2p_element (cpi, message);
1811 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT:
1812 return handle_p2p_element_report (cpi, message);
1813 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST:
1814 return handle_p2p_element_request (cpi, (struct ElementRequest *) message);
1815 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED:
1816 return handle_p2p_synced (cpi, message);
1817 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN:
1818 return handle_p2p_fin (cpi, message);
1819 default:
1820 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s\n",
1821 ntohs (message->type), GNUNET_h2s (&cpi->peer_id.hashPubKey));
1822 }
1823 return GNUNET_OK;
1824}
1825
1826
1827static void 440static void
1828shuffle (struct ConsensusSession *session) 441shuffle (struct ConsensusSession *session)
1829{ 442{
@@ -1860,6 +473,7 @@ find_partners (struct ConsensusSession *session)
1860{ 473{
1861 int mark[session->num_peers]; 474 int mark[session->num_peers];
1862 int i; 475 int i;
476
1863 memset (mark, 0, session->num_peers * sizeof (int)); 477 memset (mark, 0, session->num_peers * sizeof (int));
1864 session->partner_incoming = session->partner_outgoing = NULL; 478 session->partner_incoming = session->partner_outgoing = NULL;
1865 for (i = 0; i < session->num_peers; i++) 479 for (i = 0; i < session->num_peers; i++)
@@ -1887,6 +501,22 @@ find_partners (struct ConsensusSession *session)
1887 501
1888 502
1889/** 503/**
504 * Callback for set operation results. Called for each element
505 * in the result set.
506 *
507 * @param cls closure
508 * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
509 * @param status see enum GNUNET_SET_Status
510 */
511static void set_result_cb (void *cls,
512 const struct GNUNET_SET_Element *element,
513 enum GNUNET_SET_Status status)
514{
515 /* FIXME */
516}
517
518
519/**
1890 * Do the next subround in the exp-scheme. 520 * Do the next subround in the exp-scheme.
1891 * This function can be invoked as a timeout task, or called manually (tc will be NULL then). 521 * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
1892 * 522 *
@@ -1905,9 +535,11 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1905 return; 535 return;
1906 session = cls; 536 session = cls;
1907 /* cancel timeout */ 537 /* cancel timeout */
1908 if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) 538 if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)
539 {
1909 GNUNET_SCHEDULER_cancel (session->round_timeout_tid); 540 GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
1910 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; 541 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
542 }
1911 /* check if we are done with the log phase, 2-peer consensus only does one log round */ 543 /* check if we are done with the log phase, 2-peer consensus only does one log round */
1912 if ( (session->exp_round == NUM_EXP_ROUNDS) || 544 if ( (session->exp_round == NUM_EXP_ROUNDS) ||
1913 ((session->num_peers == 2) && (session->exp_round == 1))) 545 ((session->num_peers == 2) && (session->exp_round == 1)))
@@ -1938,8 +570,25 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1938 session->exp_subround++; 570 session->exp_subround++;
1939 } 571 }
1940 572
573 /* determine the incoming and outgoing partner */
1941 find_partners (session); 574 find_partners (session);
1942 575
576 if (NULL != session->partner_outgoing)
577 {
578 if (NULL != session->partner_outgoing->set_op)
579 GNUNET_SET_operation_cancel (session->partner_outgoing->set_op);
580 session->partner_outgoing->set_op =
581 GNUNET_SET_evaluate (session->element_set,
582 &session->partner_outgoing->peer_id,
583 &session->global_id,
584 NULL, /* FIXME */
585 0, /* FIXME */
586 GNUNET_SET_RESULT_ADDED,
587 set_result_cb, session);
588
589
590 }
591
1943#ifdef GNUNET_EXTRA_LOGGING 592#ifdef GNUNET_EXTRA_LOGGING
1944 { 593 {
1945 int in; 594 int in;
@@ -1957,29 +606,6 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1957 } 606 }
1958#endif /* GNUNET_EXTRA_LOGGING */ 607#endif /* GNUNET_EXTRA_LOGGING */
1959 608
1960 if (NULL != session->partner_incoming)
1961 {
1962 session->partner_incoming->ibf_state = IBF_STATE_NONE;
1963 session->partner_incoming->exp_subround_finished = GNUNET_NO;
1964 session->partner_incoming->ibf_bucket_counter = 0;
1965
1966 /* maybe there's an early strata estimator? */
1967 replay_premature_message (session->partner_incoming);
1968 }
1969
1970 if (NULL != session->partner_outgoing)
1971 {
1972 session->partner_outgoing->ibf_state = IBF_STATE_NONE;
1973 session->partner_outgoing->ibf_bucket_counter = 0;
1974 session->partner_outgoing->exp_subround_finished = GNUNET_NO;
1975 /* make sure peer is connected and send the SE */
1976 embrace_peer (session->partner_outgoing);
1977 }
1978
1979 /*
1980 session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 3 * NUM_EXP_ROUNDS),
1981 subround_over, session);
1982 */
1983} 609}
1984 610
1985 611
@@ -2002,146 +628,6 @@ get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSess
2002 628
2003 629
2004/** 630/**
2005 * Handle a HELLO-message, send when another peer wants to join a session where
2006 * our peer is a member. The session may or may not be inhabited yet.
2007 */
2008static int
2009handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello)
2010{
2011 struct ConsensusSession *session;
2012
2013 if (NULL != inc->requested_gid)
2014 {
2015 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session more than once, ignoring\n");
2016 return GNUNET_YES;
2017 }
2018 if (NULL != inc->cpi)
2019 {
2020 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer with active session sent HELLO again, ignoring\n");
2021 return GNUNET_YES;
2022 }
2023
2024 for (session = sessions_head; NULL != session; session = session->next)
2025 {
2026 int idx;
2027 struct ConsensusPeerInformation *cpi;
2028 if (0 != GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id))
2029 continue;
2030 idx = get_peer_idx (&inc->peer_id, session);
2031 GNUNET_assert (-1 != idx);
2032 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d hello'ed session %d\n", idx);
2033 cpi = &session->info[idx];
2034 inc->cpi = cpi;
2035 cpi->mss = inc->mss;
2036 cpi = &session->info[idx];
2037 cpi->hello = GNUNET_YES;
2038 cpi->mss.mq = create_message_queue_for_stream_socket (&cpi->mss);
2039 embrace_peer (cpi);
2040 return GNUNET_YES;
2041 }
2042 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session\n");
2043 inc->requested_gid = GNUNET_memdup (&hello->global_id, sizeof (struct GNUNET_HashCode));
2044 return GNUNET_YES;
2045}
2046
2047
2048
2049/**
2050 * Handle tokenized messages from stream sockets.
2051 * Delegate them if the socket belongs to a session,
2052 * handle hello messages otherwise.
2053 *
2054 * Do not call GNUNET_SERVER_mst_destroy in callback
2055 *
2056 * @param cls closure, unused
2057 * @param client incoming socket this message comes from
2058 * @param message the actual message
2059 *
2060 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
2061 */
2062static int
2063mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
2064{
2065 struct IncomingSocket *inc;
2066 GNUNET_assert (NULL == client);
2067 GNUNET_assert (NULL != cls);
2068 inc = (struct IncomingSocket *) cls;
2069 switch (ntohs( message->type))
2070 {
2071 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO:
2072 return handle_p2p_hello (inc, (struct ConsensusHello *) message);
2073 default:
2074 if (NULL != inc->cpi)
2075 return mst_session_callback (inc->cpi, client, message);
2076 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s (not in session)\n",
2077 ntohs (message->type), GNUNET_h2s (&inc->peer_id.hashPubKey));
2078 }
2079 return GNUNET_OK;
2080}
2081
2082
2083/**
2084 * Functions of this type are called upon new stream connection from other peers
2085 * or upon binding error which happen when the app_port given in
2086 * GNUNET_STREAM_listen() is already taken.
2087 *
2088 * @param cls the closure from GNUNET_STREAM_listen
2089 * @param socket the socket representing the stream; NULL on binding error
2090 * @param initiator the identity of the peer who wants to establish a stream
2091 * with us; NULL on binding error
2092 * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
2093 * stream (the socket will be invalid after the call)
2094 */
2095static int
2096listen_cb (void *cls,
2097 struct GNUNET_STREAM_Socket *socket,
2098 const struct GNUNET_PeerIdentity *initiator)
2099{
2100 struct IncomingSocket *incoming;
2101
2102 if (NULL == socket)
2103 {
2104 GNUNET_break (0);
2105 return GNUNET_SYSERR;
2106 }
2107 incoming = GNUNET_malloc (sizeof *incoming);
2108 incoming->peer_id = *initiator;
2109 incoming->mss.socket = socket;
2110 incoming->mss.rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
2111 &stream_data_processor, &incoming->mss);
2112 incoming->mss.mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming);
2113 incoming->mss.mst_cls = incoming;
2114 GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming);
2115 return GNUNET_OK;
2116}
2117
2118
2119/**
2120 * Disconnect a client, and destroy all sessions associated with it.
2121 *
2122 * @param client the client to disconnect
2123 */
2124static void
2125disconnect_client (struct GNUNET_SERVER_Client *client)
2126{
2127 struct ConsensusSession *session;
2128 GNUNET_SERVER_client_disconnect (client);
2129
2130 /* if the client owns a session, remove it */
2131 session = sessions_head;
2132 while (NULL != session)
2133 {
2134 if (client == session->scss.client)
2135 {
2136 destroy_session (session);
2137 break;
2138 }
2139 session = session->next;
2140 }
2141}
2142
2143
2144/**
2145 * Compute a global, (hopefully) unique consensus session id, 631 * Compute a global, (hopefully) unique consensus session id,
2146 * from the local id of the consensus session, and the identities of all participants. 632 * from the local id of the consensus session, and the identities of all participants.
2147 * Thus, if the local id of two consensus sessions coincide, but are not comprised of 633 * Thus, if the local id of two consensus sessions coincide, but are not comprised of
@@ -2188,7 +674,8 @@ hash_cmp (const void *h1, const void *h2)
2188 * add the local peer if not in the join message. 674 * add the local peer if not in the join message.
2189 */ 675 */
2190static void 676static void
2191initialize_session_peer_list (struct ConsensusSession *session) 677initialize_session_peer_list (struct ConsensusSession *session,
678 struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2192{ 679{
2193 unsigned int local_peer_in_list; 680 unsigned int local_peer_in_list;
2194 uint32_t listed_peers; 681 uint32_t listed_peers;
@@ -2196,19 +683,19 @@ initialize_session_peer_list (struct ConsensusSession *session)
2196 struct GNUNET_PeerIdentity *peers; 683 struct GNUNET_PeerIdentity *peers;
2197 unsigned int i; 684 unsigned int i;
2198 685
2199 GNUNET_assert (NULL != session->join_msg); 686 GNUNET_assert (NULL != join_msg);
2200 687
2201 /* peers in the join message, may or may not include the local peer */ 688 /* peers in the join message, may or may not include the local peer */
2202 listed_peers = ntohl (session->join_msg->num_peers); 689 listed_peers = ntohl (join_msg->num_peers);
2203 690
2204 session->num_peers = listed_peers; 691 session->num_peers = listed_peers;
2205 692
2206 msg_peers = (struct GNUNET_PeerIdentity *) &session->join_msg[1]; 693 msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1];
2207 694
2208 local_peer_in_list = GNUNET_NO; 695 local_peer_in_list = GNUNET_NO;
2209 for (i = 0; i < listed_peers; i++) 696 for (i = 0; i < listed_peers; i++)
2210 { 697 {
2211 if (0 == memcmp (&msg_peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity))) 698 if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity)))
2212 { 699 {
2213 local_peer_in_list = GNUNET_YES; 700 local_peer_in_list = GNUNET_YES;
2214 break; 701 break;
@@ -2221,7 +708,7 @@ initialize_session_peer_list (struct ConsensusSession *session)
2221 peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity)); 708 peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
2222 709
2223 if (GNUNET_NO == local_peer_in_list) 710 if (GNUNET_NO == local_peer_in_list)
2224 peers[session->num_peers - 1] = *my_peer; 711 peers[session->num_peers - 1] = my_peer;
2225 712
2226 memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity)); 713 memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
2227 qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp); 714 qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
@@ -2236,38 +723,34 @@ initialize_session_peer_list (struct ConsensusSession *session)
2236 session->info[i].peer_id = peers[i]; 723 session->info[i].peer_id = peers[i];
2237 } 724 }
2238 725
2239 free (peers); 726 GNUNET_free (peers);
2240} 727}
2241 728
2242 729
730
731
732
2243/** 733/**
2244 * Add incoming peer connections to the session, 734 * Called when another peer wants to do a set operation with the
2245 * for peers who have connected to us before the local session has been established 735 * local peer.
2246 * 736 *
2247 * @param session ... 737 * @param other_peer the other peer
738 * @param context_msg message with application specific information from
739 * the other peer
740 * @param request request from the other peer, use GNUNET_SET_accept
741 * to accept it, otherwise the request will be refused
742 * Note that we don't use a return value here, as it is also
743 * necessary to specify the set we want to do the operation with,
744 * whith sometimes can be derived from the context message.
745 * Also necessary to specify the timeout.
2248 */ 746 */
2249static void 747static void
2250add_incoming_peers (struct ConsensusSession *session) 748set_listen_cb (void *cls,
749 const struct GNUNET_PeerIdentity *other_peer,
750 const struct GNUNET_MessageHeader *context_msg,
751 struct GNUNET_SET_Request *request)
2251{ 752{
2252 struct IncomingSocket *inc; 753 /* FIXME */
2253 int i;
2254 struct ConsensusPeerInformation *cpi;
2255
2256 for (inc = incoming_sockets_head; NULL != inc; inc = inc->next)
2257 {
2258 if ( (NULL == inc->requested_gid) ||
2259 (0 != GNUNET_CRYPTO_hash_cmp (&session->global_id, inc->requested_gid)) )
2260 continue;
2261 for (i = 0; i < session->num_peers; i++)
2262 {
2263 cpi = &session->info[i];
2264 cpi->peer_id = inc->peer_id;
2265 cpi->mss = inc->mss;
2266 cpi->hello = GNUNET_YES;
2267 inc->cpi = cpi;
2268 break;
2269 }
2270 }
2271} 754}
2272 755
2273 756
@@ -2277,46 +760,59 @@ add_incoming_peers (struct ConsensusSession *session)
2277 * @param session the session to initialize 760 * @param session the session to initialize
2278 */ 761 */
2279static void 762static void
2280initialize_session (struct ConsensusSession *session) 763initialize_session (struct ConsensusSession *session,
764 struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2281{ 765{
2282 struct ConsensusSession *other_session; 766 struct ConsensusSession *other_session;
2283 767
2284 GNUNET_assert (NULL != session->join_msg); 768 initialize_session_peer_list (session, join_msg);
2285 initialize_session_peer_list (session);
2286 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers); 769 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers);
2287 compute_global_id (session, &session->join_msg->session_id); 770 compute_global_id (session, &join_msg->session_id);
2288 771
2289 /* Check if some local client already owns the session. */ 772 /* check if some local client already owns the session. */
2290 other_session = sessions_head; 773 other_session = sessions_head;
2291 while (NULL != other_session) 774 while (NULL != other_session)
2292 { 775 {
2293 if ((other_session != session) && 776 if ((other_session != session) &&
2294 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id))) 777 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
2295 { 778 {
2296 if (GNUNET_NO == other_session->conclude) 779 if (CONSENSUS_ROUND_FINISH != other_session->current_round)
2297 { 780 {
2298 GNUNET_break (0); 781 GNUNET_break (0);
2299 destroy_session (session); 782 destroy_session (session);
2300 return; 783 return;
2301 } 784 }
2302 GNUNET_SERVER_client_drop (other_session->scss.client);
2303 other_session->scss.client = NULL;
2304 break; 785 break;
2305 } 786 }
2306 other_session = other_session->next; 787 other_session = other_session->next;
2307 } 788 }
2308 789
2309 session->local_peer_idx = get_peer_idx (my_peer, session); 790 session->local_peer_idx = get_peer_idx (&my_peer, session);
2310 GNUNET_assert (-1 != session->local_peer_idx); 791 GNUNET_assert (-1 != session->local_peer_idx);
792 session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
793 &session->global_id,
794 set_listen_cb, session);
2311 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx); 795 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx);
2312 GNUNET_free (session->join_msg);
2313 session->join_msg = NULL;
2314 add_incoming_peers (session);
2315 GNUNET_SERVER_receive_done (session->scss.client, GNUNET_OK);
2316 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id)); 796 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id));
2317} 797}
2318 798
2319 799
800static struct ConsensusSession *
801get_session_by_client (struct GNUNET_SERVER_Client *client)
802{
803 struct ConsensusSession *session;
804
805 session = sessions_head;
806 while (NULL != session)
807 {
808 if (session->client == client)
809 return session;
810 session = session->next;
811 }
812 return NULL;
813}
814
815
2320/** 816/**
2321 * Called when a client wants to join a consensus session. 817 * Called when a client wants to join a consensus session.
2322 * 818 *
@@ -2331,45 +827,20 @@ client_join (void *cls,
2331{ 827{
2332 struct ConsensusSession *session; 828 struct ConsensusSession *session;
2333 829
2334 // make sure the client has not already joined a session 830 session = get_session_by_client (client);
2335 session = sessions_head; 831 if (NULL != session)
2336 while (NULL != session)
2337 { 832 {
2338 if (session->scss.client == client) 833 GNUNET_break (0);
2339 { 834 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2340 GNUNET_break (0); 835 return;
2341 disconnect_client (client);
2342 return;
2343 }
2344 session = session->next;
2345 } 836 }
2346
2347 session = GNUNET_new (struct ConsensusSession); 837 session = GNUNET_new (struct ConsensusSession);
2348 session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m);
2349 /* these have to be initialized here, as the client can already start to give us values */
2350 session->ibfs = GNUNET_malloc ((MAX_IBF_ORDER+1) * sizeof (struct InvertibleBloomFilter *));
2351 session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO);
2352 session->ibf_key_map = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO);
2353 session->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM);
2354 session->scss.client = client;
2355 session->client_mq = create_message_queue_for_server_client (&session->scss);
2356 GNUNET_SERVER_client_keep (client); 838 GNUNET_SERVER_client_keep (client);
2357
2358 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); 839 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
2359 840 initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
2360 // Initialize session later if local peer identity is not known yet.
2361 if (NULL == my_peer)
2362 {
2363 GNUNET_SERVER_disable_receive_done_warning (client);
2364 return;
2365 }
2366
2367 initialize_session (session);
2368} 841}
2369 842
2370 843
2371
2372
2373/** 844/**
2374 * Called when a client performs an insert operation. 845 * Called when a client performs an insert operation.
2375 * 846 *
@@ -2379,38 +850,48 @@ client_join (void *cls,
2379 */ 850 */
2380void 851void
2381client_insert (void *cls, 852client_insert (void *cls,
2382 struct GNUNET_SERVER_Client *client, 853 struct GNUNET_SERVER_Client *client,
2383 const struct GNUNET_MessageHeader *m) 854 const struct GNUNET_MessageHeader *m)
2384{ 855{
2385 struct ConsensusSession *session; 856 struct ConsensusSession *session;
2386 struct GNUNET_CONSENSUS_ElementMessage *msg; 857 struct GNUNET_CONSENSUS_ElementMessage *msg;
2387 struct GNUNET_CONSENSUS_Element *element; 858 struct GNUNET_SET_Element *element;
2388 int element_size; 859 ssize_t element_size;
2389 860
2390 session = sessions_head; 861 session = sessions_head;
2391 while (NULL != session) 862 while (NULL != session)
2392 { 863 {
2393 if (session->scss.client == client) 864 if (session->client == client)
2394 break; 865 break;
2395 } 866 }
2396 867
2397 if (NULL == session) 868 if (NULL == session)
2398 { 869 {
2399 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n"); 870 GNUNET_break (0);
871 GNUNET_SERVER_client_disconnect (client);
872 return;
873 }
874
875 if (CONSENSUS_ROUND_BEGIN != session->current_round)
876 {
877 GNUNET_break (0);
2400 GNUNET_SERVER_client_disconnect (client); 878 GNUNET_SERVER_client_disconnect (client);
2401 return; 879 return;
2402 } 880 }
2403 881
2404 msg = (struct GNUNET_CONSENSUS_ElementMessage *) m; 882 msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
2405 element_size = ntohs (msg->header.size )- sizeof (struct GNUNET_CONSENSUS_ElementMessage); 883 element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
2406 element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size); 884 if (element_size < 0)
885 {
886 GNUNET_break (0);
887 return;
888 }
889 element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
2407 element->type = msg->element_type; 890 element->type = msg->element_type;
2408 element->size = element_size; 891 element->size = element_size;
2409 memcpy (&element[1], &msg[1], element_size); 892 memcpy (&element[1], &msg[1], element_size);
2410 element->data = &element[1]; 893 element->data = &element[1];
2411 GNUNET_assert (NULL != element->data); 894 GNUNET_SET_add_element (session->element_set, element, NULL, NULL);
2412 insert_element (session, element);
2413
2414 GNUNET_SERVER_receive_done (client, GNUNET_OK); 895 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2415} 896}
2416 897
@@ -2432,9 +913,8 @@ client_conclude (void *cls,
2432 913
2433 cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message; 914 cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message;
2434 915
2435 session = sessions_head; 916 session = get_session_by_client (client);
2436 while ((session != NULL) && (session->scss.client != client)) 917
2437 session = session->next;
2438 if (NULL == session) 918 if (NULL == session)
2439 { 919 {
2440 /* client not found */ 920 /* client not found */
@@ -2447,16 +927,12 @@ client_conclude (void *cls,
2447 { 927 {
2448 /* client requested conclude twice */ 928 /* client requested conclude twice */
2449 GNUNET_break (0); 929 GNUNET_break (0);
2450 /* client may still own a session, destroy it */
2451 disconnect_client (client);
2452 return; 930 return;
2453 } 931 }
2454 932
2455 session->conclude = GNUNET_YES;
2456
2457 if (session->num_peers <= 1) 933 if (session->num_peers <= 1)
2458 { 934 {
2459 send_client_conclude_done (session); 935 //send_client_conclude_done (session);
2460 } 936 }
2461 else 937 else
2462 { 938 {
@@ -2465,48 +941,12 @@ client_conclude (void *cls,
2465 round_over (session, NULL); 941 round_over (session, NULL);
2466 } 942 }
2467 943
944 GNUNET_assert (CONSENSUS_ROUND_BEGIN != session->current_round);
2468 GNUNET_SERVER_receive_done (client, GNUNET_OK); 945 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2469} 946}
2470 947
2471 948
2472/** 949/**
2473 * Task that disconnects from core.
2474 *
2475 * @param cls core handle
2476 * @param tc context information (why was this task triggered now)
2477 */
2478static void
2479disconnect_core (void *cls,
2480 const struct GNUNET_SCHEDULER_TaskContext *tc)
2481{
2482 if (core != NULL)
2483 {
2484 GNUNET_CORE_disconnect (core);
2485 core = NULL;
2486 }
2487 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n");
2488}
2489
2490
2491static void
2492core_startup (void *cls,
2493 struct GNUNET_CORE_Handle *core,
2494 const struct GNUNET_PeerIdentity *peer)
2495{
2496 struct ConsensusSession *session;
2497
2498 my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity));
2499 /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */
2500 GNUNET_SCHEDULER_add_now (&disconnect_core, core);
2501 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n");
2502 /* initialize sessions that are waiting for the local peer identity */
2503 for (session = sessions_head; NULL != session; session = session->next)
2504 if (NULL != session->join_msg)
2505 initialize_session (session);
2506}
2507
2508
2509/**
2510 * Called to clean up, after a shutdown has been requested. 950 * Called to clean up, after a shutdown has been requested.
2511 * 951 *
2512 * @param cls closure 952 * @param cls closure
@@ -2516,35 +956,8 @@ static void
2516shutdown_task (void *cls, 956shutdown_task (void *cls,
2517 const struct GNUNET_SCHEDULER_TaskContext *tc) 957 const struct GNUNET_SCHEDULER_TaskContext *tc)
2518{ 958{
2519 while (NULL != incoming_sockets_head)
2520 {
2521 struct IncomingSocket *socket;
2522 socket = incoming_sockets_head;
2523 if (NULL == socket->cpi)
2524 clear_message_stream_state (&socket->mss);
2525 incoming_sockets_head = incoming_sockets_head->next;
2526 GNUNET_free (socket);
2527 }
2528
2529 while (NULL != sessions_head) 959 while (NULL != sessions_head)
2530 {
2531 struct ConsensusSession *session;
2532 session = sessions_head->next;
2533 destroy_session (sessions_head); 960 destroy_session (sessions_head);
2534 sessions_head = session;
2535 }
2536
2537 if (NULL != core)
2538 {
2539 GNUNET_CORE_disconnect (core);
2540 core = NULL;
2541 }
2542
2543 if (NULL != listener)
2544 {
2545 GNUNET_STREAM_listen_close (listener);
2546 listener = NULL;
2547 }
2548 961
2549 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n"); 962 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
2550} 963}
@@ -2560,10 +973,6 @@ shutdown_task (void *cls,
2560static void 973static void
2561run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) 974run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c)
2562{ 975{
2563 /* core is only used to retrieve the peer identity */
2564 static const struct GNUNET_CORE_MessageHandler core_handlers[] = {
2565 {NULL, 0, 0}
2566 };
2567 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { 976 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2568 {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0}, 977 {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
2569 {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, 978 {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
@@ -2574,21 +983,15 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU
2574 983
2575 cfg = c; 984 cfg = c;
2576 srv = server; 985 srv = server;
2577 986 if (GNUNET_OK != GNUNET_CRYPTO_get_host_identity (cfg, &my_peer))
987 {
988 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
989 GNUNET_break (0);
990 GNUNET_SCHEDULER_shutdown ();
991 return;
992 }
2578 GNUNET_SERVER_add_handlers (server, server_handlers); 993 GNUNET_SERVER_add_handlers (server, server_handlers);
2579
2580 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); 994 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
2581
2582 listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS,
2583 &listen_cb, NULL,
2584 GNUNET_STREAM_OPTION_END);
2585
2586 /* we have to wait for the core_startup callback before proceeding with the consensus service startup */
2587 core = GNUNET_CORE_connect (c, NULL,
2588 &core_startup, NULL,
2589 NULL, NULL, GNUNET_NO, NULL,
2590 GNUNET_NO, core_handlers);
2591 GNUNET_assert (NULL != core);
2592 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n"); 995 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
2593} 996}
2594 997