aboutsummaryrefslogtreecommitdiff
path: root/src/setu/gnunet-service-setu.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2020-08-18 15:05:30 +0200
committerChristian Grothoff <christian@grothoff.org>2020-08-18 15:05:30 +0200
commitee6d0296795bb4cea9c0e157d6295a98d9e6d364 (patch)
treebc28e2306aafef34e865b68ee21151d0be74cadb /src/setu/gnunet-service-setu.c
parent48fdab91834c4649048d371d6b454e9b03b9fa50 (diff)
downloadgnunet-ee6d0296795bb4cea9c0e157d6295a98d9e6d364.tar.gz
gnunet-ee6d0296795bb4cea9c0e157d6295a98d9e6d364.zip
-refactor to eliminate code no longer needed after set->setu specialization
Diffstat (limited to 'src/setu/gnunet-service-setu.c')
-rw-r--r--src/setu/gnunet-service-setu.c943
1 files changed, 587 insertions, 356 deletions
diff --git a/src/setu/gnunet-service-setu.c b/src/setu/gnunet-service-setu.c
index 166522afa..c59d375cf 100644
--- a/src/setu/gnunet-service-setu.c
+++ b/src/setu/gnunet-service-setu.c
@@ -26,15 +26,18 @@
26#include "platform.h" 26#include "platform.h"
27#include "gnunet_util_lib.h" 27#include "gnunet_util_lib.h"
28#include "gnunet_statistics_service.h" 28#include "gnunet_statistics_service.h"
29#include "gnunet-service-setu.h"
30#include "ibf.h" 29#include "ibf.h"
30#include "gnunet_protocols.h"
31#include "gnunet_applications.h"
32#include "gnunet_cadet_service.h"
31#include "gnunet-service-setu_strata_estimator.h" 33#include "gnunet-service-setu_strata_estimator.h"
32#include "gnunet-service-setu_protocol.h" 34#include "gnunet-service-setu_protocol.h"
33#include "gnunet_statistics_service.h" 35#include "gnunet_statistics_service.h"
34#include <gcrypt.h> 36#include <gcrypt.h>
37#include "gnunet_setu_service.h"
38#include "setu.h"
35 39
36 40#define LOG(kind, ...) GNUNET_log_from (kind, "setu", __VA_ARGS__)
37#define LOG(kind, ...) GNUNET_log_from (kind, "set-union", __VA_ARGS__)
38 41
39/** 42/**
40 * How long do we hold on to an incoming channel if there is 43 * How long do we hold on to an incoming channel if there is
@@ -142,11 +145,120 @@ enum UnionOperationPhase
142 145
143 146
144/** 147/**
145 * State of an evaluate operation with another peer. 148 * Information about an element element in the set. All elements are
149 * stored in a hash-table from their hash-code to their `struct
150 * Element`, so that the remove and add operations are reasonably
151 * fast.
152 */
153struct ElementEntry
154{
155 /**
156 * The actual element. The data for the element
157 * should be allocated at the end of this struct.
158 */
159 struct GNUNET_SETU_Element element;
160
161 /**
162 * Hash of the element. For set union: Will be used to derive the
163 * different IBF keys for different salts.
164 */
165 struct GNUNET_HashCode element_hash;
166
167 /**
168 * First generation that includes this element.
169 */
170 unsigned int generation;
171
172 /**
173 * #GNUNET_YES if the element is a remote element, and does not belong
174 * to the operation's set.
175 */
176 int remote;
177};
178
179
180/**
181 * A listener is inhabited by a client, and waits for evaluation
182 * requests from remote peers.
183 */
184struct Listener;
185
186
187/**
188 * A set that supports a specific operation with other peers.
189 */
190struct Set;
191
192
193/**
194 * State we keep per client.
195 */
196struct ClientState
197{
198 /**
199 * Set, if associated with the client, otherwise NULL.
200 */
201 struct Set *set;
202
203 /**
204 * Listener, if associated with the client, otherwise NULL.
205 */
206 struct Listener *listener;
207
208 /**
209 * Client handle.
210 */
211 struct GNUNET_SERVICE_Client *client;
212
213 /**
214 * Message queue.
215 */
216 struct GNUNET_MQ_Handle *mq;
217};
218
219
220/**
221 * Operation context used to execute a set operation.
146 */ 222 */
147struct OperationState 223struct Operation
148{ 224{
149 /** 225 /**
226 * Kept in a DLL of the listener, if @e listener is non-NULL.
227 */
228 struct Operation *next;
229
230 /**
231 * Kept in a DLL of the listener, if @e listener is non-NULL.
232 */
233 struct Operation *prev;
234
235 /**
236 * Channel to the peer.
237 */
238 struct GNUNET_CADET_Channel *channel;
239
240 /**
241 * Port this operation runs on.
242 */
243 struct Listener *listener;
244
245 /**
246 * Message queue for the channel.
247 */
248 struct GNUNET_MQ_Handle *mq;
249
250 /**
251 * Context message, may be NULL.
252 */
253 struct GNUNET_MessageHeader *context_msg;
254
255 /**
256 * Set associated with the operation, NULL until the spec has been
257 * associated with a set.
258 */
259 struct Set *set;
260
261 /**
150 * Copy of the set's strata estimator at the time of 262 * Copy of the set's strata estimator at the time of
151 * creation of this operation. 263 * creation of this operation.
152 */ 264 */
@@ -215,6 +327,152 @@ struct OperationState
215 * the operation started. 327 * the operation started.
216 */ 328 */
217 uint64_t initial_size; 329 uint64_t initial_size;
330
331 /**
332 * The identity of the requesting peer. Needs to
333 * be stored here as the op spec might not have been created yet.
334 */
335 struct GNUNET_PeerIdentity peer;
336
337 /**
338 * Timeout task, if the incoming peer has not been accepted
339 * after the timeout, it will be disconnected.
340 */
341 struct GNUNET_SCHEDULER_Task *timeout_task;
342
343 /**
344 * Salt to use for the operation.
345 */
346 uint32_t salt;
347
348 /**
349 * Remote peers element count
350 */
351 uint32_t remote_element_count;
352
353 /**
354 * ID used to identify an operation between service and client
355 */
356 uint32_t client_request_id;
357
358 /**
359 * Always use delta operation instead of sending full sets,
360 * even it it's less efficient.
361 */
362 int force_delta;
363
364 /**
365 * Always send full sets, even if delta operations would
366 * be more efficient.
367 */
368 int force_full;
369
370 /**
371 * #GNUNET_YES to fail operations where Byzantine faults
372 * are suspected
373 */
374 int byzantine;
375
376 /**
377 * Lower bound for the set size, used only when
378 * byzantine mode is enabled.
379 */
380 int byzantine_lower_bound;
381
382 /**
383 * Unique request id for the request from a remote peer, sent to the
384 * client, which will accept or reject the request. Set to '0' iff
385 * the request has not been suggested yet.
386 */
387 uint32_t suggest_id;
388
389 /**
390 * Generation in which the operation handle
391 * was created.
392 */
393 unsigned int generation_created;
394};
395
396
397/**
398 * SetContent stores the actual set elements, which may be shared by
399 * multiple generations derived from one set.
400 */
401struct SetContent
402{
403 /**
404 * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *`.
405 */
406 struct GNUNET_CONTAINER_MultiHashMap *elements;
407
408 /**
409 * Number of references to the content.
410 */
411 unsigned int refcount;
412
413 /**
414 * FIXME: document!
415 */
416 unsigned int latest_generation;
417
418 /**
419 * Number of concurrently active iterators.
420 */
421 int iterator_count;
422};
423
424
425/**
426 * A set that supports a specific operation with other peers.
427 */
428struct Set
429{
430 /**
431 * Sets are held in a doubly linked list (in `sets_head` and `sets_tail`).
432 */
433 struct Set *next;
434
435 /**
436 * Sets are held in a doubly linked list.
437 */
438 struct Set *prev;
439
440 /**
441 * Client that owns the set. Only one client may own a set,
442 * and there can only be one set per client.
443 */
444 struct ClientState *cs;
445
446 /**
447 * Content, possibly shared by multiple sets,
448 * and thus reference counted.
449 */
450 struct SetContent *content;
451
452 /**
453 * The strata estimator is only generated once for
454 * each set.
455 * The IBF keys are derived from the element hashes with
456 * salt=0.
457 */
458 struct StrataEstimator *se;
459
460 /**
461 * Evaluate operations are held in a linked list.
462 */
463 struct Operation *ops_head;
464
465 /**
466 * Evaluate operations are held in a linked list.
467 */
468 struct Operation *ops_tail;
469
470 /**
471 * Current generation, that is, number of previously executed
472 * operations and lazy copies on the underlying set content.
473 */
474 unsigned int current_generation;
475
218}; 476};
219 477
220 478
@@ -267,21 +525,6 @@ struct SendElementClosure
267 525
268 526
269/** 527/**
270 * Extra state required for efficient set union.
271 */
272struct SetState
273{
274 /**
275 * The strata estimator is only generated once for
276 * each set.
277 * The IBF keys are derived from the element hashes with
278 * salt=0.
279 */
280 struct StrataEstimator *se;
281};
282
283
284/**
285 * A listener is inhabited by a client, and waits for evaluation 528 * A listener is inhabited by a client, and waits for evaluation
286 * requests from remote peers. 529 * requests from remote peers.
287 */ 530 */
@@ -340,7 +583,7 @@ static struct GNUNET_CADET_Handle *cadet;
340/** 583/**
341 * Statistics handle. 584 * Statistics handle.
342 */ 585 */
343struct GNUNET_STATISTICS_Handle *_GSS_statistics; 586static struct GNUNET_STATISTICS_Handle *_GSS_statistics;
344 587
345/** 588/**
346 * Listeners are held in a doubly linked list. 589 * Listeners are held in a doubly linked list.
@@ -412,43 +655,209 @@ union_op_cancel (struct Operation *op)
412 LOG (GNUNET_ERROR_TYPE_DEBUG, 655 LOG (GNUNET_ERROR_TYPE_DEBUG,
413 "destroying union op\n"); 656 "destroying union op\n");
414 /* check if the op was canceled twice */ 657 /* check if the op was canceled twice */
415 GNUNET_assert (NULL != op->state); 658 if (NULL != op->remote_ibf)
416 if (NULL != op->state->remote_ibf)
417 { 659 {
418 ibf_destroy (op->state->remote_ibf); 660 ibf_destroy (op->remote_ibf);
419 op->state->remote_ibf = NULL; 661 op->remote_ibf = NULL;
420 } 662 }
421 if (NULL != op->state->demanded_hashes) 663 if (NULL != op->demanded_hashes)
422 { 664 {
423 GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes); 665 GNUNET_CONTAINER_multihashmap_destroy (op->demanded_hashes);
424 op->state->demanded_hashes = NULL; 666 op->demanded_hashes = NULL;
425 } 667 }
426 if (NULL != op->state->local_ibf) 668 if (NULL != op->local_ibf)
427 { 669 {
428 ibf_destroy (op->state->local_ibf); 670 ibf_destroy (op->local_ibf);
429 op->state->local_ibf = NULL; 671 op->local_ibf = NULL;
430 } 672 }
431 if (NULL != op->state->se) 673 if (NULL != op->se)
432 { 674 {
433 strata_estimator_destroy (op->state->se); 675 strata_estimator_destroy (op->se);
434 op->state->se = NULL; 676 op->se = NULL;
435 } 677 }
436 if (NULL != op->state->key_to_element) 678 if (NULL != op->key_to_element)
437 { 679 {
438 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, 680 GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element,
439 &destroy_key_to_element_iter, 681 &destroy_key_to_element_iter,
440 NULL); 682 NULL);
441 GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element); 683 GNUNET_CONTAINER_multihashmap32_destroy (op->key_to_element);
442 op->state->key_to_element = NULL; 684 op->key_to_element = NULL;
443 } 685 }
444 GNUNET_free (op->state);
445 op->state = NULL;
446 LOG (GNUNET_ERROR_TYPE_DEBUG, 686 LOG (GNUNET_ERROR_TYPE_DEBUG,
447 "destroying union op done\n"); 687 "destroying union op done\n");
448} 688}
449 689
450 690
451/** 691/**
692 * Signal to the client that the operation has finished and
693 * destroy the operation.
694 *
695 * @param cls operation to destroy
696 */
697static void
698send_client_done (void *cls)
699{
700 struct Operation *op = cls;
701 struct GNUNET_MQ_Envelope *ev;
702 struct GNUNET_SETU_ResultMessage *rm;
703
704 if (GNUNET_YES == op->client_done_sent)
705 return;
706 if (PHASE_DONE != op->phase)
707 {
708 LOG (GNUNET_ERROR_TYPE_WARNING,
709 "Union operation failed\n");
710 GNUNET_STATISTICS_update (_GSS_statistics,
711 "# Union operations failed",
712 1,
713 GNUNET_NO);
714 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SETU_RESULT);
715 rm->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
716 rm->request_id = htonl (op->client_request_id);
717 rm->element_type = htons (0);
718 GNUNET_MQ_send (op->set->cs->mq,
719 ev);
720 return;
721 }
722
723 op->client_done_sent = GNUNET_YES;
724
725 GNUNET_STATISTICS_update (_GSS_statistics,
726 "# Union operations succeeded",
727 1,
728 GNUNET_NO);
729 LOG (GNUNET_ERROR_TYPE_INFO,
730 "Signalling client that union operation is done\n");
731 ev = GNUNET_MQ_msg (rm,
732 GNUNET_MESSAGE_TYPE_SETU_RESULT);
733 rm->request_id = htonl (op->client_request_id);
734 rm->result_status = htons (GNUNET_SETU_STATUS_DONE);
735 rm->element_type = htons (0);
736 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (
737 op->key_to_element));
738 GNUNET_MQ_send (op->set->cs->mq,
739 ev);
740}
741
742
743/* FIXME: the destroy logic is a mess and should be cleaned up! */
744
745/**
746 * Destroy the given operation. Used for any operation where both
747 * peers were known and that thus actually had a vt and channel. Must
748 * not be used for operations where 'listener' is still set and we do
749 * not know the other peer.
750 *
751 * Call the implementation-specific cancel function of the operation.
752 * Disconnects from the remote peer. Does not disconnect the client,
753 * as there may be multiple operations per set.
754 *
755 * @param op operation to destroy
756 */
757static void
758_GSS_operation_destroy (struct Operation *op)
759{
760 struct Set *set = op->set;
761 struct GNUNET_CADET_Channel *channel;
762
763 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
764 "Destroying operation %p\n",
765 op);
766 GNUNET_assert (NULL == op->listener);
767 // FIXME: inline?
768 union_op_cancel (op);
769 if (NULL != set)
770 {
771 GNUNET_CONTAINER_DLL_remove (set->ops_head,
772 set->ops_tail,
773 op);
774 op->set = NULL;
775 }
776 if (NULL != op->context_msg)
777 {
778 GNUNET_free (op->context_msg);
779 op->context_msg = NULL;
780 }
781 if (NULL != (channel = op->channel))
782 {
783 /* This will free op; called conditionally as this helper function
784 is also called from within the channel disconnect handler. */
785 op->channel = NULL;
786 GNUNET_CADET_channel_destroy (channel);
787 }
788 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
789 * there was a channel end handler that will free 'op' on the call stack. */
790}
791
792
793/**
794 * This function probably should not exist
795 * and be replaced by inlining more specific
796 * logic in the various places where it is called.
797 */
798static void
799_GSS_operation_destroy2 (struct Operation *op);
800
801
802/**
803 * Destroy an incoming request from a remote peer
804 *
805 * @param op remote request to destroy
806 */
807static void
808incoming_destroy (struct Operation *op)
809{
810 struct Listener *listener;
811
812 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
813 "Destroying incoming operation %p\n",
814 op);
815 if (NULL != (listener = op->listener))
816 {
817 GNUNET_CONTAINER_DLL_remove (listener->op_head,
818 listener->op_tail,
819 op);
820 op->listener = NULL;
821 }
822 if (NULL != op->timeout_task)
823 {
824 GNUNET_SCHEDULER_cancel (op->timeout_task);
825 op->timeout_task = NULL;
826 }
827 _GSS_operation_destroy2 (op);
828}
829
830
831/**
832 * This function probably should not exist
833 * and be replaced by inlining more specific
834 * logic in the various places where it is called.
835 */
836static void
837_GSS_operation_destroy2 (struct Operation *op)
838{
839 struct GNUNET_CADET_Channel *channel;
840
841 if (NULL != (channel = op->channel))
842 {
843 /* This will free op; called conditionally as this helper function
844 is also called from within the channel disconnect handler. */
845 op->channel = NULL;
846 GNUNET_CADET_channel_destroy (channel);
847 }
848 if (NULL != op->listener)
849 {
850 incoming_destroy (op);
851 return;
852 }
853 if (NULL != op->set)
854 send_client_done (op);
855 _GSS_operation_destroy (op);
856 GNUNET_free (op);
857}
858
859
860/**
452 * Inform the client that the union operation has failed, 861 * Inform the client that the union operation has failed,
453 * and proceed to destroy the evaluate operation. 862 * and proceed to destroy the evaluate operation.
454 * 863 *
@@ -559,7 +968,7 @@ op_get_element (struct Operation *op,
559 ctx.hash = *element_hash; 968 ctx.hash = *element_hash;
560 969
561 ibf_key = get_ibf_key (element_hash); 970 ibf_key = get_ibf_key (element_hash);
562 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, 971 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->key_to_element,
563 (uint32_t) ibf_key.key_val, 972 (uint32_t) ibf_key.key_val,
564 &op_get_element_iterator, 973 &op_get_element_iterator,
565 &ctx); 974 &ctx);
@@ -602,7 +1011,7 @@ op_register_element (struct Operation *op,
602 k->ibf_key = ibf_key; 1011 k->ibf_key = ibf_key;
603 k->received = received; 1012 k->received = received;
604 GNUNET_assert (GNUNET_OK == 1013 GNUNET_assert (GNUNET_OK ==
605 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, 1014 GNUNET_CONTAINER_multihashmap32_put (op->key_to_element,
606 (uint32_t) ibf_key.key_val, 1015 (uint32_t) ibf_key.key_val,
607 k, 1016 k,
608 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); 1017 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
@@ -664,14 +1073,29 @@ prepare_ibf_iterator (void *cls,
664 (unsigned long) ke->ibf_key.key_val, 1073 (unsigned long) ke->ibf_key.key_val,
665 GNUNET_h2s (&ke->element->element_hash)); 1074 GNUNET_h2s (&ke->element->element_hash));
666 salt_key (&ke->ibf_key, 1075 salt_key (&ke->ibf_key,
667 op->state->salt_send, 1076 op->salt_send,
668 &salted_key); 1077 &salted_key);
669 ibf_insert (op->state->local_ibf, salted_key); 1078 ibf_insert (op->local_ibf, salted_key);
670 return GNUNET_YES; 1079 return GNUNET_YES;
671} 1080}
672 1081
673 1082
674/** 1083/**
1084 * Is element @a ee part of the set used by @a op?
1085 *
1086 * @param ee element to test
1087 * @param op operation the defines the set and its generation
1088 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
1089 */
1090static int
1091_GSS_is_element_of_operation (struct ElementEntry *ee,
1092 struct Operation *op)
1093{
1094 return ee->generation >= op->generation_created;
1095}
1096
1097
1098/**
675 * Iterator for initializing the 1099 * Iterator for initializing the
676 * key-to-element mapping of a union operation 1100 * key-to-element mapping of a union operation
677 * 1101 *
@@ -714,9 +1138,9 @@ initialize_key_to_element (struct Operation *op)
714{ 1138{
715 unsigned int len; 1139 unsigned int len;
716 1140
717 GNUNET_assert (NULL == op->state->key_to_element); 1141 GNUNET_assert (NULL == op->key_to_element);
718 len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements); 1142 len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
719 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); 1143 op->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
720 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, 1144 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
721 &init_key_to_element_iterator, 1145 &init_key_to_element_iterator,
722 op); 1146 op);
@@ -735,18 +1159,18 @@ static int
735prepare_ibf (struct Operation *op, 1159prepare_ibf (struct Operation *op,
736 uint32_t size) 1160 uint32_t size)
737{ 1161{
738 GNUNET_assert (NULL != op->state->key_to_element); 1162 GNUNET_assert (NULL != op->key_to_element);
739 1163
740 if (NULL != op->state->local_ibf) 1164 if (NULL != op->local_ibf)
741 ibf_destroy (op->state->local_ibf); 1165 ibf_destroy (op->local_ibf);
742 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); 1166 op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
743 if (NULL == op->state->local_ibf) 1167 if (NULL == op->local_ibf)
744 { 1168 {
745 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1169 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
746 "Failed to allocate local IBF\n"); 1170 "Failed to allocate local IBF\n");
747 return GNUNET_SYSERR; 1171 return GNUNET_SYSERR;
748 } 1172 }
749 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, 1173 GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element,
750 &prepare_ibf_iterator, 1174 &prepare_ibf_iterator,
751 op); 1175 op);
752 return GNUNET_OK; 1176 return GNUNET_OK;
@@ -786,7 +1210,7 @@ send_ibf (struct Operation *op,
786 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO); 1210 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
787 } 1211 }
788 1212
789 ibf = op->state->local_ibf; 1213 ibf = op->local_ibf;
790 1214
791 while (buckets_sent < (1 << ibf_order)) 1215 while (buckets_sent < (1 << ibf_order))
792 { 1216 {
@@ -806,7 +1230,7 @@ send_ibf (struct Operation *op,
806 msg->reserved2 = 0; 1230 msg->reserved2 = 0;
807 msg->order = ibf_order; 1231 msg->order = ibf_order;
808 msg->offset = htonl (buckets_sent); 1232 msg->offset = htonl (buckets_sent);
809 msg->salt = htonl (op->state->salt_send); 1233 msg->salt = htonl (op->salt_send);
810 ibf_write_slice (ibf, buckets_sent, 1234 ibf_write_slice (ibf, buckets_sent,
811 buckets_in_message, &msg[1]); 1235 buckets_in_message, &msg[1]);
812 buckets_sent += buckets_in_message; 1236 buckets_sent += buckets_in_message;
@@ -820,7 +1244,7 @@ send_ibf (struct Operation *op,
820 1244
821 /* The other peer must decode the IBF, so 1245 /* The other peer must decode the IBF, so
822 * we're passive. */ 1246 * we're passive. */
823 op->state->phase = PHASE_INVENTORY_PASSIVE; 1247 op->phase = PHASE_INVENTORY_PASSIVE;
824 return GNUNET_OK; 1248 return GNUNET_OK;
825} 1249}
826 1250
@@ -893,7 +1317,7 @@ send_full_set (struct Operation *op)
893{ 1317{
894 struct GNUNET_MQ_Envelope *ev; 1318 struct GNUNET_MQ_Envelope *ev;
895 1319
896 op->state->phase = PHASE_FULL_SENDING; 1320 op->phase = PHASE_FULL_SENDING;
897 LOG (GNUNET_ERROR_TYPE_DEBUG, 1321 LOG (GNUNET_ERROR_TYPE_DEBUG,
898 "Dedicing to transmit the full set\n"); 1322 "Dedicing to transmit the full set\n");
899 /* FIXME: use a more memory-friendly way of doing this with an 1323 /* FIXME: use a more memory-friendly way of doing this with an
@@ -921,7 +1345,7 @@ check_union_p2p_strata_estimator (void *cls,
921 int is_compressed; 1345 int is_compressed;
922 size_t len; 1346 size_t len;
923 1347
924 if (op->state->phase != PHASE_EXPECT_SE) 1348 if (op->phase != PHASE_EXPECT_SE)
925 { 1349 {
926 GNUNET_break (0); 1350 GNUNET_break (0);
927 return GNUNET_SYSERR; 1351 return GNUNET_SYSERR;
@@ -984,16 +1408,16 @@ handle_union_p2p_strata_estimator (void *cls,
984 fail_union_operation (op); 1408 fail_union_operation (op);
985 return; 1409 return;
986 } 1410 }
987 GNUNET_assert (NULL != op->state->se); 1411 GNUNET_assert (NULL != op->se);
988 diff = strata_estimator_difference (remote_se, 1412 diff = strata_estimator_difference (remote_se,
989 op->state->se); 1413 op->se);
990 1414
991 if (diff > 200) 1415 if (diff > 200)
992 diff = diff * 3 / 2; 1416 diff = diff * 3 / 2;
993 1417
994 strata_estimator_destroy (remote_se); 1418 strata_estimator_destroy (remote_se);
995 strata_estimator_destroy (op->state->se); 1419 strata_estimator_destroy (op->se);
996 op->state->se = NULL; 1420 op->se = NULL;
997 LOG (GNUNET_ERROR_TYPE_DEBUG, 1421 LOG (GNUNET_ERROR_TYPE_DEBUG,
998 "got se diff=%d, using ibf size %d\n", 1422 "got se diff=%d, using ibf size %d\n",
999 diff, 1423 diff,
@@ -1021,18 +1445,18 @@ handle_union_p2p_strata_estimator (void *cls,
1021 } 1445 }
1022 1446
1023 if ((GNUNET_YES == op->force_full) || 1447 if ((GNUNET_YES == op->force_full) ||
1024 (diff > op->state->initial_size / 4) || 1448 (diff > op->initial_size / 4) ||
1025 (0 == other_size)) 1449 (0 == other_size))
1026 { 1450 {
1027 LOG (GNUNET_ERROR_TYPE_DEBUG, 1451 LOG (GNUNET_ERROR_TYPE_DEBUG,
1028 "Deciding to go for full set transmission (diff=%d, own set=%u)\n", 1452 "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
1029 diff, 1453 diff,
1030 op->state->initial_size); 1454 op->initial_size);
1031 GNUNET_STATISTICS_update (_GSS_statistics, 1455 GNUNET_STATISTICS_update (_GSS_statistics,
1032 "# of full sends", 1456 "# of full sends",
1033 1, 1457 1,
1034 GNUNET_NO); 1458 GNUNET_NO);
1035 if ((op->state->initial_size <= other_size) || 1459 if ((op->initial_size <= other_size) ||
1036 (0 == other_size)) 1460 (0 == other_size))
1037 { 1461 {
1038 send_full_set (op); 1462 send_full_set (op);
@@ -1043,7 +1467,7 @@ handle_union_p2p_strata_estimator (void *cls,
1043 1467
1044 LOG (GNUNET_ERROR_TYPE_DEBUG, 1468 LOG (GNUNET_ERROR_TYPE_DEBUG,
1045 "Telling other peer that we expect its full set\n"); 1469 "Telling other peer that we expect its full set\n");
1046 op->state->phase = PHASE_EXPECT_IBF; 1470 op->phase = PHASE_EXPECT_IBF;
1047 ev = GNUNET_MQ_msg_header ( 1471 ev = GNUNET_MQ_msg_header (
1048 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL); 1472 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL);
1049 GNUNET_MQ_send (op->mq, 1473 GNUNET_MQ_send (op->mq,
@@ -1123,7 +1547,7 @@ send_offers_for_key (struct Operation *op,
1123 send_cls.ibf_key = ibf_key; 1547 send_cls.ibf_key = ibf_key;
1124 send_cls.op = op; 1548 send_cls.op = op;
1125 (void) GNUNET_CONTAINER_multihashmap32_get_multiple ( 1549 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (
1126 op->state->key_to_element, 1550 op->key_to_element,
1127 (uint32_t) ibf_key. 1551 (uint32_t) ibf_key.
1128 key_val, 1552 key_val,
1129 &send_offers_iterator, 1553 &send_offers_iterator,
@@ -1147,22 +1571,22 @@ decode_and_send (struct Operation *op)
1147 unsigned int num_decoded; 1571 unsigned int num_decoded;
1148 struct InvertibleBloomFilter *diff_ibf; 1572 struct InvertibleBloomFilter *diff_ibf;
1149 1573
1150 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase); 1574 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->phase);
1151 1575
1152 if (GNUNET_OK != 1576 if (GNUNET_OK !=
1153 prepare_ibf (op, 1577 prepare_ibf (op,
1154 op->state->remote_ibf->size)) 1578 op->remote_ibf->size))
1155 { 1579 {
1156 GNUNET_break (0); 1580 GNUNET_break (0);
1157 /* allocation failed */ 1581 /* allocation failed */
1158 return GNUNET_SYSERR; 1582 return GNUNET_SYSERR;
1159 } 1583 }
1160 diff_ibf = ibf_dup (op->state->local_ibf); 1584 diff_ibf = ibf_dup (op->local_ibf);
1161 ibf_subtract (diff_ibf, 1585 ibf_subtract (diff_ibf,
1162 op->state->remote_ibf); 1586 op->remote_ibf);
1163 1587
1164 ibf_destroy (op->state->remote_ibf); 1588 ibf_destroy (op->remote_ibf);
1165 op->state->remote_ibf = NULL; 1589 op->remote_ibf = NULL;
1166 1590
1167 LOG (GNUNET_ERROR_TYPE_DEBUG, 1591 LOG (GNUNET_ERROR_TYPE_DEBUG,
1168 "decoding IBF (size=%u)\n", 1592 "decoding IBF (size=%u)\n",
@@ -1213,7 +1637,7 @@ decode_and_send (struct Operation *op)
1213 "# of IBF retries", 1637 "# of IBF retries",
1214 1, 1638 1,
1215 GNUNET_NO); 1639 GNUNET_NO);
1216 op->state->salt_send++; 1640 op->salt_send++;
1217 if (GNUNET_OK != 1641 if (GNUNET_OK !=
1218 send_ibf (op, next_order)) 1642 send_ibf (op, next_order))
1219 { 1643 {
@@ -1258,7 +1682,7 @@ decode_and_send (struct Operation *op)
1258 struct IBF_Key unsalted_key; 1682 struct IBF_Key unsalted_key;
1259 1683
1260 unsalt_key (&key, 1684 unsalt_key (&key,
1261 op->state->salt_receive, 1685 op->salt_receive,
1262 &unsalted_key); 1686 &unsalted_key);
1263 send_offers_for_key (op, 1687 send_offers_for_key (op,
1264 unsalted_key); 1688 unsalted_key);
@@ -1273,7 +1697,7 @@ decode_and_send (struct Operation *op)
1273 ev = GNUNET_MQ_msg_extra (msg, 1697 ev = GNUNET_MQ_msg_extra (msg,
1274 sizeof(struct IBF_Key), 1698 sizeof(struct IBF_Key),
1275 GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY); 1699 GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY);
1276 msg->salt = htonl (op->state->salt_receive); 1700 msg->salt = htonl (op->salt_receive);
1277 GNUNET_memcpy (&msg[1], 1701 GNUNET_memcpy (&msg[1],
1278 &key, 1702 &key,
1279 sizeof(struct IBF_Key)); 1703 sizeof(struct IBF_Key));
@@ -1322,26 +1746,26 @@ check_union_p2p_ibf (void *cls,
1322 GNUNET_break_op (0); 1746 GNUNET_break_op (0);
1323 return GNUNET_SYSERR; 1747 return GNUNET_SYSERR;
1324 } 1748 }
1325 if (op->state->phase == PHASE_EXPECT_IBF_CONT) 1749 if (op->phase == PHASE_EXPECT_IBF_CONT)
1326 { 1750 {
1327 if (ntohl (msg->offset) != op->state->ibf_buckets_received) 1751 if (ntohl (msg->offset) != op->ibf_buckets_received)
1328 { 1752 {
1329 GNUNET_break_op (0); 1753 GNUNET_break_op (0);
1330 return GNUNET_SYSERR; 1754 return GNUNET_SYSERR;
1331 } 1755 }
1332 if (1 << msg->order != op->state->remote_ibf->size) 1756 if (1 << msg->order != op->remote_ibf->size)
1333 { 1757 {
1334 GNUNET_break_op (0); 1758 GNUNET_break_op (0);
1335 return GNUNET_SYSERR; 1759 return GNUNET_SYSERR;
1336 } 1760 }
1337 if (ntohl (msg->salt) != op->state->salt_receive) 1761 if (ntohl (msg->salt) != op->salt_receive)
1338 { 1762 {
1339 GNUNET_break_op (0); 1763 GNUNET_break_op (0);
1340 return GNUNET_SYSERR; 1764 return GNUNET_SYSERR;
1341 } 1765 }
1342 } 1766 }
1343 else if ((op->state->phase != PHASE_INVENTORY_PASSIVE) && 1767 else if ((op->phase != PHASE_INVENTORY_PASSIVE) &&
1344 (op->state->phase != PHASE_EXPECT_IBF)) 1768 (op->phase != PHASE_EXPECT_IBF))
1345 { 1769 {
1346 GNUNET_break_op (0); 1770 GNUNET_break_op (0);
1347 return GNUNET_SYSERR; 1771 return GNUNET_SYSERR;
@@ -1369,27 +1793,27 @@ handle_union_p2p_ibf (void *cls,
1369 1793
1370 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) 1794 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1371 / IBF_BUCKET_SIZE; 1795 / IBF_BUCKET_SIZE;
1372 if ((op->state->phase == PHASE_INVENTORY_PASSIVE) || 1796 if ((op->phase == PHASE_INVENTORY_PASSIVE) ||
1373 (op->state->phase == PHASE_EXPECT_IBF)) 1797 (op->phase == PHASE_EXPECT_IBF))
1374 { 1798 {
1375 op->state->phase = PHASE_EXPECT_IBF_CONT; 1799 op->phase = PHASE_EXPECT_IBF_CONT;
1376 GNUNET_assert (NULL == op->state->remote_ibf); 1800 GNUNET_assert (NULL == op->remote_ibf);
1377 LOG (GNUNET_ERROR_TYPE_DEBUG, 1801 LOG (GNUNET_ERROR_TYPE_DEBUG,
1378 "Creating new ibf of size %u\n", 1802 "Creating new ibf of size %u\n",
1379 1 << msg->order); 1803 1 << msg->order);
1380 op->state->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM); 1804 op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
1381 op->state->salt_receive = ntohl (msg->salt); 1805 op->salt_receive = ntohl (msg->salt);
1382 LOG (GNUNET_ERROR_TYPE_DEBUG, 1806 LOG (GNUNET_ERROR_TYPE_DEBUG,
1383 "Receiving new IBF with salt %u\n", 1807 "Receiving new IBF with salt %u\n",
1384 op->state->salt_receive); 1808 op->salt_receive);
1385 if (NULL == op->state->remote_ibf) 1809 if (NULL == op->remote_ibf)
1386 { 1810 {
1387 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1811 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1388 "Failed to parse remote IBF, closing connection\n"); 1812 "Failed to parse remote IBF, closing connection\n");
1389 fail_union_operation (op); 1813 fail_union_operation (op);
1390 return; 1814 return;
1391 } 1815 }
1392 op->state->ibf_buckets_received = 0; 1816 op->ibf_buckets_received = 0;
1393 if (0 != ntohl (msg->offset)) 1817 if (0 != ntohl (msg->offset))
1394 { 1818 {
1395 GNUNET_break_op (0); 1819 GNUNET_break_op (0);
@@ -1399,23 +1823,23 @@ handle_union_p2p_ibf (void *cls,
1399 } 1823 }
1400 else 1824 else
1401 { 1825 {
1402 GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT); 1826 GNUNET_assert (op->phase == PHASE_EXPECT_IBF_CONT);
1403 LOG (GNUNET_ERROR_TYPE_DEBUG, 1827 LOG (GNUNET_ERROR_TYPE_DEBUG,
1404 "Received more of IBF\n"); 1828 "Received more of IBF\n");
1405 } 1829 }
1406 GNUNET_assert (NULL != op->state->remote_ibf); 1830 GNUNET_assert (NULL != op->remote_ibf);
1407 1831
1408 ibf_read_slice (&msg[1], 1832 ibf_read_slice (&msg[1],
1409 op->state->ibf_buckets_received, 1833 op->ibf_buckets_received,
1410 buckets_in_message, 1834 buckets_in_message,
1411 op->state->remote_ibf); 1835 op->remote_ibf);
1412 op->state->ibf_buckets_received += buckets_in_message; 1836 op->ibf_buckets_received += buckets_in_message;
1413 1837
1414 if (op->state->ibf_buckets_received == op->state->remote_ibf->size) 1838 if (op->ibf_buckets_received == op->remote_ibf->size)
1415 { 1839 {
1416 LOG (GNUNET_ERROR_TYPE_DEBUG, 1840 LOG (GNUNET_ERROR_TYPE_DEBUG,
1417 "received full ibf\n"); 1841 "received full ibf\n");
1418 op->state->phase = PHASE_INVENTORY_ACTIVE; 1842 op->phase = PHASE_INVENTORY_ACTIVE;
1419 if (GNUNET_OK != 1843 if (GNUNET_OK !=
1420 decode_and_send (op)) 1844 decode_and_send (op))
1421 { 1845 {
@@ -1463,7 +1887,7 @@ send_client_element (struct Operation *op,
1463 rm->request_id = htonl (op->client_request_id); 1887 rm->request_id = htonl (op->client_request_id);
1464 rm->element_type = htons (element->element_type); 1888 rm->element_type = htons (element->element_type);
1465 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size ( 1889 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (
1466 op->state->key_to_element)); 1890 op->key_to_element));
1467 GNUNET_memcpy (&rm[1], 1891 GNUNET_memcpy (&rm[1],
1468 element->data, 1892 element->data,
1469 element->size); 1893 element->size);
@@ -1473,61 +1897,6 @@ send_client_element (struct Operation *op,
1473 1897
1474 1898
1475/** 1899/**
1476 * Signal to the client that the operation has finished and
1477 * destroy the operation.
1478 *
1479 * @param cls operation to destroy
1480 */
1481static void
1482send_client_done (void *cls)
1483{
1484 struct Operation *op = cls;
1485 struct GNUNET_MQ_Envelope *ev;
1486 struct GNUNET_SETU_ResultMessage *rm;
1487
1488 if (GNUNET_YES == op->state->client_done_sent)
1489 {
1490 return;
1491 }
1492
1493 if (PHASE_DONE != op->state->phase)
1494 {
1495 LOG (GNUNET_ERROR_TYPE_WARNING,
1496 "Union operation failed\n");
1497 GNUNET_STATISTICS_update (_GSS_statistics,
1498 "# Union operations failed",
1499 1,
1500 GNUNET_NO);
1501 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SETU_RESULT);
1502 rm->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
1503 rm->request_id = htonl (op->client_request_id);
1504 rm->element_type = htons (0);
1505 GNUNET_MQ_send (op->set->cs->mq,
1506 ev);
1507 return;
1508 }
1509
1510 op->state->client_done_sent = GNUNET_YES;
1511
1512 GNUNET_STATISTICS_update (_GSS_statistics,
1513 "# Union operations succeeded",
1514 1,
1515 GNUNET_NO);
1516 LOG (GNUNET_ERROR_TYPE_INFO,
1517 "Signalling client that union operation is done\n");
1518 ev = GNUNET_MQ_msg (rm,
1519 GNUNET_MESSAGE_TYPE_SETU_RESULT);
1520 rm->request_id = htonl (op->client_request_id);
1521 rm->result_status = htons (GNUNET_SETU_STATUS_DONE);
1522 rm->element_type = htons (0);
1523 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (
1524 op->state->key_to_element));
1525 GNUNET_MQ_send (op->set->cs->mq,
1526 ev);
1527}
1528
1529
1530/**
1531 * Tests if the operation is finished, and if so notify. 1900 * Tests if the operation is finished, and if so notify.
1532 * 1901 *
1533 * @param op operation to check 1902 * @param op operation to check
@@ -1538,9 +1907,9 @@ maybe_finish (struct Operation *op)
1538 unsigned int num_demanded; 1907 unsigned int num_demanded;
1539 1908
1540 num_demanded = GNUNET_CONTAINER_multihashmap_size ( 1909 num_demanded = GNUNET_CONTAINER_multihashmap_size (
1541 op->state->demanded_hashes); 1910 op->demanded_hashes);
1542 1911
1543 if (PHASE_FINISH_WAITING == op->state->phase) 1912 if (PHASE_FINISH_WAITING == op->phase)
1544 { 1913 {
1545 LOG (GNUNET_ERROR_TYPE_DEBUG, 1914 LOG (GNUNET_ERROR_TYPE_DEBUG,
1546 "In PHASE_FINISH_WAITING, pending %u demands\n", 1915 "In PHASE_FINISH_WAITING, pending %u demands\n",
@@ -1549,7 +1918,7 @@ maybe_finish (struct Operation *op)
1549 { 1918 {
1550 struct GNUNET_MQ_Envelope *ev; 1919 struct GNUNET_MQ_Envelope *ev;
1551 1920
1552 op->state->phase = PHASE_DONE; 1921 op->phase = PHASE_DONE;
1553 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE); 1922 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE);
1554 GNUNET_MQ_send (op->mq, 1923 GNUNET_MQ_send (op->mq,
1555 ev); 1924 ev);
@@ -1557,14 +1926,14 @@ maybe_finish (struct Operation *op)
1557 * after it got all elements from us. */ 1926 * after it got all elements from us. */
1558 } 1927 }
1559 } 1928 }
1560 if (PHASE_FINISH_CLOSING == op->state->phase) 1929 if (PHASE_FINISH_CLOSING == op->phase)
1561 { 1930 {
1562 LOG (GNUNET_ERROR_TYPE_DEBUG, 1931 LOG (GNUNET_ERROR_TYPE_DEBUG,
1563 "In PHASE_FINISH_CLOSING, pending %u demands\n", 1932 "In PHASE_FINISH_CLOSING, pending %u demands\n",
1564 num_demanded); 1933 num_demanded);
1565 if (0 == num_demanded) 1934 if (0 == num_demanded)
1566 { 1935 {
1567 op->state->phase = PHASE_DONE; 1936 op->phase = PHASE_DONE;
1568 send_client_done (op); 1937 send_client_done (op);
1569 _GSS_operation_destroy2 (op); 1938 _GSS_operation_destroy2 (op);
1570 } 1939 }
@@ -1584,7 +1953,7 @@ check_union_p2p_elements (void *cls,
1584{ 1953{
1585 struct Operation *op = cls; 1954 struct Operation *op = cls;
1586 1955
1587 if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes)) 1956 if (0 == GNUNET_CONTAINER_multihashmap_size (op->demanded_hashes))
1588 { 1957 {
1589 GNUNET_break_op (0); 1958 GNUNET_break_op (0);
1590 return GNUNET_SYSERR; 1959 return GNUNET_SYSERR;
@@ -1623,7 +1992,7 @@ handle_union_p2p_elements (void *cls,
1623 GNUNET_SETU_element_hash (&ee->element, 1992 GNUNET_SETU_element_hash (&ee->element,
1624 &ee->element_hash); 1993 &ee->element_hash);
1625 if (GNUNET_NO == 1994 if (GNUNET_NO ==
1626 GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes, 1995 GNUNET_CONTAINER_multihashmap_remove (op->demanded_hashes,
1627 &ee->element_hash, 1996 &ee->element_hash,
1628 NULL)) 1997 NULL))
1629 { 1998 {
@@ -1647,7 +2016,7 @@ handle_union_p2p_elements (void *cls,
1647 1, 2016 1,
1648 GNUNET_NO); 2017 GNUNET_NO);
1649 2018
1650 op->state->received_total++; 2019 op->received_total++;
1651 2020
1652 ke = op_get_element (op, 2021 ke = op_get_element (op,
1653 &ee->element_hash); 2022 &ee->element_hash);
@@ -1666,7 +2035,7 @@ handle_union_p2p_elements (void *cls,
1666 { 2035 {
1667 LOG (GNUNET_ERROR_TYPE_DEBUG, 2036 LOG (GNUNET_ERROR_TYPE_DEBUG,
1668 "Registering new element from remote peer\n"); 2037 "Registering new element from remote peer\n");
1669 op->state->received_fresh++; 2038 op->received_fresh++;
1670 op_register_element (op, ee, GNUNET_YES); 2039 op_register_element (op, ee, GNUNET_YES);
1671 /* only send results immediately if the client wants it */ 2040 /* only send results immediately if the client wants it */
1672 send_client_element (op, 2041 send_client_element (op,
@@ -1674,8 +2043,8 @@ handle_union_p2p_elements (void *cls,
1674 GNUNET_SETU_STATUS_ADD_LOCAL); 2043 GNUNET_SETU_STATUS_ADD_LOCAL);
1675 } 2044 }
1676 2045
1677 if ((op->state->received_total > 8) && 2046 if ((op->received_total > 8) &&
1678 (op->state->received_fresh < op->state->received_total / 3)) 2047 (op->received_fresh < op->received_total / 3))
1679 { 2048 {
1680 /* The other peer gave us lots of old elements, there's something wrong. */ 2049 /* The other peer gave us lots of old elements, there's something wrong. */
1681 GNUNET_break_op (0); 2050 GNUNET_break_op (0);
@@ -1744,7 +2113,7 @@ handle_union_p2p_full_element (void *cls,
1744 1, 2113 1,
1745 GNUNET_NO); 2114 GNUNET_NO);
1746 2115
1747 op->state->received_total++; 2116 op->received_total++;
1748 2117
1749 ke = op_get_element (op, 2118 ke = op_get_element (op,
1750 &ee->element_hash); 2119 &ee->element_hash);
@@ -1763,7 +2132,7 @@ handle_union_p2p_full_element (void *cls,
1763 { 2132 {
1764 LOG (GNUNET_ERROR_TYPE_DEBUG, 2133 LOG (GNUNET_ERROR_TYPE_DEBUG,
1765 "Registering new element from remote peer\n"); 2134 "Registering new element from remote peer\n");
1766 op->state->received_fresh++; 2135 op->received_fresh++;
1767 op_register_element (op, ee, GNUNET_YES); 2136 op_register_element (op, ee, GNUNET_YES);
1768 /* only send results immediately if the client wants it */ 2137 /* only send results immediately if the client wants it */
1769 send_client_element (op, 2138 send_client_element (op,
@@ -1772,14 +2141,14 @@ handle_union_p2p_full_element (void *cls,
1772 } 2141 }
1773 2142
1774 if ((GNUNET_YES == op->byzantine) && 2143 if ((GNUNET_YES == op->byzantine) &&
1775 (op->state->received_total > 384 + op->state->received_fresh * 4) && 2144 (op->received_total > 384 + op->received_fresh * 4) &&
1776 (op->state->received_fresh < op->state->received_total / 6)) 2145 (op->received_fresh < op->received_total / 6))
1777 { 2146 {
1778 /* The other peer gave us lots of old elements, there's something wrong. */ 2147 /* The other peer gave us lots of old elements, there's something wrong. */
1779 LOG (GNUNET_ERROR_TYPE_ERROR, 2148 LOG (GNUNET_ERROR_TYPE_ERROR,
1780 "Other peer sent only %llu/%llu fresh elements, failing operation\n", 2149 "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1781 (unsigned long long) op->state->received_fresh, 2150 (unsigned long long) op->received_fresh,
1782 (unsigned long long) op->state->received_total); 2151 (unsigned long long) op->received_total);
1783 GNUNET_break_op (0); 2152 GNUNET_break_op (0);
1784 fail_union_operation (op); 2153 fail_union_operation (op);
1785 return; 2154 return;
@@ -1802,7 +2171,7 @@ check_union_p2p_inquiry (void *cls,
1802 struct Operation *op = cls; 2171 struct Operation *op = cls;
1803 unsigned int num_keys; 2172 unsigned int num_keys;
1804 2173
1805 if (op->state->phase != PHASE_INVENTORY_PASSIVE) 2174 if (op->phase != PHASE_INVENTORY_PASSIVE)
1806 { 2175 {
1807 GNUNET_break_op (0); 2176 GNUNET_break_op (0);
1808 return GNUNET_SYSERR; 2177 return GNUNET_SYSERR;
@@ -1903,7 +2272,7 @@ handle_union_p2p_request_full (void *cls,
1903 2272
1904 LOG (GNUNET_ERROR_TYPE_DEBUG, 2273 LOG (GNUNET_ERROR_TYPE_DEBUG,
1905 "Received request for full set transmission\n"); 2274 "Received request for full set transmission\n");
1906 if (PHASE_EXPECT_IBF != op->state->phase) 2275 if (PHASE_EXPECT_IBF != op->phase)
1907 { 2276 {
1908 GNUNET_break_op (0); 2277 GNUNET_break_op (0);
1909 fail_union_operation (op); 2278 fail_union_operation (op);
@@ -1929,7 +2298,7 @@ handle_union_p2p_full_done (void *cls,
1929{ 2298{
1930 struct Operation *op = cls; 2299 struct Operation *op = cls;
1931 2300
1932 switch (op->state->phase) 2301 switch (op->phase)
1933 { 2302 {
1934 case PHASE_EXPECT_IBF: 2303 case PHASE_EXPECT_IBF:
1935 { 2304 {
@@ -1939,14 +2308,14 @@ handle_union_p2p_full_done (void *cls,
1939 "got FULL DONE, sending elements that other peer is missing\n"); 2308 "got FULL DONE, sending elements that other peer is missing\n");
1940 2309
1941 /* send all the elements that did not come from the remote peer */ 2310 /* send all the elements that did not come from the remote peer */
1942 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, 2311 GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element,
1943 &send_missing_full_elements_iter, 2312 &send_missing_full_elements_iter,
1944 op); 2313 op);
1945 2314
1946 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE); 2315 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE);
1947 GNUNET_MQ_send (op->mq, 2316 GNUNET_MQ_send (op->mq,
1948 ev); 2317 ev);
1949 op->state->phase = PHASE_DONE; 2318 op->phase = PHASE_DONE;
1950 /* we now wait until the other peer sends us the OVER message*/ 2319 /* we now wait until the other peer sends us the OVER message*/
1951 } 2320 }
1952 break; 2321 break;
@@ -1956,7 +2325,7 @@ handle_union_p2p_full_done (void *cls,
1956 LOG (GNUNET_ERROR_TYPE_DEBUG, 2325 LOG (GNUNET_ERROR_TYPE_DEBUG,
1957 "got FULL DONE, finishing\n"); 2326 "got FULL DONE, finishing\n");
1958 /* We sent the full set, and got the response for that. We're done. */ 2327 /* We sent the full set, and got the response for that. We're done. */
1959 op->state->phase = PHASE_DONE; 2328 op->phase = PHASE_DONE;
1960 GNUNET_CADET_receive_done (op->channel); 2329 GNUNET_CADET_receive_done (op->channel);
1961 send_client_done (op); 2330 send_client_done (op);
1962 _GSS_operation_destroy2 (op); 2331 _GSS_operation_destroy2 (op);
@@ -1967,7 +2336,7 @@ handle_union_p2p_full_done (void *cls,
1967 default: 2336 default:
1968 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2337 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1969 "Handle full done phase is %u\n", 2338 "Handle full done phase is %u\n",
1970 (unsigned) op->state->phase); 2339 (unsigned) op->phase);
1971 GNUNET_break_op (0); 2340 GNUNET_break_op (0);
1972 fail_union_operation (op); 2341 fail_union_operation (op);
1973 return; 2342 return;
@@ -2079,8 +2448,8 @@ check_union_p2p_offer (void *cls,
2079 unsigned int num_hashes; 2448 unsigned int num_hashes;
2080 2449
2081 /* look up elements and send them */ 2450 /* look up elements and send them */
2082 if ((op->state->phase != PHASE_INVENTORY_PASSIVE) && 2451 if ((op->phase != PHASE_INVENTORY_PASSIVE) &&
2083 (op->state->phase != PHASE_INVENTORY_ACTIVE)) 2452 (op->phase != PHASE_INVENTORY_ACTIVE))
2084 { 2453 {
2085 GNUNET_break_op (0); 2454 GNUNET_break_op (0);
2086 return GNUNET_SYSERR; 2455 return GNUNET_SYSERR;
@@ -2129,7 +2498,7 @@ handle_union_p2p_offer (void *cls,
2129 continue; 2498 continue;
2130 2499
2131 if (GNUNET_YES == 2500 if (GNUNET_YES ==
2132 GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes, 2501 GNUNET_CONTAINER_multihashmap_contains (op->demanded_hashes,
2133 hash)) 2502 hash))
2134 { 2503 {
2135 LOG (GNUNET_ERROR_TYPE_DEBUG, 2504 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -2139,7 +2508,7 @@ handle_union_p2p_offer (void *cls,
2139 2508
2140 GNUNET_assert (GNUNET_OK == 2509 GNUNET_assert (GNUNET_OK ==
2141 GNUNET_CONTAINER_multihashmap_put ( 2510 GNUNET_CONTAINER_multihashmap_put (
2142 op->state->demanded_hashes, 2511 op->demanded_hashes,
2143 hash, 2512 hash,
2144 NULL, 2513 NULL,
2145 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); 2514 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
@@ -2171,12 +2540,11 @@ handle_union_p2p_done (void *cls,
2171{ 2540{
2172 struct Operation *op = cls; 2541 struct Operation *op = cls;
2173 2542
2174 switch (op->state->phase) 2543 switch (op->phase)
2175 { 2544 {
2176 case PHASE_INVENTORY_PASSIVE: 2545 case PHASE_INVENTORY_PASSIVE:
2177 /* We got all requests, but still have to send our elements in response. */ 2546 /* We got all requests, but still have to send our elements in response. */
2178 op->state->phase = PHASE_FINISH_WAITING; 2547 op->phase = PHASE_FINISH_WAITING;
2179
2180 LOG (GNUNET_ERROR_TYPE_DEBUG, 2548 LOG (GNUNET_ERROR_TYPE_DEBUG,
2181 "got DONE (as passive partner), waiting for our demands to be satisfied\n"); 2549 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2182 /* The active peer is done sending offers 2550 /* The active peer is done sending offers
@@ -2190,7 +2558,6 @@ handle_union_p2p_done (void *cls,
2190 */GNUNET_CADET_receive_done (op->channel); 2558 */GNUNET_CADET_receive_done (op->channel);
2191 maybe_finish (op); 2559 maybe_finish (op);
2192 return; 2560 return;
2193
2194 case PHASE_INVENTORY_ACTIVE: 2561 case PHASE_INVENTORY_ACTIVE:
2195 LOG (GNUNET_ERROR_TYPE_DEBUG, 2562 LOG (GNUNET_ERROR_TYPE_DEBUG,
2196 "got DONE (as active partner), waiting to finish\n"); 2563 "got DONE (as active partner), waiting to finish\n");
@@ -2200,11 +2567,10 @@ handle_union_p2p_done (void *cls,
2200 * 2567 *
2201 * We'll close the channel 2568 * We'll close the channel
2202 * to the other peer once our demands are met. 2569 * to the other peer once our demands are met.
2203 */op->state->phase = PHASE_FINISH_CLOSING; 2570 */op->phase = PHASE_FINISH_CLOSING;
2204 GNUNET_CADET_receive_done (op->channel); 2571 GNUNET_CADET_receive_done (op->channel);
2205 maybe_finish (op); 2572 maybe_finish (op);
2206 return; 2573 return;
2207
2208 default: 2574 default:
2209 GNUNET_break_op (0); 2575 GNUNET_break_op (0);
2210 fail_union_operation (op); 2576 fail_union_operation (op);
@@ -2230,15 +2596,15 @@ handle_union_p2p_over (void *cls,
2230/** 2596/**
2231 * Initiate operation to evaluate a set union with a remote peer. 2597 * Initiate operation to evaluate a set union with a remote peer.
2232 * 2598 *
2233 * @param op operation to perform (to be initialized) 2599 * @param[in,out] op operation to perform (to be initialized)
2234 * @param opaque_context message to be transmitted to the listener 2600 * @param opaque_context message to be transmitted to the listener
2235 * to convince it to accept, may be NULL 2601 * to convince it to accept, may be NULL
2602 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
2236 */ 2603 */
2237static struct OperationState * 2604static int
2238union_evaluate (struct Operation *op, 2605union_evaluate (struct Operation *op,
2239 const struct GNUNET_MessageHeader *opaque_context) 2606 const struct GNUNET_MessageHeader *opaque_context)
2240{ 2607{
2241 struct OperationState *state;
2242 struct GNUNET_MQ_Envelope *ev; 2608 struct GNUNET_MQ_Envelope *ev;
2243 struct OperationRequestMessage *msg; 2609 struct OperationRequestMessage *msg;
2244 2610
@@ -2249,16 +2615,15 @@ union_evaluate (struct Operation *op,
2249 { 2615 {
2250 /* the context message is too large */ 2616 /* the context message is too large */
2251 GNUNET_break (0); 2617 GNUNET_break (0);
2252 return NULL; 2618 return GNUNET_SYSERR;
2253 } 2619 }
2254 state = GNUNET_new (struct OperationState); 2620 op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2255 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, 2621 GNUNET_NO);
2256 GNUNET_NO);
2257 /* copy the current generation's strata estimator for this operation */ 2622 /* copy the current generation's strata estimator for this operation */
2258 state->se = strata_estimator_dup (op->set->state->se); 2623 op->se = strata_estimator_dup (op->set->se);
2259 /* we started the operation, thus we have to send the operation request */ 2624 /* we started the operation, thus we have to send the operation request */
2260 state->phase = PHASE_EXPECT_SE; 2625 op->phase = PHASE_EXPECT_SE;
2261 state->salt_receive = state->salt_send = 42; // FIXME????? 2626 op->salt_receive = op->salt_send = 42; // FIXME?????
2262 LOG (GNUNET_ERROR_TYPE_DEBUG, 2627 LOG (GNUNET_ERROR_TYPE_DEBUG,
2263 "Initiating union operation evaluation\n"); 2628 "Initiating union operation evaluation\n");
2264 GNUNET_STATISTICS_update (_GSS_statistics, 2629 GNUNET_STATISTICS_update (_GSS_statistics,
@@ -2278,12 +2643,9 @@ union_evaluate (struct Operation *op,
2278 else 2643 else
2279 LOG (GNUNET_ERROR_TYPE_DEBUG, 2644 LOG (GNUNET_ERROR_TYPE_DEBUG,
2280 "sent op request without context message\n"); 2645 "sent op request without context message\n");
2281
2282 op->state = state;
2283 initialize_key_to_element (op); 2646 initialize_key_to_element (op);
2284 state->initial_size = GNUNET_CONTAINER_multihashmap32_size ( 2647 op->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->key_to_element);
2285 state->key_to_element); 2648 return GNUNET_OK;
2286 return state;
2287} 2649}
2288 2650
2289 2651
@@ -2313,100 +2675,6 @@ get_incoming (uint32_t id)
2313 2675
2314 2676
2315/** 2677/**
2316 * Destroy an incoming request from a remote peer
2317 *
2318 * @param op remote request to destroy
2319 */
2320static void
2321incoming_destroy (struct Operation *op)
2322{
2323 struct Listener *listener;
2324
2325 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2326 "Destroying incoming operation %p\n",
2327 op);
2328 if (NULL != (listener = op->listener))
2329 {
2330 GNUNET_CONTAINER_DLL_remove (listener->op_head,
2331 listener->op_tail,
2332 op);
2333 op->listener = NULL;
2334 }
2335 if (NULL != op->timeout_task)
2336 {
2337 GNUNET_SCHEDULER_cancel (op->timeout_task);
2338 op->timeout_task = NULL;
2339 }
2340 _GSS_operation_destroy2 (op);
2341}
2342
2343
2344/**
2345 * Is element @a ee part of the set used by @a op?
2346 *
2347 * @param ee element to test
2348 * @param op operation the defines the set and its generation
2349 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
2350 */
2351int
2352_GSS_is_element_of_operation (struct ElementEntry *ee,
2353 struct Operation *op)
2354{
2355 return ee->generation >= op->generation_created;
2356}
2357
2358
2359/**
2360 * Destroy the given operation. Used for any operation where both
2361 * peers were known and that thus actually had a vt and channel. Must
2362 * not be used for operations where 'listener' is still set and we do
2363 * not know the other peer.
2364 *
2365 * Call the implementation-specific cancel function of the operation.
2366 * Disconnects from the remote peer. Does not disconnect the client,
2367 * as there may be multiple operations per set.
2368 *
2369 * @param op operation to destroy
2370 */
2371void
2372_GSS_operation_destroy (struct Operation *op)
2373{
2374 struct Set *set = op->set;
2375 struct GNUNET_CADET_Channel *channel;
2376
2377 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2378 "Destroying operation %p\n", op);
2379 GNUNET_assert (NULL == op->listener);
2380 if (NULL != op->state)
2381 {
2382 union_op_cancel (op);
2383 op->state = NULL;
2384 }
2385 if (NULL != set)
2386 {
2387 GNUNET_CONTAINER_DLL_remove (set->ops_head,
2388 set->ops_tail,
2389 op);
2390 op->set = NULL;
2391 }
2392 if (NULL != op->context_msg)
2393 {
2394 GNUNET_free (op->context_msg);
2395 op->context_msg = NULL;
2396 }
2397 if (NULL != (channel = op->channel))
2398 {
2399 /* This will free op; called conditionally as this helper function
2400 is also called from within the channel disconnect handler. */
2401 op->channel = NULL;
2402 GNUNET_CADET_channel_destroy (channel);
2403 }
2404 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
2405 * there was a channel end handler that will free 'op' on the call stack. */
2406}
2407
2408
2409/**
2410 * Callback called when a client connects to the service. 2678 * Callback called when a client connects to the service.
2411 * 2679 *
2412 * @param cls closure for the service 2680 * @param cls closure for the service
@@ -2479,14 +2747,11 @@ client_disconnect_cb (void *cls,
2479 _GSS_operation_destroy (set->ops_head); 2747 _GSS_operation_destroy (set->ops_head);
2480 2748
2481 /* Destroy operation-specific state */ 2749 /* Destroy operation-specific state */
2482 GNUNET_assert (NULL != set->state); 2750 if (NULL != set->se)
2483 if (NULL != set->state->se)
2484 { 2751 {
2485 strata_estimator_destroy (set->state->se); 2752 strata_estimator_destroy (set->se);
2486 set->state->se = NULL; 2753 set->se = NULL;
2487 } 2754 }
2488 GNUNET_free (set->state);
2489
2490 /* free set content (or at least decrement RC) */ 2755 /* free set content (or at least decrement RC) */
2491 set->content = NULL; 2756 set->content = NULL;
2492 GNUNET_assert (0 != content->refcount); 2757 GNUNET_assert (0 != content->refcount);
@@ -2665,12 +2930,12 @@ handle_client_create_set (void *cls,
2665 } 2930 }
2666 set = GNUNET_new (struct Set); 2931 set = GNUNET_new (struct Set);
2667 { 2932 {
2668 struct SetState *set_state; 2933 struct StrataEstimator *se;
2669 2934
2670 set_state = GNUNET_new (struct SetState); // FIXME: avoid this malloc, merge structs! 2935 se = strata_estimator_create (SE_STRATA_COUNT,
2671 set_state->se = strata_estimator_create (SE_STRATA_COUNT, 2936 SE_IBF_SIZE,
2672 SE_IBF_SIZE, SE_IBF_HASH_NUM); 2937 SE_IBF_HASH_NUM);
2673 if (NULL == set_state->se) 2938 if (NULL == se)
2674 { 2939 {
2675 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 2940 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2676 "Failed to allocate strata estimator\n"); 2941 "Failed to allocate strata estimator\n");
@@ -2678,7 +2943,7 @@ handle_client_create_set (void *cls,
2678 GNUNET_SERVICE_client_drop (cs->client); 2943 GNUNET_SERVICE_client_drop (cs->client);
2679 return; 2944 return;
2680 } 2945 }
2681 set->state = set_state; 2946 set->se = se;
2682 } 2947 }
2683 set->content = GNUNET_new (struct SetContent); 2948 set->content = GNUNET_new (struct SetContent);
2684 set->content->refcount = 1; 2949 set->content->refcount = 1;
@@ -2782,37 +3047,6 @@ channel_end_cb (void *channel_ctx,
2782 3047
2783 3048
2784/** 3049/**
2785 * This function probably should not exist
2786 * and be replaced by inlining more specific
2787 * logic in the various places where it is called.
2788 */
2789void
2790_GSS_operation_destroy2 (struct Operation *op)
2791{
2792 struct GNUNET_CADET_Channel *channel;
2793
2794 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2795 "channel_end_cb called\n");
2796 if (NULL != (channel = op->channel))
2797 {
2798 /* This will free op; called conditionally as this helper function
2799 is also called from within the channel disconnect handler. */
2800 op->channel = NULL;
2801 GNUNET_CADET_channel_destroy (channel);
2802 }
2803 if (NULL != op->listener)
2804 {
2805 incoming_destroy (op);
2806 return;
2807 }
2808 if (NULL != op->set)
2809 send_client_done (op);
2810 _GSS_operation_destroy (op);
2811 GNUNET_free (op);
2812}
2813
2814
2815/**
2816 * Function called whenever an MQ-channel's transmission window size changes. 3050 * Function called whenever an MQ-channel's transmission window size changes.
2817 * 3051 *
2818 * The first callback in an outgoing channel will be with a non-zero value 3052 * The first callback in an outgoing channel will be with a non-zero value
@@ -3042,7 +3276,7 @@ handle_client_set_add (void *cls,
3042 /* same element inserted twice */ 3276 /* same element inserted twice */
3043 return; 3277 return;
3044 } 3278 }
3045 strata_estimator_insert (set->state->se, 3279 strata_estimator_insert (set->se,
3046 get_ibf_key (&ee->element_hash)); 3280 get_ibf_key (&ee->element_hash));
3047} 3281}
3048 3282
@@ -3188,8 +3422,10 @@ handle_client_evaluate (void *cls,
3188 &channel_end_cb, 3422 &channel_end_cb,
3189 cadet_handlers); 3423 cadet_handlers);
3190 op->mq = GNUNET_CADET_get_mq (op->channel); 3424 op->mq = GNUNET_CADET_get_mq (op->channel);
3191 op->state = union_evaluate (op, context); 3425 // FIXME: inline!
3192 if (NULL == op->state) 3426 if (GNUNET_OK !=
3427 union_evaluate (op,
3428 context))
3193 { 3429 {
3194 GNUNET_break (0); 3430 GNUNET_break (0);
3195 GNUNET_SERVICE_client_drop (cs->client); 3431 GNUNET_SERVICE_client_drop (cs->client);
@@ -3319,7 +3555,7 @@ handle_client_accept (void *cls,
3319 interfer with the running operation. */ 3555 interfer with the running operation. */
3320 op->generation_created = set->current_generation; 3556 op->generation_created = set->current_generation;
3321 advance_generation (set); 3557 advance_generation (set);
3322 GNUNET_assert (NULL == op->state); 3558 GNUNET_assert (NULL == op->se);
3323 3559
3324 LOG (GNUNET_ERROR_TYPE_DEBUG, 3560 LOG (GNUNET_ERROR_TYPE_DEBUG,
3325 "accepting set union operation\n"); 3561 "accepting set union operation\n");
@@ -3332,7 +3568,6 @@ handle_client_accept (void *cls,
3332 1, 3568 1,
3333 GNUNET_NO); 3569 GNUNET_NO);
3334 { 3570 {
3335 struct OperationState *state;
3336 const struct StrataEstimator *se; 3571 const struct StrataEstimator *se;
3337 struct GNUNET_MQ_Envelope *ev; 3572 struct GNUNET_MQ_Envelope *ev;
3338 struct StrataEstimatorMessage *strata_msg; 3573 struct StrataEstimatorMessage *strata_msg;
@@ -3340,18 +3575,16 @@ handle_client_accept (void *cls,
3340 size_t len; 3575 size_t len;
3341 uint16_t type; 3576 uint16_t type;
3342 3577
3343 state = GNUNET_new (struct OperationState); // FIXME: merge with 'op' to avoid malloc! 3578 op->se = strata_estimator_dup (op->set->se);
3344 state->se = strata_estimator_dup (op->set->state->se); 3579 op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
3345 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, 3580 GNUNET_NO);
3346 GNUNET_NO); 3581 op->salt_receive = op->salt_send = 42; // FIXME?????
3347 state->salt_receive = state->salt_send = 42; // FIXME?????
3348 op->state = state;
3349 initialize_key_to_element (op); 3582 initialize_key_to_element (op);
3350 state->initial_size = GNUNET_CONTAINER_multihashmap32_size ( 3583 op->initial_size = GNUNET_CONTAINER_multihashmap32_size (
3351 state->key_to_element); 3584 op->key_to_element);
3352 3585
3353 /* kick off the operation */ 3586 /* kick off the operation */
3354 se = state->se; 3587 se = op->se;
3355 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size); 3588 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
3356 len = strata_estimator_write (se, 3589 len = strata_estimator_write (se,
3357 buf); 3590 buf);
@@ -3371,9 +3604,7 @@ handle_client_accept (void *cls,
3371 op->set->content->elements)); 3604 op->set->content->elements));
3372 GNUNET_MQ_send (op->mq, 3605 GNUNET_MQ_send (op->mq,
3373 ev); 3606 ev);
3374 state->phase = PHASE_EXPECT_IBF; 3607 op->phase = PHASE_EXPECT_IBF;
3375
3376 op->state = state;
3377 } 3608 }
3378 /* Now allow CADET to continue, as we did not do this in 3609 /* Now allow CADET to continue, as we did not do this in
3379 #handle_incoming_msg (as we wanted to first see if the 3610 #handle_incoming_msg (as we wanted to first see if the