aboutsummaryrefslogtreecommitdiff
path: root/src/consensus/gnunet-service-consensus.c
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-03-07 14:12:11 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-03-07 14:12:11 +0000
commit159e38f1ed94c6b44ca20bc2a78fd5cad7027fd0 (patch)
treee4e60aec526c9a0ffbe3dd69896d394587e8586a /src/consensus/gnunet-service-consensus.c
parent1a17d075effa5fbc3b3521ab0d15b2d035599969 (diff)
downloadgnunet-159e38f1ed94c6b44ca20bc2a78fd5cad7027fd0.tar.gz
gnunet-159e38f1ed94c6b44ca20bc2a78fd5cad7027fd0.zip
consensus now implemented with primitive conclusion group selection
Diffstat (limited to 'src/consensus/gnunet-service-consensus.c')
-rw-r--r--src/consensus/gnunet-service-consensus.c855
1 files changed, 626 insertions, 229 deletions
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
index d223360dc..2f59b86bc 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -20,7 +20,7 @@
20 20
21/** 21/**
22 * @file consensus/gnunet-service-consensus.c 22 * @file consensus/gnunet-service-consensus.c
23 * @brief 23 * @brief multi-peer set reconciliation
24 * @author Florian Dold 24 * @author Florian Dold
25 */ 25 */
26 26
@@ -60,7 +60,7 @@
60 * Choose this value so that computing the IBF is still cheaper 60 * Choose this value so that computing the IBF is still cheaper
61 * than transmitting all values. 61 * than transmitting all values.
62 */ 62 */
63#define MAX_IBF_ORDER (32) 63#define MAX_IBF_ORDER (16)
64 64
65 65
66/* forward declarations */ 66/* forward declarations */
@@ -114,11 +114,20 @@ struct PendingElement
114 */ 114 */
115struct ConsensusPeerInformation 115struct ConsensusPeerInformation
116{ 116{
117 /**
118 * Socket for communicating with the peer, either created by the local peer,
119 * or the remote peer.
120 */
117 struct GNUNET_STREAM_Socket *socket; 121 struct GNUNET_STREAM_Socket *socket;
118 122
119 /** 123 /**
124 * Message tokenizer, for the data received from this peer via the stream socket.
125 */
126 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
127
128 /**
120 * Is socket's connection established, i.e. can we write to it? 129 * Is socket's connection established, i.e. can we write to it?
121 * Only relevent on outgoing cpi. 130 * Only relevent to outgoing cpi.
122 */ 131 */
123 int is_connected; 132 int is_connected;
124 133
@@ -150,15 +159,22 @@ struct ConsensusPeerInformation
150 struct GNUNET_STREAM_WriteHandle *wh; 159 struct GNUNET_STREAM_WriteHandle *wh;
151 160
152 enum { 161 enum {
153 IBF_STATE_NONE, 162 /* beginning of round */
163 IBF_STATE_NONE=0,
164 /* we currently receive an ibf */
154 IBF_STATE_RECEIVING, 165 IBF_STATE_RECEIVING,
166 /* we currently transmit an ibf */
155 IBF_STATE_TRANSMITTING, 167 IBF_STATE_TRANSMITTING,
156 IBF_STATE_DECODING 168 /* we decode a received ibf */
169 IBF_STATE_DECODING,
170 /* wait for elements and element requests */
171 IBF_STATE_ANTICIPATE_DIFF
157 } ibf_state ; 172 } ibf_state ;
158 173
159 /** 174 /**
160 * What is the order (=log2 size) of the ibf 175 * What is the order (=log2 size) of the ibf
161 * we're currently dealing with? 176 * we're currently dealing with?
177 * Interpretation depends on ibf_state.
162 */ 178 */
163 int ibf_order; 179 int ibf_order;
164 180
@@ -169,7 +185,8 @@ struct ConsensusPeerInformation
169 struct InvertibleBloomFilter *ibf; 185 struct InvertibleBloomFilter *ibf;
170 186
171 /** 187 /**
172 * How many buckets have we transmitted/received (depending on state)? 188 * How many buckets have we transmitted/received?
189 * Interpretatin depends on ibf_state
173 */ 190 */
174 int ibf_bucket_counter; 191 int ibf_bucket_counter;
175 192
@@ -180,11 +197,25 @@ struct ConsensusPeerInformation
180 struct InvertibleBloomFilter **strata; 197 struct InvertibleBloomFilter **strata;
181 198
182 /** 199 /**
183 * difference estimated with the current strata estimator 200 * Elements that the peer is missing from us.
184 */ 201 */
185 unsigned int diff; 202 uint64_t *missing_local;
186 203
187 struct GNUNET_SERVER_MessageStreamTokenizer *mst; 204 /**
205 * Number of elements in missing_local
206 */
207 unsigned int num_missing_local;
208
209 /**
210 * Elements that this peer told us *we* don't have,
211 * i.e. we are the remote peer that has some values missing.
212 */
213 uint64_t *missing_remote;
214
215 /**
216 * Number of elements in missing_local
217 */
218 unsigned int num_missing_remote;
188 219
189 /** 220 /**
190 * Back-reference to the consensus session, 221 * Back-reference to the consensus session,
@@ -192,10 +223,17 @@ struct ConsensusPeerInformation
192 */ 223 */
193 struct ConsensusSession *session; 224 struct ConsensusSession *session;
194 225
195 struct PendingElement *send_pending_head; 226 /**
196 struct PendingElement *send_pending_tail; 227 * When decoding the IBF, requests for elements and outgoing elements
228 * have to be queued, to ensure that messages actually fit in the stream buffer.
229 */
230 struct QueuedMessage *requests_and_elements_head;
231 struct QueuedMessage *requests_and_elements_tail;
197}; 232};
198 233
234/**
235 * A doubly linked list of messages.
236 */
199struct QueuedMessage 237struct QueuedMessage
200{ 238{
201 struct GNUNET_MessageHeader *msg; 239 struct GNUNET_MessageHeader *msg;
@@ -211,31 +249,40 @@ struct QueuedMessage
211 struct QueuedMessage *prev; 249 struct QueuedMessage *prev;
212}; 250};
213 251
252/**
253 * Describes the current round a consensus session is in.
254 */
214enum ConsensusRound 255enum ConsensusRound
215{ 256{
216 /** 257 /**
217 * distribution of information with the exponential scheme 258 * Not started the protocl yet
259 */
260 CONSENSUS_ROUND_BEGIN=0,
261 /**
262 * distribution of information with the exponential scheme.
218 */ 263 */
219 CONSENSUS_ROUND_EXP_EXCHANGE, 264 CONSENSUS_ROUND_EXP_EXCHANGE,
220 /** 265 /**
221 * All-to-all, exchange missing values 266 * All-to-all, exchange missing values.
222 */ 267 */
223 CONSENSUS_ROUND_A2A_EXCHANGE, 268 CONSENSUS_ROUND_A2A_EXCHANGE,
224 /** 269 /**
225 * All-to-all, check what values are missing, don't exchange anything 270 * All-to-all, check what values are missing, don't exchange anything.
226 */ 271 */
227 CONSENSUS_ROUND_A2A_INVENTORY 272 CONSENSUS_ROUND_A2A_INVENTORY,
228 273 /**
229 /* 274 * All-to-all round to exchange information for byzantine fault detection.
230 a round to exchange the information for fraud-detection 275 */
231 CONSENSUS_ROUNT_A2_INVENTORY_AGREEMENT 276 CONSENSUS_ROUND_A2A_INVENTORY_AGREEMENT,
232 */ 277 /**
278 * Rounds are over
279 */
280 CONSENSUS_ROUND_FINISH
233}; 281};
234 282
235 283
236/** 284/**
237 * A consensus session consists of one local client and the remote authorities. 285 * A consensus session consists of one local client and the remote authorities.
238 *
239 */ 286 */
240struct ConsensusSession 287struct ConsensusSession
241{ 288{
@@ -301,15 +348,15 @@ struct ConsensusSession
301 struct GNUNET_SERVER_TransmitHandle *th; 348 struct GNUNET_SERVER_TransmitHandle *th;
302 349
303 /** 350 /**
304 * Once conclude_requested is GNUNET_YES, the client may not 351 * Timeout for all rounds together, single rounds will schedule a timeout task
305 * insert any more values. 352 * with a fraction of the conclude timeout.
306 */ 353 */
307 int conclude_requested; 354 struct GNUNET_TIME_Relative conclude_timeout;
308 355
309 /** 356 /**
310 * Minimum number of peers to form a consensus group 357 * Timeout task identifier for the current round
311 */ 358 */
312 int conclude_group_min; 359 GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
313 360
314 /** 361 /**
315 * Number of other peers in the consensus 362 * Number of other peers in the consensus
@@ -353,6 +400,7 @@ struct ConsensusSession
353/** 400/**
354 * Sockets from other peers who want to communicate with us. 401 * Sockets from other peers who want to communicate with us.
355 * It may not be known yet which consensus session they belong to. 402 * It may not be known yet which consensus session they belong to.
403 * Also, the session might not exist yet locally.
356 */ 404 */
357struct IncomingSocket 405struct IncomingSocket
358{ 406{
@@ -400,6 +448,7 @@ struct IncomingSocket
400 struct GNUNET_HashCode *requested_gid; 448 struct GNUNET_HashCode *requested_gid;
401}; 449};
402 450
451
403static struct IncomingSocket *incoming_sockets_head; 452static struct IncomingSocket *incoming_sockets_head;
404static struct IncomingSocket *incoming_sockets_tail; 453static struct IncomingSocket *incoming_sockets_tail;
405 454
@@ -440,7 +489,7 @@ static struct GNUNET_STREAM_ListenSocket *listener;
440 489
441 490
442/** 491/**
443 * Queue a message to be sent to the inhabiting client of a sessino 492 * Queue a message to be sent to the inhabiting client of a session.
444 * 493 *
445 * @param session session 494 * @param session session
446 * @param msg message we want to queue 495 * @param msg message we want to queue
@@ -465,7 +514,9 @@ get_cpi_index (struct ConsensusPeerInformation *cpi)
465} 514}
466 515
467/** 516/**
468 * Mark the peer as bad, free as state we don't need anymore. 517 * Mark the peer as bad, free state we don't need anymore.
518 *
519 * @param cpi consensus peer information of the bad peer
469 */ 520 */
470static void 521static void
471mark_peer_bad (struct ConsensusPeerInformation *cpi) 522mark_peer_bad (struct ConsensusPeerInformation *cpi)
@@ -479,6 +530,11 @@ mark_peer_bad (struct ConsensusPeerInformation *cpi)
479/** 530/**
480 * Estimate set difference with two strata estimators, 531 * Estimate set difference with two strata estimators,
481 * i.e. arrays of IBFs. 532 * i.e. arrays of IBFs.
533 * Does not not modify its arguments.
534 *
535 * @param strata1 first strata estimator
536 * @param strata2 second strata estimator
537 * @return the estimated difference
482 */ 538 */
483static int 539static int
484estimate_difference (struct InvertibleBloomFilter** strata1, 540estimate_difference (struct InvertibleBloomFilter** strata1,
@@ -490,9 +546,11 @@ estimate_difference (struct InvertibleBloomFilter** strata1,
490 for (i = STRATA_COUNT - 1; i >= 0; i--) 546 for (i = STRATA_COUNT - 1; i >= 0; i--)
491 { 547 {
492 struct InvertibleBloomFilter *diff; 548 struct InvertibleBloomFilter *diff;
549 /* number of keys decoded from the ibf */
493 int ibf_count; 550 int ibf_count;
494 int more; 551 int more;
495 ibf_count = 0; 552 ibf_count = 0;
553 /* FIXME: implement this without always allocating new IBFs */
496 diff = ibf_dup (strata1[i]); 554 diff = ibf_dup (strata1[i]);
497 ibf_subtract (diff, strata2[i]); 555 ibf_subtract (diff, strata2[i]);
498 for (;;) 556 for (;;)
@@ -537,22 +595,17 @@ session_stream_data_processor (void *cls,
537 int ret; 595 int ret;
538 596
539 GNUNET_assert (GNUNET_STREAM_OK == status); 597 GNUNET_assert (GNUNET_STREAM_OK == status);
540
541 cpi = cls; 598 cpi = cls;
542
543 GNUNET_assert (NULL != cpi->mst); 599 GNUNET_assert (NULL != cpi->mst);
544
545 ret = GNUNET_SERVER_mst_receive (cpi->mst, cpi, data, size, GNUNET_NO, GNUNET_YES); 600 ret = GNUNET_SERVER_mst_receive (cpi->mst, cpi, data, size, GNUNET_NO, GNUNET_YES);
546 if (GNUNET_SYSERR == ret) 601 if (GNUNET_SYSERR == ret)
547 { 602 {
548 /* FIXME: handle this correctly */ 603 /* FIXME: handle this correctly */
549 GNUNET_assert (0); 604 GNUNET_assert (0);
550 } 605 }
551
552 /* read again */ 606 /* read again */
553 cpi->rh = GNUNET_STREAM_read (cpi->socket, GNUNET_TIME_UNIT_FOREVER_REL, 607 cpi->rh = GNUNET_STREAM_read (cpi->socket, GNUNET_TIME_UNIT_FOREVER_REL,
554 &session_stream_data_processor, cpi); 608 &session_stream_data_processor, cpi);
555
556 /* we always read all data */ 609 /* we always read all data */
557 return size; 610 return size;
558} 611}
@@ -578,26 +631,62 @@ incoming_stream_data_processor (void *cls,
578 int ret; 631 int ret;
579 632
580 GNUNET_assert (GNUNET_STREAM_OK == status); 633 GNUNET_assert (GNUNET_STREAM_OK == status);
581
582 incoming = cls; 634 incoming = cls;
583
584 ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_YES); 635 ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_YES);
585 if (GNUNET_SYSERR == ret) 636 if (GNUNET_SYSERR == ret)
586 { 637 {
587 /* FIXME: handle this correctly */ 638 /* FIXME: handle this correctly */
588 GNUNET_assert (0); 639 GNUNET_assert (0);
589 } 640 }
590
591 /* read again */ 641 /* read again */
592 incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL, 642 incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL,
593 &incoming_stream_data_processor, incoming); 643 &incoming_stream_data_processor, incoming);
594
595 /* we always read all data */ 644 /* we always read all data */
596 return size; 645 return size;
597} 646}
598 647
599 648
600/** 649/**
650 * Iterator over hash map entries.
651 * Queue elements to be sent to the peer in cls.
652 *
653 * @param cls closure
654 * @param key current key code
655 * @param value value in the hash map
656 * @return GNUNET_YES if we should continue to
657 * iterate,
658 * GNUNET_NO if not.
659 */
660static int
661send_element_iter (void *cls,
662 const struct GNUNET_HashCode *key,
663 void *value)
664{
665 struct ConsensusPeerInformation *cpi;
666 struct GNUNET_CONSENSUS_Element *element;
667 struct QueuedMessage *qm;
668 struct GNUNET_MessageHeader *element_msg;
669 size_t msize;
670 cpi = cls;
671 element = value;
672 msize = sizeof (struct GNUNET_MessageHeader) + element->size;
673 element_msg = GNUNET_malloc (msize);
674 element_msg->size = htons (msize);
675 if (CONSENSUS_ROUND_A2A_EXCHANGE == cpi->session->current_round)
676 element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS);
677 else if (CONSENSUS_ROUND_A2A_INVENTORY == cpi->session->current_round)
678 element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_MISSING_REMOTE);
679 else
680 GNUNET_assert (0);
681 GNUNET_assert (NULL != element->data);
682 memcpy (&element_msg[1], element->data, element->size);
683 qm = GNUNET_malloc (sizeof *qm);
684 qm->msg = element_msg;
685 GNUNET_CONTAINER_DLL_insert (cpi->requests_and_elements_head, cpi->requests_and_elements_tail, qm);
686 return GNUNET_YES;
687}
688
689/**
601 * Iterator to insert values into an ibf. 690 * Iterator to insert values into an ibf.
602 * 691 *
603 * @param cls closure 692 * @param cls closure
@@ -618,6 +707,12 @@ ibf_values_iterator (void *cls,
618 return GNUNET_YES; 707 return GNUNET_YES;
619} 708}
620 709
710/**
711 * Create and populate an IBF for the specified peer,
712 * if it does not already exist.
713 *
714 * @param peer to create the ibf for
715 */
621static void 716static void
622prepare_ibf (struct ConsensusPeerInformation *cpi) 717prepare_ibf (struct ConsensusPeerInformation *cpi)
623{ 718{
@@ -630,6 +725,42 @@ prepare_ibf (struct ConsensusPeerInformation *cpi)
630 725
631 726
632/** 727/**
728 * Called when a remote peer wants to inform the local peer
729 * that the remote peer misses elements.
730 * Elements are not reconciled.
731 *
732 * @param cpi session
733 * @param msg message
734 */
735static int
736handle_p2p_missing_local (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
737{
738 uint64_t key;
739 key = *(uint64_t *) &msg[1];
740 GNUNET_array_append (cpi->missing_remote, cpi->num_missing_remote, key);
741 return GNUNET_OK;
742}
743
744
745/**
746 * Called when a remote peer wants to inform the local peer
747 * that the local peer misses elements.
748 * Elements are not reconciled.
749 *
750 * @param cpi session
751 * @param msg message
752 */
753static int
754handle_p2p_missing_remote (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
755{
756 uint64_t key;
757 key = *(uint64_t *) &msg[1];
758 GNUNET_array_append (cpi->missing_local, cpi->num_missing_local, key);
759 return GNUNET_OK;
760}
761
762
763/**
633 * Called when a peer sends us its strata estimator. 764 * Called when a peer sends us its strata estimator.
634 * In response, we sent out IBF of appropriate size back. 765 * In response, we sent out IBF of appropriate size back.
635 * 766 *
@@ -640,9 +771,9 @@ static int
640handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg) 771handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg)
641{ 772{
642 int i; 773 int i;
643 uint64_t *key_src; 774 unsigned int diff;
644 uint32_t *hash_src; 775 void *buf;
645 uint8_t *count_src; 776 size_t size;
646 777
647 GNUNET_assert (GNUNET_NO == cpi->is_outgoing); 778 GNUNET_assert (GNUNET_NO == cpi->is_outgoing);
648 779
@@ -653,47 +784,36 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess
653 cpi->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); 784 cpi->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
654 } 785 }
655 786
656 /* for correct message alignment, copy bucket types seperately */ 787 size = ntohs (strata_msg->header.size);
657 key_src = (uint64_t *) &strata_msg[1]; 788 buf = (void *) &strata_msg[1];
658
659 for (i = 0; i < STRATA_COUNT; i++) 789 for (i = 0; i < STRATA_COUNT; i++)
660 { 790 {
661 memcpy (cpi->strata[i]->id_sum, key_src, STRATA_IBF_BUCKETS * sizeof *key_src); 791 int res;
662 key_src += STRATA_IBF_BUCKETS; 792 res = ibf_read (&buf, &size, cpi->strata[i]);
663 } 793 GNUNET_assert (GNUNET_OK == res);
664
665 hash_src = (uint32_t *) key_src;
666
667 for (i = 0; i < STRATA_COUNT; i++)
668 {
669 memcpy (cpi->strata[i]->hash_sum, hash_src, STRATA_IBF_BUCKETS * sizeof *hash_src);
670 hash_src += STRATA_IBF_BUCKETS;
671 } 794 }
672 795
673 count_src = (uint8_t *) hash_src; 796 diff = estimate_difference (cpi->session->strata, cpi->strata);
797 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", diff);
674 798
675 for (i = 0; i < STRATA_COUNT; i++) 799 if ( (CONSENSUS_ROUND_A2A_EXCHANGE == cpi->session->current_round) ||
800 (CONSENSUS_ROUND_A2A_INVENTORY == cpi->session->current_round))
676 { 801 {
677 memcpy (cpi->strata[i]->count, count_src, STRATA_IBF_BUCKETS); 802 /* send IBF of the right size */
678 count_src += STRATA_IBF_BUCKETS; 803 cpi->ibf_order = 0;
804 while ((1 << cpi->ibf_order) < diff)
805 cpi->ibf_order++;
806 if (cpi->ibf_order > MAX_IBF_ORDER)
807 cpi->ibf_order = MAX_IBF_ORDER;
808 cpi->ibf_order += 1;
809 /* create ibf if not already pre-computed */
810 prepare_ibf (cpi);
811 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
812 cpi->ibf_state = IBF_STATE_TRANSMITTING;
813 cpi->ibf_bucket_counter = 0;
814 write_ibf (cpi, GNUNET_STREAM_OK, 0);
679 } 815 }
680 816
681 cpi->diff = estimate_difference (cpi->session->strata, cpi->strata);
682 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", cpi->diff);
683
684 /* send IBF of the right size */
685 cpi->ibf_order = 0;
686 while ((1 << cpi->ibf_order) < cpi->diff)
687 cpi->ibf_order++;
688 if (cpi->ibf_order > MAX_IBF_ORDER)
689 cpi->ibf_order = MAX_IBF_ORDER;
690 cpi->ibf_order += 2;
691 /* create ibf if not already pre-computed */
692 prepare_ibf (cpi);
693 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
694 cpi->ibf_state = IBF_STATE_TRANSMITTING;
695 write_ibf (cpi, GNUNET_STREAM_OK, 0);
696
697 return GNUNET_YES; 817 return GNUNET_YES;
698} 818}
699 819
@@ -702,52 +822,62 @@ static int
702handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest) 822handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest)
703{ 823{
704 int num_buckets; 824 int num_buckets;
705 uint64_t *key_src; 825 void *buf;
706 uint32_t *hash_src;
707 uint8_t *count_src;
708 826
709 num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE; 827 num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE;
710 828 switch (cpi->ibf_state)
711 if (IBF_STATE_NONE == cpi->ibf_state)
712 { 829 {
713 cpi->ibf_state = IBF_STATE_RECEIVING; 830 case IBF_STATE_NONE:
714 cpi->ibf_order = digest->order; 831 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving first ibf of order %d\n", digest->order);
715 cpi->ibf_bucket_counter = 0; 832 cpi->ibf_state = IBF_STATE_RECEIVING;
833 cpi->ibf_order = digest->order;
834 cpi->ibf_bucket_counter = 0;
835 if (NULL != cpi->ibf)
836 {
837 GNUNET_free (cpi->ibf);
838 cpi->ibf = NULL;
839 }
840 break;
841 case IBF_STATE_ANTICIPATE_DIFF:
842 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving decode fail ibf of order %d\n", digest->order);
843 cpi->ibf_state = IBF_STATE_RECEIVING;
844 cpi->ibf_order = digest->order;
845 cpi->ibf_bucket_counter = 0;
846 if (NULL != cpi->ibf)
847 {
848 ibf_destroy (cpi->ibf);
849 cpi->ibf = NULL;
850 }
851 break;
852 case IBF_STATE_RECEIVING:
853 break;
854 default:
855 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "received ibf unexpectedly in state %d\n", cpi->ibf_state);
856 mark_peer_bad (cpi);
857 return GNUNET_NO;
716 } 858 }
717 859
718 if ( (IBF_STATE_RECEIVING != cpi->ibf_state) || 860 if (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order))
719 (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order)) )
720 { 861 {
862 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "received malformed ibf\n");
721 mark_peer_bad (cpi); 863 mark_peer_bad (cpi);
722 return GNUNET_NO; 864 return GNUNET_NO;
723 } 865 }
724 866
725 867 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets,
726 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets, cpi->ibf_bucket_counter, (1 << cpi->ibf_order)); 868 cpi->ibf_bucket_counter, (1 << cpi->ibf_order));
727 869
728 if (NULL == cpi->ibf) 870 if (NULL == cpi->ibf)
729 cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); 871 cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0);
730 872
731 key_src = (uint64_t *) &digest[1]; 873 buf = (void *) &digest[1];
732 874 ibf_read_slice (&buf, NULL, cpi->ibf_bucket_counter, num_buckets, cpi->ibf);
733 memcpy (cpi->ibf->hash_sum, key_src, num_buckets * sizeof *key_src);
734 hash_src += num_buckets;
735
736 hash_src = (uint32_t *) key_src;
737
738 memcpy (cpi->ibf->id_sum, hash_src, num_buckets * sizeof *hash_src);
739 hash_src += num_buckets;
740
741 count_src = (uint8_t *) hash_src;
742
743 memcpy (cpi->ibf->count, count_src, num_buckets * sizeof *count_src);
744 875
745 cpi->ibf_bucket_counter += num_buckets; 876 cpi->ibf_bucket_counter += num_buckets;
746 877
747 if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) 878 if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order))
748 { 879 {
749 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full ibf\n"); 880 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full ibf\n");
750 GNUNET_assert (NULL != cpi->wh);
751 cpi->ibf_state = IBF_STATE_DECODING; 881 cpi->ibf_state = IBF_STATE_DECODING;
752 prepare_ibf (cpi); 882 prepare_ibf (cpi);
753 ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]); 883 ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]);
@@ -794,15 +924,62 @@ handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_Me
794 924
795 925
796/** 926/**
927 * Functions of this signature are called whenever writing operations
928 * on a stream are executed
929 *
930 * @param cls the closure from GNUNET_STREAM_write
931 * @param status the status of the stream at the time this function is called;
932 * GNUNET_STREAM_OK if writing to stream was completed successfully;
933 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
934 * (this doesn't mean that the data is never sent, the receiver may
935 * have read the data but its ACKs may have been lost);
936 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
937 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
938 * be processed.
939 * @param size the number of bytes written
940 */
941static void
942write_requested_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size)
943{
944 struct ConsensusPeerInformation *cpi;
945 cpi = cls;
946 GNUNET_assert (NULL == cpi->wh);
947 cpi->wh = NULL;
948 if (NULL != cpi->requests_and_elements_head)
949 {
950 struct QueuedMessage *qm;
951 qm = cpi->requests_and_elements_head;
952 GNUNET_CONTAINER_DLL_remove (cpi->requests_and_elements_head, cpi->requests_and_elements_tail, qm);
953
954 cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size),
955 GNUNET_TIME_UNIT_FOREVER_REL,
956 write_requested_elements, cpi);
957 GNUNET_assert (NULL != cpi->wh);
958 }
959}
960
961
962/**
797 * Handle a request for elements. 963 * Handle a request for elements.
798 * Only allowed in exchange-rounds. 964 * Only allowed in exchange-rounds.
799 *
800 * FIXME: implement
801 */ 965 */
802static int 966static int
803handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg) 967handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg)
804{ 968{
805 /* FIXME: implement */ 969 struct GNUNET_HashCode *hashcode;
970 unsigned int num;
971
972 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handling element request\n");
973 num = ntohs (msg->header.size) / sizeof (struct GNUNET_HashCode);
974 hashcode = (struct GNUNET_HashCode *) &msg[1];
975 while (num--)
976 {
977 GNUNET_assert (IBF_STATE_ANTICIPATE_DIFF == cpi->ibf_state);
978 GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->values, hashcode, send_element_iter, cpi);
979 if (NULL == cpi->wh)
980 write_requested_elements (cpi, GNUNET_STREAM_OK, 0);
981 hashcode++;
982 }
806 return GNUNET_YES; 983 return GNUNET_YES;
807} 984}
808 985
@@ -831,6 +1008,12 @@ handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello
831 inc->cpi->mst = inc->mst; 1008 inc->cpi->mst = inc->mst;
832 inc->cpi->hello = GNUNET_YES; 1009 inc->cpi->hello = GNUNET_YES;
833 inc->cpi->socket = inc->socket; 1010 inc->cpi->socket = inc->socket;
1011
1012 if ( (CONSENSUS_ROUND_A2A_EXCHANGE == session->current_round) &&
1013 (GNUNET_YES == inc->cpi->is_outgoing))
1014 {
1015 write_strata (&session->info[idx], GNUNET_STREAM_OK, 0);
1016 }
834 return GNUNET_YES; 1017 return GNUNET_YES;
835 } 1018 }
836 session = session->next; 1019 session = session->next;
@@ -866,6 +1049,10 @@ mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader
866 return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message); 1049 return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message);
867 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS: 1050 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS:
868 return handle_p2p_element (cpi, message); 1051 return handle_p2p_element (cpi, message);
1052 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_MISSING_LOCAL:
1053 return handle_p2p_missing_local (cpi, message);
1054 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_MISSING_REMOTE:
1055 return handle_p2p_missing_remote (cpi, message);
869 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST: 1056 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST:
870 return handle_p2p_element_request (cpi, (struct ElementRequest *) message); 1057 return handle_p2p_element_request (cpi, (struct ElementRequest *) message);
871 default: 1058 default:
@@ -938,7 +1125,6 @@ listen_cb (void *cls,
938 incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, 1125 incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
939 &incoming_stream_data_processor, incoming); 1126 &incoming_stream_data_processor, incoming);
940 1127
941
942 incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming); 1128 incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming);
943 1129
944 GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming); 1130 GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming);
@@ -947,6 +1133,11 @@ listen_cb (void *cls,
947} 1133}
948 1134
949 1135
1136/**
1137 * Destroy a session, free all resources associated with it.
1138 *
1139 * @param session the session to destroy
1140 */
950static void 1141static void
951destroy_session (struct ConsensusSession *session) 1142destroy_session (struct ConsensusSession *session)
952{ 1143{
@@ -1013,10 +1204,7 @@ compute_global_id (const struct GNUNET_HashCode *local_id,
1013 1204
1014 1205
1015/** 1206/**
1016 * Function called to notify a client about the connection 1207 * Transmit a queued message to the session's client.
1017 * begin ready to queue more data. "buf" will be
1018 * NULL and "size" zero if the connection was closed for
1019 * writing in the meantime.
1020 * 1208 *
1021 * @param cls consensus session 1209 * @param cls consensus session
1022 * @param size number of bytes available in buf 1210 * @param size number of bytes available in buf
@@ -1034,7 +1222,6 @@ transmit_queued (void *cls, size_t size,
1034 session = cls; 1222 session = cls;
1035 session->th = NULL; 1223 session->th = NULL;
1036 1224
1037
1038 qmsg = session->client_messages_head; 1225 qmsg = session->client_messages_head;
1039 GNUNET_CONTAINER_DLL_remove (session->client_messages_head, session->client_messages_tail, qmsg); 1226 GNUNET_CONTAINER_DLL_remove (session->client_messages_head, session->client_messages_tail, qmsg);
1040 GNUNET_assert (qmsg); 1227 GNUNET_assert (qmsg);
@@ -1060,7 +1247,7 @@ transmit_queued (void *cls, size_t size,
1060 1247
1061 1248
1062/** 1249/**
1063 * Schedule sending the next message (if there is any) to a client. 1250 * Schedule transmitting the next queued message (if any) to a client.
1064 * 1251 *
1065 * @param cli the client to send the next message to 1252 * @param cli the client to send the next message to
1066 */ 1253 */
@@ -1118,7 +1305,6 @@ get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSess
1118} 1305}
1119 1306
1120 1307
1121
1122/** 1308/**
1123 * Called when stream has finishes writing the hello message 1309 * Called when stream has finishes writing the hello message
1124 */ 1310 */
@@ -1128,21 +1314,25 @@ hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1128 struct ConsensusPeerInformation *cpi; 1314 struct ConsensusPeerInformation *cpi;
1129 1315
1130 cpi = cls; 1316 cpi = cls;
1317 cpi->wh = NULL;
1131 cpi->hello = GNUNET_YES; 1318 cpi->hello = GNUNET_YES;
1132 1319
1133 GNUNET_assert (GNUNET_STREAM_OK == status); 1320 GNUNET_assert (GNUNET_STREAM_OK == status);
1134 1321
1135 if (cpi->session->conclude_requested) 1322 /* FIXME: other rounds */
1323
1324 if ( (CONSENSUS_ROUND_A2A_EXCHANGE == cpi->session->current_round) &&
1325 (GNUNET_YES == cpi->is_outgoing))
1136 { 1326 {
1137 write_strata (cpi, GNUNET_STREAM_OK, 0); 1327 write_strata (cpi, GNUNET_STREAM_OK, 0);
1138 } 1328 }
1139} 1329}
1140 1330
1141 1331
1142/** 1332/**
1143 * Functions of this type will be called when a stream is established 1333 * Called when we established a stream connection to another peer
1144 * 1334 *
1145 * @param cls the closure from GNUNET_STREAM_open 1335 * @param cls cpi of the peer we just connected to
1146 * @param socket socket to use to communicate with the other side (read/write) 1336 * @param socket socket to use to communicate with the other side (read/write)
1147 */ 1337 */
1148static void 1338static void
@@ -1151,9 +1341,9 @@ open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
1151 struct ConsensusPeerInformation *cpi; 1341 struct ConsensusPeerInformation *cpi;
1152 struct ConsensusHello *hello; 1342 struct ConsensusHello *hello;
1153 1343
1154
1155 cpi = cls; 1344 cpi = cls;
1156 cpi->is_connected = GNUNET_YES; 1345 cpi->is_connected = GNUNET_YES;
1346 cpi->wh = NULL;
1157 1347
1158 hello = GNUNET_malloc (sizeof *hello); 1348 hello = GNUNET_malloc (sizeof *hello);
1159 hello->header.size = htons (sizeof *hello); 1349 hello->header.size = htons (sizeof *hello);
@@ -1165,7 +1355,6 @@ open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
1165 1355
1166 cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, 1356 cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
1167 &session_stream_data_processor, cpi); 1357 &session_stream_data_processor, cpi);
1168
1169} 1358}
1170 1359
1171 1360
@@ -1182,7 +1371,7 @@ initialize_session_info (struct ConsensusSession *session)
1182 session->info[i].session = session; 1371 session->info[i].session = session;
1183 } 1372 }
1184 1373
1185 session->current_round = CONSENSUS_ROUND_A2A_EXCHANGE; 1374 session->current_round = CONSENSUS_ROUND_BEGIN;
1186 1375
1187 last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; 1376 last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers;
1188 i = (session->local_peer_idx + 1) % session->num_peers; 1377 i = (session->local_peer_idx + 1) % session->num_peers;
@@ -1267,6 +1456,47 @@ strata_insert (struct InvertibleBloomFilter **strata, struct GNUNET_HashCode *ke
1267 1456
1268 1457
1269/** 1458/**
1459 * Add incoming peer connections to the session,
1460 * for peers who have connected to us before the local session has been established
1461 *
1462 * @param session ...
1463 */
1464static void
1465add_incoming_peers (struct ConsensusSession *session)
1466{
1467 struct IncomingSocket *inc;
1468 inc = incoming_sockets_head;
1469
1470 while (NULL != inc)
1471 {
1472 if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, inc->requested_gid))
1473 {
1474 int i;
1475 for (i = 0; i < session->num_peers; i++)
1476 {
1477 struct ConsensusPeerInformation *cpi;
1478 cpi = &session->info[i];
1479 if (0 == memcmp (inc->peer, &cpi->session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
1480 {
1481 if (GNUNET_YES == cpi->is_outgoing)
1482 {
1483 /* FIXME: disconnect */
1484 continue;
1485 }
1486 cpi->socket = inc->socket;
1487 inc->cpi = cpi;
1488 inc->cpi->mst = inc->mst;
1489 inc->cpi->hello = GNUNET_YES;
1490 break;
1491 }
1492 }
1493 }
1494 inc = inc->next;
1495 }
1496}
1497
1498
1499/**
1270 * Initialize the session, continue receiving messages from the owning client 1500 * Initialize the session, continue receiving messages from the owning client
1271 * 1501 *
1272 * @param session the session to initialize 1502 * @param session the session to initialize
@@ -1311,7 +1541,7 @@ initialize_session (struct ConsensusSession *session)
1311 for (i = 0; i < STRATA_COUNT; i++) 1541 for (i = 0; i < STRATA_COUNT; i++)
1312 session->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); 1542 session->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
1313 1543
1314 session->ibfs = GNUNET_malloc (MAX_IBF_ORDER * sizeof (struct InvertibleBloomFilter *)); 1544 session->ibfs = GNUNET_malloc ((MAX_IBF_ORDER+1) * sizeof (struct InvertibleBloomFilter *));
1315 1545
1316 session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation)); 1546 session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
1317 initialize_session_info (session); 1547 initialize_session_info (session);
@@ -1319,6 +1549,8 @@ initialize_session (struct ConsensusSession *session)
1319 GNUNET_free (session->join_msg); 1549 GNUNET_free (session->join_msg);
1320 session->join_msg = NULL; 1550 session->join_msg = NULL;
1321 1551
1552 add_incoming_peers (session);
1553
1322 GNUNET_SERVER_receive_done (session->client, GNUNET_OK); 1554 GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
1323 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id)); 1555 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id));
1324} 1556}
@@ -1370,6 +1602,19 @@ client_join (void *cls,
1370 1602
1371 1603
1372/** 1604/**
1605 * Hash a block of data, producing a replicated ibf hash.
1606 */
1607static void
1608hash_for_ibf (const void *block, size_t size, struct GNUNET_HashCode *ret)
1609{
1610 struct IBF_Key ibf_key;
1611 GNUNET_CRYPTO_hash (block, size, ret);
1612 ibf_key = ibf_key_from_hashcode (ret);
1613 ibf_hashcode_from_key (ibf_key, ret);
1614}
1615
1616
1617/**
1373 * Called when a client performs an insert operation. 1618 * Called when a client performs an insert operation.
1374 * 1619 *
1375 * @param cls (unused) 1620 * @param cls (unused)
@@ -1384,7 +1629,7 @@ client_insert (void *cls,
1384 struct ConsensusSession *session; 1629 struct ConsensusSession *session;
1385 struct GNUNET_CONSENSUS_ElementMessage *msg; 1630 struct GNUNET_CONSENSUS_ElementMessage *msg;
1386 struct GNUNET_CONSENSUS_Element *element; 1631 struct GNUNET_CONSENSUS_Element *element;
1387 struct GNUNET_HashCode key; 1632 struct GNUNET_HashCode hash;
1388 int element_size; 1633 int element_size;
1389 1634
1390 session = sessions_head; 1635 session = sessions_head;
@@ -1413,12 +1658,14 @@ client_insert (void *cls,
1413 1658
1414 GNUNET_assert (NULL != element->data); 1659 GNUNET_assert (NULL != element->data);
1415 1660
1416 GNUNET_CRYPTO_hash (element, element_size, &key); 1661 hash_for_ibf (element, element_size, &hash);
1417 1662
1418 GNUNET_CONTAINER_multihashmap_put (session->values, &key, element, 1663 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "inserting with hash_for_ibf %s\n", GNUNET_h2s (&hash));
1664
1665 GNUNET_CONTAINER_multihashmap_put (session->values, &hash, element,
1419 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 1666 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1420 1667
1421 strata_insert (session->strata, &key); 1668 strata_insert (session->strata, &hash);
1422 1669
1423 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1670 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1424 1671
@@ -1426,8 +1673,6 @@ client_insert (void *cls,
1426} 1673}
1427 1674
1428 1675
1429
1430
1431/** 1676/**
1432 * Functions of this signature are called whenever writing operations 1677 * Functions of this signature are called whenever writing operations
1433 * on a stream are executed 1678 * on a stream are executed
@@ -1446,10 +1691,14 @@ client_insert (void *cls,
1446static void 1691static void
1447write_strata_done (void *cls, enum GNUNET_STREAM_Status status, size_t size) 1692write_strata_done (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1448{ 1693{
1694 struct ConsensusPeerInformation *cpi;
1695 cpi = cls;
1696 cpi->wh = NULL;
1449 GNUNET_assert (GNUNET_STREAM_OK == status); 1697 GNUNET_assert (GNUNET_STREAM_OK == status);
1450 /* just wait for the ibf */ 1698 /* just wait for the ibf */
1451} 1699}
1452 1700
1701
1453/** 1702/**
1454 * Functions of this signature are called whenever writing operations 1703 * Functions of this signature are called whenever writing operations
1455 * on a stream are executed 1704 * on a stream are executed
@@ -1470,11 +1719,9 @@ write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1470{ 1719{
1471 struct ConsensusPeerInformation *cpi; 1720 struct ConsensusPeerInformation *cpi;
1472 struct StrataMessage *strata_msg; 1721 struct StrataMessage *strata_msg;
1722 void *buf;
1473 size_t msize; 1723 size_t msize;
1474 int i; 1724 int i;
1475 uint64_t *key_dst;
1476 uint32_t *hash_dst;
1477 uint8_t *count_dst;
1478 1725
1479 cpi = cls; 1726 cpi = cls;
1480 cpi->wh = NULL; 1727 cpi->wh = NULL;
@@ -1491,36 +1738,30 @@ write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1491 strata_msg = GNUNET_malloc (msize); 1738 strata_msg = GNUNET_malloc (msize);
1492 strata_msg->header.size = htons (msize); 1739 strata_msg->header.size = htons (msize);
1493 strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE); 1740 strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE);
1494 1741
1495 /* for correct message alignment, copy bucket types seperately */ 1742 buf = &strata_msg[1];
1496 key_dst = (uint64_t *) &strata_msg[1];
1497
1498 for (i = 0; i < STRATA_COUNT; i++)
1499 {
1500 memcpy (key_dst, cpi->session->strata[i]->id_sum, STRATA_IBF_BUCKETS * sizeof *key_dst);
1501 key_dst += STRATA_IBF_BUCKETS;
1502 }
1503
1504 hash_dst = (uint32_t *) key_dst;
1505
1506 for (i = 0; i < STRATA_COUNT; i++)
1507 {
1508 memcpy (hash_dst, cpi->session->strata[i]->hash_sum, STRATA_IBF_BUCKETS * sizeof *hash_dst);
1509 hash_dst += STRATA_IBF_BUCKETS;
1510 }
1511
1512 count_dst = (uint8_t *) hash_dst;
1513
1514 for (i = 0; i < STRATA_COUNT; i++) 1743 for (i = 0; i < STRATA_COUNT; i++)
1515 { 1744 {
1516 memcpy (count_dst, cpi->session->strata[i]->count, STRATA_IBF_BUCKETS); 1745 ibf_write (cpi->session->strata[i], &buf, NULL);
1517 count_dst += STRATA_IBF_BUCKETS;
1518 } 1746 }
1519 1747
1520 cpi->wh = GNUNET_STREAM_write (cpi->socket, strata_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, 1748 cpi->wh = GNUNET_STREAM_write (cpi->socket, strata_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL,
1521 write_strata_done, cpi); 1749 write_strata_done, cpi);
1522 1750
1523 GNUNET_assert (NULL != cpi->wh); 1751 GNUNET_assert (NULL != cpi->wh);
1752
1753 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata written\n");
1754}
1755
1756
1757static void
1758write_ibf_done (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1759{
1760 struct ConsensusPeerInformation *cpi;
1761 cpi = cls;
1762 cpi->wh = NULL;
1763 GNUNET_assert (GNUNET_STREAM_OK == status);
1764 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "write ibf done callback\n");
1524} 1765}
1525 1766
1526 1767
@@ -1545,24 +1786,17 @@ write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1545 struct ConsensusPeerInformation *cpi; 1786 struct ConsensusPeerInformation *cpi;
1546 struct DifferenceDigest *digest; 1787 struct DifferenceDigest *digest;
1547 int msize; 1788 int msize;
1548 uint64_t *key_dst;
1549 uint32_t *hash_dst;
1550 uint8_t *count_dst;
1551 int num_buckets; 1789 int num_buckets;
1790 void *buf;
1552 1791
1553 cpi = cls; 1792 cpi = cls;
1554 cpi->wh = NULL; 1793 cpi->wh = NULL;
1555 1794
1556 GNUNET_assert (GNUNET_STREAM_OK == status); 1795 GNUNET_assert (GNUNET_STREAM_OK == status);
1557
1558 GNUNET_assert (IBF_STATE_TRANSMITTING == cpi->ibf_state); 1796 GNUNET_assert (IBF_STATE_TRANSMITTING == cpi->ibf_state);
1559 1797
1560 if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) 1798 /* we should not be done here! */
1561 { 1799 GNUNET_assert (cpi->ibf_bucket_counter != (1 << cpi->ibf_order));
1562 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ibf completely written\n");
1563 /* we now wait for values / requests / another IBF because peer could not decode with our IBF */
1564 return;
1565 }
1566 1800
1567 /* remaining buckets */ 1801 /* remaining buckets */
1568 num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter; 1802 num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter;
@@ -1580,24 +1814,23 @@ write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1580 digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST); 1814 digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST);
1581 digest->order = cpi->ibf_order; 1815 digest->order = cpi->ibf_order;
1582 1816
1583 key_dst = (uint64_t *) &digest[1]; 1817 buf = &digest[1];
1584 1818 ibf_write_slice (cpi->ibf, cpi->ibf_bucket_counter, num_buckets, &buf, NULL);
1585 memcpy (key_dst, cpi->ibf->id_sum, num_buckets * sizeof *key_dst);
1586 key_dst += num_buckets;
1587
1588 hash_dst = (uint32_t *) key_dst;
1589
1590 memcpy (hash_dst, cpi->ibf->id_sum, num_buckets * sizeof *hash_dst);
1591 hash_dst += num_buckets;
1592
1593 count_dst = (uint8_t *) hash_dst;
1594
1595 memcpy (count_dst, cpi->ibf->count, num_buckets * sizeof *count_dst);
1596 1819
1597 cpi->ibf_bucket_counter += num_buckets; 1820 cpi->ibf_bucket_counter += num_buckets;
1598 1821
1599 cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL, 1822 /* we have to set the new state here, because of non-deterministic schedulung */
1600 write_ibf, cpi); 1823 if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order))
1824 {
1825 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ibf completely written\n");
1826 /* we now wait for values / requests / another IBF because peer could not decode with our IBF */
1827 cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF;
1828 cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL, write_ibf_done, cpi);
1829 }
1830 else
1831 {
1832 cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL, write_ibf, cpi);
1833 }
1601 1834
1602 GNUNET_assert (NULL != cpi->wh); 1835 GNUNET_assert (NULL != cpi->wh);
1603} 1836}
@@ -1622,19 +1855,32 @@ static void
1622write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size) 1855write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1623{ 1856{
1624 struct ConsensusPeerInformation *cpi; 1857 struct ConsensusPeerInformation *cpi;
1625 uint64_t key; 1858 struct IBF_Key key;
1626 struct GNUNET_HashCode hashcode; 1859 struct GNUNET_HashCode hashcode;
1627 int side; 1860 int side;
1628 int msize;
1629 1861
1630 GNUNET_assert (GNUNET_STREAM_OK == status); 1862 GNUNET_assert (GNUNET_STREAM_OK == status);
1631 1863
1632 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitting value\n"); 1864 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding\n");
1633 1865
1634 cpi = cls; 1866 cpi = cls;
1867 GNUNET_assert (IBF_STATE_DECODING == cpi->ibf_state);
1635 cpi->wh = NULL; 1868 cpi->wh = NULL;
1636 1869
1637 GNUNET_assert (IBF_STATE_DECODING == cpi->ibf_state); 1870 if (NULL != cpi->requests_and_elements_head)
1871 {
1872 struct QueuedMessage *qm;
1873 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending queued element\n");
1874 qm = cpi->requests_and_elements_head;
1875 GNUNET_CONTAINER_DLL_remove (cpi->requests_and_elements_head, cpi->requests_and_elements_tail, qm);
1876
1877 cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size),
1878 GNUNET_TIME_UNIT_FOREVER_REL,
1879 write_requests_and_elements, cpi);
1880 GNUNET_assert (NULL != cpi->wh);
1881 /* some elements / requests have queued up, we have to transmit them first */
1882 return;
1883 }
1638 1884
1639 for (;;) 1885 for (;;)
1640 { 1886 {
@@ -1642,10 +1888,13 @@ write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t
1642 res = ibf_decode (cpi->ibf, &side, &key); 1888 res = ibf_decode (cpi->ibf, &side, &key);
1643 if (GNUNET_SYSERR == res) 1889 if (GNUNET_SYSERR == res)
1644 { 1890 {
1891 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding failed, transmitting larger IBF\n");
1892 /* decoding failed, we tell the other peer by sending our ibf with a larger order */
1645 cpi->ibf_order++; 1893 cpi->ibf_order++;
1646 prepare_ibf (cpi); 1894 prepare_ibf (cpi);
1647 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); 1895 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
1648 cpi->ibf_state = IBF_STATE_TRANSMITTING; 1896 cpi->ibf_state = IBF_STATE_TRANSMITTING;
1897 cpi->ibf_bucket_counter = 0;
1649 write_ibf (cls, status, size); 1898 write_ibf (cls, status, size);
1650 return; 1899 return;
1651 } 1900 }
@@ -1656,38 +1905,52 @@ write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t
1656 } 1905 }
1657 if (-1 == side) 1906 if (-1 == side)
1658 { 1907 {
1659 struct GNUNET_CONSENSUS_Element *element; 1908 /* we have the element, send it to the other peer */
1660 struct GNUNET_MessageHeader *element_msg; 1909 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element\n");
1661 ibf_hashcode_from_key (key, &hashcode); 1910 ibf_hashcode_from_key (key, &hashcode);
1662 /* FIXME: this only transmits one element stored with the key */ 1911 GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->values, &hashcode, send_element_iter, cpi);
1663 element = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode); 1912 /* send the first message, because we can! */
1664 if (NULL == element) 1913 if (NULL != cpi->requests_and_elements_head)
1665 continue; 1914 {
1666 msize = sizeof (struct GNUNET_MessageHeader) + element->size; 1915 struct QueuedMessage *qm;
1667 element_msg = GNUNET_malloc (msize); 1916 qm = cpi->requests_and_elements_head;
1668 element_msg->size = htons (msize); 1917 GNUNET_CONTAINER_DLL_remove (cpi->requests_and_elements_head, cpi->requests_and_elements_tail, qm);
1669 element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); 1918 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "writing element\n");
1670 GNUNET_assert (NULL != element->data); 1919 cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size),
1671 memcpy (&element_msg[1], element->data, element->size); 1920 GNUNET_TIME_UNIT_FOREVER_REL,
1672 cpi->wh = GNUNET_STREAM_write (cpi->socket, element_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, 1921 write_requests_and_elements, cpi);
1673 write_requests_and_elements, cpi); 1922 GNUNET_assert (NULL != cpi->wh);
1674 GNUNET_free (element_msg); 1923 }
1675 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted value\n"); 1924 else
1676 1925 {
1677 GNUNET_assert (NULL != cpi->wh); 1926 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "no element found for decoded hash %s\n", GNUNET_h2s (&hashcode));
1927 }
1678 return; 1928 return;
1679 } 1929 }
1680 else 1930 else
1681 { 1931 {
1682 struct ElementRequest *msg; 1932 struct ElementRequest *msg;
1683 size_t msize; 1933 size_t msize;
1684 uint64_t *p; 1934 struct IBF_Key *p;
1685 1935
1686 msize = (sizeof *msg) + sizeof (uint64_t); 1936 msize = (sizeof *msg) + sizeof (uint64_t);
1687 msg = GNUNET_malloc (msize); 1937 msg = GNUNET_malloc (msize);
1688 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST); 1938 if (CONSENSUS_ROUND_A2A_EXCHANGE == cpi->session->current_round)
1939 {
1940 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending request for element\n");
1941 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST);
1942 }
1943 else if (CONSENSUS_ROUND_A2A_INVENTORY == cpi->session->current_round)
1944 {
1945 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending locally missing element\n");
1946 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_MISSING_LOCAL);
1947 }
1948 else
1949 {
1950 GNUNET_assert (0);
1951 }
1689 msg->header.size = htons (msize); 1952 msg->header.size = htons (msize);
1690 p = (uint64_t *) &msg[1]; 1953 p = (struct IBF_Key *) &msg[1];
1691 *p = key; 1954 *p = key;
1692 1955
1693 cpi->wh = GNUNET_STREAM_write (cpi->socket, msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, 1956 cpi->wh = GNUNET_STREAM_write (cpi->socket, msg, msize, GNUNET_TIME_UNIT_FOREVER_REL,
@@ -1697,17 +1960,151 @@ write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t
1697 return; 1960 return;
1698 } 1961 }
1699 } 1962 }
1700
1701} 1963}
1702 1964
1703 1965
1966static double
1967compute_similarity (struct ConsensusSession *session, int p1, int p2)
1968{
1969 /* FIXME: simplistic dummy implementation, use real set union/intersecion */
1970 return (session->info[p1].num_missing_local + session->info[p2].num_missing_local) /
1971 ((double) (session->info[p1].num_missing_remote + session->info[p2].num_missing_remote + 1));
1972}
1973
1704 1974
1705/*
1706static void 1975static void
1707select_best_group (struct ConsensusSession *session) 1976select_fittest_group (struct ConsensusSession *session)
1708{ 1977{
1978 /* simplistic implementation: compute the similarity with the latest strata estimator,
1979 * rank the results once */
1980 struct GNUNET_PeerIdentity *group;
1981 double rating[session->num_peers];
1982 struct GNUNET_CONSENSUS_ConcludeDoneMessage *done_msg;
1983 size_t msize;
1984 int i;
1985 int j;
1986 /* number of peers in the consensus group */
1987 int k;
1988
1989 k = ceil(session->num_peers / 3.0) * 2;
1990 group = GNUNET_malloc (k * sizeof *group);
1991
1992 /* do strata subtraction */
1993 /* FIXME: we know the real sets, subtract them! */
1994 for (i = 0; i < session->num_peers; i++)
1995 {
1996 rating[i] = 0;
1997 for (j = 0; j < i; j++)
1998 {
1999 double sim;
2000 sim = compute_similarity (session, i, j);
2001 rating[i] += sim;
2002 rating[j] += sim;
2003 }
2004 }
2005 for (i = 0; i < k; i++)
2006 {
2007 int best_idx = 0;
2008 for (j = 1; j < session->num_peers; j++)
2009 if (rating[j] > rating[best_idx])
2010 best_idx = j;
2011 rating[best_idx] = -1;
2012 group[i] = session->peers[best_idx];
2013 }
2014
2015 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got group!\n");
2016
2017 msize = sizeof *done_msg + k * sizeof *group;
2018
2019 done_msg = GNUNET_malloc (msize);
2020 done_msg->header.size = htons (msize);
2021 done_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
2022 memcpy (&done_msg[1], group, k * sizeof *group);
2023
2024 queue_client_message (session, (struct GNUNET_MessageHeader *) done_msg);
2025 send_next (session);
2026}
2027
2028
2029/**
2030 * Select and kick off the next round, based on the current round.
2031 * @param cls the session
2032 * @param tc task context, for when this task is invoked by the scheduler,
2033 * NULL if invoked for another reason
2034 */
2035static void
2036round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2037{
2038 struct ConsensusSession *session;
2039 int i;
2040
2041 /* don't kick off next round if we're shutting down */
2042 if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
2043 return;
2044
2045 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "round over\n");
2046 session = cls;
2047
2048 if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
2049 {
2050 GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
2051 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
2052 }
2053
2054 for (i = 0; i < session->num_peers; i++)
2055 {
2056 if ((NULL != session->info) && (NULL != session->info[i].wh))
2057 GNUNET_STREAM_write_cancel (session->info[i].wh);
2058 }
2059
2060 switch (session->current_round)
2061 {
2062 case CONSENSUS_ROUND_BEGIN:
2063 {
2064 session->current_round = CONSENSUS_ROUND_A2A_EXCHANGE;
2065 session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 4),
2066 round_over, session);
2067 for (i = 0; i < session->num_peers; i++)
2068 {
2069 /* we can only talk to hello'ed peers */
2070 if ( (GNUNET_YES == session->info[i].is_outgoing) &&
2071 (GNUNET_YES == session->info[i].hello) )
2072 {
2073 /* kick off transmitting strata by calling the write continuation */
2074 write_strata (&session->info[i], GNUNET_STREAM_OK, 0);
2075 }
2076 }
2077 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude started, timeout=%llu\n", session->conclude_timeout.rel_value);
2078 break;
2079 }
2080 case CONSENSUS_ROUND_A2A_EXCHANGE:
2081 {
2082 session->current_round = CONSENSUS_ROUND_A2A_INVENTORY;
2083 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "starting inventory round\n");
2084 session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 4),
2085 round_over, session);
2086 for (i = 0; i < session->num_peers; i++)
2087 {
2088 session->info[i].ibf_state = IBF_STATE_NONE;
2089 if ( (GNUNET_YES == session->info[i].is_outgoing) &&
2090 (GNUNET_YES == session->info[i].hello) )
2091 {
2092 /* kick off transmitting strata by calling the write continuation */
2093 write_strata (&session->info[i], GNUNET_STREAM_OK, 0);
2094 }
2095 }
2096 break;
2097 }
2098 case CONSENSUS_ROUND_A2A_INVENTORY:
2099 /* finally, we are done and select the most fitting group */
2100 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "protocol rounds done\n");
2101 session->current_round = CONSENSUS_ROUND_FINISH;
2102 select_fittest_group (session);
2103 break;
2104 default:
2105 GNUNET_assert (0);
2106 }
1709} 2107}
1710*/
1711 2108
1712 2109
1713/** 2110/**
@@ -1719,13 +2116,13 @@ select_best_group (struct ConsensusSession *session)
1719 */ 2116 */
1720static void 2117static void
1721client_conclude (void *cls, 2118client_conclude (void *cls,
1722 struct GNUNET_SERVER_Client *client, 2119 struct GNUNET_SERVER_Client *client,
1723 const struct GNUNET_MessageHeader *message) 2120 const struct GNUNET_MessageHeader *message)
1724{ 2121{
1725 struct ConsensusSession *session; 2122 struct ConsensusSession *session;
1726 int i; 2123 struct GNUNET_CONSENSUS_ConcludeMessage *cmsg;
1727 2124
1728 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n"); 2125 cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message;
1729 2126
1730 session = sessions_head; 2127 session = sessions_head;
1731 while ((session != NULL) && (session->client != client)) 2128 while ((session != NULL) && (session->client != client))
@@ -1738,25 +2135,19 @@ client_conclude (void *cls,
1738 return; 2135 return;
1739 } 2136 }
1740 2137
1741 if (GNUNET_YES == session->conclude_requested) 2138 if (CONSENSUS_ROUND_BEGIN != session->current_round)
1742 { 2139 {
1743 /* client requested conclude twice */ 2140 /* client requested conclude twice */
2141 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "unexpected round at conclude: %d\n", session->current_round);
1744 GNUNET_break (0); 2142 GNUNET_break (0);
1745 disconnect_client (client); 2143 disconnect_client (client);
1746 return; 2144 return;
1747 } 2145 }
1748 2146
1749 session->conclude_requested = GNUNET_YES; 2147 session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout);
1750 2148
1751 for (i = 0; i < session->num_peers; i++) 2149 /* the 'begin' round is over, start with the next, real round */
1752 { 2150 round_over (session, NULL);
1753 if ( (GNUNET_YES == session->info[i].is_outgoing) &&
1754 (GNUNET_YES == session->info[i].hello) )
1755 {
1756 /* kick off transmitting strata by calling the write continuation */
1757 write_strata (&session->info[i], GNUNET_STREAM_OK, 0);
1758 }
1759 }
1760 2151
1761 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2152 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1762 send_next (session); 2153 send_next (session);
@@ -1803,12 +2194,19 @@ client_ack (void *cls,
1803 2194
1804 if (msg->keep) 2195 if (msg->keep)
1805 { 2196 {
2197 int i;
1806 element = pending->element; 2198 element = pending->element;
1807 GNUNET_CRYPTO_hash (element, element->size, &key); 2199 hash_for_ibf (element, element->size, &key);
1808 2200
1809 GNUNET_CONTAINER_multihashmap_put (session->values, &key, element, 2201 GNUNET_CONTAINER_multihashmap_put (session->values, &key, element,
1810 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 2202 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1811 strata_insert (session->strata, &key); 2203 strata_insert (session->strata, &key);
2204
2205 for (i = 0; i <= MAX_IBF_ORDER; i++)
2206 {
2207 if (NULL != session->ibfs[i])
2208 ibf_insert (session->ibfs[i], ibf_key_from_hashcode (&key));
2209 }
1812 } 2210 }
1813 2211
1814 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2212 GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -1862,7 +2260,6 @@ static void
1862shutdown_task (void *cls, 2260shutdown_task (void *cls,
1863 const struct GNUNET_SCHEDULER_TaskContext *tc) 2261 const struct GNUNET_SCHEDULER_TaskContext *tc)
1864{ 2262{
1865
1866 /* FIXME: complete; write separate destructors for different data types */ 2263 /* FIXME: complete; write separate destructors for different data types */
1867 2264
1868 while (NULL != incoming_sockets_head) 2265 while (NULL != incoming_sockets_head)
@@ -1884,15 +2281,16 @@ shutdown_task (void *cls,
1884 2281
1885 session = sessions_head; 2282 session = sessions_head;
1886 2283
1887 for (i = 0; session->num_peers; i++) 2284 if (NULL != session->info)
1888 { 2285 for (i = 0; i < session->num_peers; i++)
1889 struct ConsensusPeerInformation *cpi;
1890 cpi = &session->info[i];
1891 if ((NULL != cpi) && (NULL != cpi->socket))
1892 { 2286 {
1893 GNUNET_STREAM_close (cpi->socket); 2287 struct ConsensusPeerInformation *cpi;
2288 cpi = &session->info[i];
2289 if ((NULL != cpi) && (NULL != cpi->socket))
2290 {
2291 GNUNET_STREAM_close (cpi->socket);
2292 }
1894 } 2293 }
1895 }
1896 2294
1897 if (NULL != session->client) 2295 if (NULL != session->client)
1898 GNUNET_SERVER_client_disconnect (session->client); 2296 GNUNET_SERVER_client_disconnect (session->client);
@@ -1952,7 +2350,6 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU
1952 listen_cb, NULL, 2350 listen_cb, NULL,
1953 GNUNET_STREAM_OPTION_END); 2351 GNUNET_STREAM_OPTION_END);
1954 2352
1955
1956 /* we have to wait for the core_startup callback before proceeding with the consensus service startup */ 2353 /* we have to wait for the core_startup callback before proceeding with the consensus service startup */
1957 core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, core_handlers); 2354 core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, core_handlers);
1958 GNUNET_assert (NULL != core); 2355 GNUNET_assert (NULL != core);