aboutsummaryrefslogtreecommitdiff
path: root/src/set
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-04-27 14:16:29 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-04-27 14:16:29 +0000
commit490b07064ed2f8a26ab63d2ea050f5583cccb3d0 (patch)
treee75db8657a75723760a37a765f9253a3f76e2065 /src/set
parent00196642a7a8e400a054bd0a9c3b35b24be87a78 (diff)
downloadgnunet-490b07064ed2f8a26ab63d2ea050f5583cccb3d0.tar.gz
gnunet-490b07064ed2f8a26ab63d2ea050f5583cccb3d0.zip
work on gnunet-set, isolated bug in stream
Diffstat (limited to 'src/set')
-rw-r--r--src/set/Makefile.am13
-rw-r--r--src/set/gnunet-service-set.c74
-rw-r--r--src/set/gnunet-service-set.h95
-rw-r--r--src/set/gnunet-service-set_union.c561
-rw-r--r--src/set/gnunet-set.c20
-rw-r--r--src/set/mq.c56
-rw-r--r--src/set/mq.h26
-rw-r--r--src/set/set_api.c8
8 files changed, 476 insertions, 377 deletions
diff --git a/src/set/Makefile.am b/src/set/Makefile.am
index c1639823e..a4c4fa6be 100644
--- a/src/set/Makefile.am
+++ b/src/set/Makefile.am
@@ -16,7 +16,7 @@ if USE_COVERAGE
16endif 16endif
17 17
18bin_PROGRAMS = \ 18bin_PROGRAMS = \
19 gnunet-set 19 gnunet-set gnunet-set-bug
20 20
21libexec_PROGRAMS = \ 21libexec_PROGRAMS = \
22 gnunet-service-set 22 gnunet-service-set
@@ -35,6 +35,17 @@ gnunet_set_LDADD = \
35gnunet_set_DEPENDENCIES = \ 35gnunet_set_DEPENDENCIES = \
36 libgnunetset.la 36 libgnunetset.la
37 37
38gnunet_set_bug_SOURCES = \
39 gnunet-set-bug.c \
40 mq.c
41gnunet_set_bug_LDADD = \
42 $(top_builddir)/src/util/libgnunetutil.la \
43 $(top_builddir)/src/stream/libgnunetstream.la \
44 $(GN_LIBINTL)
45# hack for mq.c, see automake Objects ‘created with both libtool and without’
46# remove once GNUNET_MQ is in util/
47gnunet_set_bug_CFLAGS = $(AM_CFLAGS)
48
38gnunet_service_set_SOURCES = \ 49gnunet_service_set_SOURCES = \
39 gnunet-service-set.c \ 50 gnunet-service-set.c \
40 gnunet-service-set_union.c \ 51 gnunet-service-set_union.c \
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index d6258aa78..314e7719d 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -84,7 +84,7 @@ static uint32_t request_id = 1;
84 * @param client the client to disconnect 84 * @param client the client to disconnect
85 */ 85 */
86void 86void
87client_disconnect (struct GNUNET_SERVER_Client *client) 87_GSS_client_disconnect (struct GNUNET_SERVER_Client *client)
88{ 88{
89 /* FIXME: clean up any data structures belonging to the client */ 89 /* FIXME: clean up any data structures belonging to the client */
90 GNUNET_SERVER_client_disconnect (client); 90 GNUNET_SERVER_client_disconnect (client);
@@ -170,6 +170,7 @@ destroy_incoming (struct Incoming *incoming)
170 * @param cls the incoming socket 170 * @param cls the incoming socket
171 * @param mh the message 171 * @param mh the message
172 */ 172 */
173
173static void 174static void
174handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh) 175handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh)
175{ 176{
@@ -180,6 +181,8 @@ handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh)
180 struct Listener *listener; 181 struct Listener *listener;
181 const struct GNUNET_MessageHeader *context_msg; 182 const struct GNUNET_MessageHeader *context_msg;
182 183
184 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got operation request\n");
185
183 if (ntohs (mh->size) < sizeof *msg) 186 if (ntohs (mh->size) < sizeof *msg)
184 { 187 {
185 GNUNET_break (0); 188 GNUNET_break (0);
@@ -201,18 +204,28 @@ handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh)
201 return; 204 return;
202 } 205 }
203 } 206 }
204 207 /* find the appropriate listener */
205 for (listener = listeners_head; listener != NULL; listener = listener->next) 208 for (listener = listeners_head;
209 listener != NULL;
210 listener = listener->next)
206 { 211 {
207 if ( (0 != GNUNET_CRYPTO_hash_cmp (&msg->app_id, &listener->app_id)) || 212 if ( (0 != GNUNET_CRYPTO_hash_cmp (&msg->app_id, &listener->app_id)) ||
208 (htons (msg->operation) != listener->operation) ) 213 (htons (msg->operation) != listener->operation) )
209 continue; 214 continue;
210 mqm = GNUNET_MQ_msg_concat (cmsg, context_msg, GNUNET_MESSAGE_TYPE_SET_REQUEST); 215 mqm = GNUNET_MQ_msg (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST);
216 if (GNUNET_OK !=
217 GNUNET_MQ_nest (mqm, context_msg))
218 {
219 /* FIXME: disconnect the peer */
220 GNUNET_MQ_discard (mqm);
221 GNUNET_break (0);
222 }
211 incoming->request_id = request_id++; 223 incoming->request_id = request_id++;
212 cmsg->request_id = htonl (incoming->request_id); 224 cmsg->request_id = htonl (incoming->request_id);
213 GNUNET_MQ_send (listener->client_mq, mqm); 225 GNUNET_MQ_send (listener->client_mq, mqm);
214 return; 226 return;
215 } 227 }
228 /* FIXME: send a reject message */
216} 229}
217 230
218 231
@@ -249,7 +262,7 @@ handle_client_create (void *cls,
249 GNUNET_assert (0); 262 GNUNET_assert (0);
250 break; 263 break;
251 case GNUNET_SET_OPERATION_UNION: 264 case GNUNET_SET_OPERATION_UNION:
252 set = union_set_create (); 265 set = _GSS_union_set_create ();
253 break; 266 break;
254 default: 267 default:
255 GNUNET_free (set); 268 GNUNET_free (set);
@@ -261,7 +274,7 @@ handle_client_create (void *cls,
261 set->client = client; 274 set->client = client;
262 set->client_mq = GNUNET_MQ_queue_for_server_client (client); 275 set->client_mq = GNUNET_MQ_queue_for_server_client (client);
263 GNUNET_CONTAINER_DLL_insert (sets_head, sets_tail, set); 276 GNUNET_CONTAINER_DLL_insert (sets_head, sets_tail, set);
264 277
265 GNUNET_SERVER_receive_done (client, GNUNET_OK); 278 GNUNET_SERVER_receive_done (client, GNUNET_OK);
266} 279}
267 280
@@ -292,6 +305,8 @@ handle_client_listen (void *cls,
292 listener->app_id = msg->app_id; 305 listener->app_id = msg->app_id;
293 listener->operation = msg->operation; 306 listener->operation = msg->operation;
294 GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener); 307 GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener);
308
309 GNUNET_SERVER_receive_done (client, GNUNET_OK);
295} 310}
296 311
297 312
@@ -313,14 +328,13 @@ handle_client_add (void *cls,
313 if (NULL == set) 328 if (NULL == set)
314 { 329 {
315 GNUNET_break (0); 330 GNUNET_break (0);
316 client_disconnect (client); 331 _GSS_client_disconnect (client);
317 return; 332 return;
318 } 333 }
319 switch (set->operation) 334 switch (set->operation)
320 { 335 {
321 case GNUNET_SET_OPERATION_UNION: 336 case GNUNET_SET_OPERATION_UNION:
322 union_add (set, (struct ElementMessage *) m); 337 _GSS_union_add ((struct ElementMessage *) m, set);
323 break;
324 case GNUNET_SET_OPERATION_INTERSECTION: 338 case GNUNET_SET_OPERATION_INTERSECTION:
325 /* FIXME: cfuchs */ 339 /* FIXME: cfuchs */
326 break; 340 break;
@@ -328,6 +342,8 @@ handle_client_add (void *cls,
328 GNUNET_assert (0); 342 GNUNET_assert (0);
329 break; 343 break;
330 } 344 }
345
346 GNUNET_SERVER_receive_done (client, GNUNET_OK);
331} 347}
332 348
333 349
@@ -344,24 +360,15 @@ handle_client_evaluate (void *cls,
344 const struct GNUNET_MessageHeader *m) 360 const struct GNUNET_MessageHeader *m)
345{ 361{
346 struct Set *set; 362 struct Set *set;
347 struct EvaluateMessage *msg = (struct EvaluateMessage *) m;
348 struct EvaluateOperation *eo;
349 363
350 set = get_set (client); 364 set = get_set (client);
351
352 if (NULL == set) 365 if (NULL == set)
353 { 366 {
354 GNUNET_break (0); 367 GNUNET_break (0);
355 client_disconnect (client); 368 _GSS_client_disconnect (client);
356 return; 369 return;
357 } 370 }
358 371
359 eo = GNUNET_new (struct EvaluateOperation);
360 eo->peer = msg->peer;
361 eo->app_id = msg->app_id;
362 eo->request_id = msg->request_id;
363 eo->context_msg = GNUNET_copy_message (&msg[1].header);
364 eo->set = set;
365 372
366 switch (set->operation) 373 switch (set->operation)
367 { 374 {
@@ -369,12 +376,14 @@ handle_client_evaluate (void *cls,
369 /* FIXME: cfuchs */ 376 /* FIXME: cfuchs */
370 break; 377 break;
371 case GNUNET_SET_OPERATION_UNION: 378 case GNUNET_SET_OPERATION_UNION:
372 union_evaluate (eo); 379 _GSS_union_evaluate ((struct EvaluateMessage *) m, set);
373 break; 380 break;
374 default: 381 default:
375 GNUNET_assert (0); 382 GNUNET_assert (0);
376 break; 383 break;
377 } 384 }
385
386 GNUNET_SERVER_receive_done (client, GNUNET_OK);
378} 387}
379 388
380 389
@@ -391,6 +400,7 @@ handle_client_cancel (void *cls,
391 const struct GNUNET_MessageHeader *m) 400 const struct GNUNET_MessageHeader *m)
392{ 401{
393 /* FIXME: implement */ 402 /* FIXME: implement */
403 GNUNET_SERVER_receive_done (client, GNUNET_OK);
394} 404}
395 405
396 406
@@ -407,6 +417,7 @@ handle_client_ack (void *cls,
407 const struct GNUNET_MessageHeader *m) 417 const struct GNUNET_MessageHeader *m)
408{ 418{
409 /* FIXME: implement */ 419 /* FIXME: implement */
420 GNUNET_SERVER_receive_done (client, GNUNET_OK);
410} 421}
411 422
412 423
@@ -421,19 +432,18 @@ handle_client_ack (void *cls,
421static void 432static void
422handle_client_accept (void *cls, 433handle_client_accept (void *cls,
423 struct GNUNET_SERVER_Client *client, 434 struct GNUNET_SERVER_Client *client,
424 const struct GNUNET_MessageHeader *m) 435 const struct GNUNET_MessageHeader *mh)
425{ 436{
426 struct AcceptMessage *msg = (struct AcceptMessage *) m;
427 struct Set *set; 437 struct Set *set;
428 struct Incoming *incoming; 438 struct Incoming *incoming;
429 struct EvaluateOperation *eo; 439 struct AcceptMessage *msg = (struct AcceptMessage *) mh;
430 440
431 set = get_set (client); 441 set = get_set (client);
432 442
433 if (NULL == set) 443 if (NULL == set)
434 { 444 {
435 GNUNET_break (0); 445 GNUNET_break (0);
436 client_disconnect (client); 446 _GSS_client_disconnect (client);
437 return; 447 return;
438 } 448 }
439 449
@@ -443,16 +453,10 @@ handle_client_accept (void *cls,
443 (incoming->operation != set->operation) ) 453 (incoming->operation != set->operation) )
444 { 454 {
445 GNUNET_break (0); 455 GNUNET_break (0);
446 client_disconnect (client); 456 _GSS_client_disconnect (client);
447 return; 457 return;
448 } 458 }
449 459
450 eo = GNUNET_new (struct EvaluateOperation);
451 eo->peer = incoming->peer;
452 eo->app_id = incoming->app_id;
453 eo->request_id = msg->request_id;
454 eo->set = set;
455
456 switch (set->operation) 460 switch (set->operation)
457 { 461 {
458 case GNUNET_SET_OPERATION_INTERSECTION: 462 case GNUNET_SET_OPERATION_INTERSECTION:
@@ -460,12 +464,15 @@ handle_client_accept (void *cls,
460 GNUNET_assert (0); 464 GNUNET_assert (0);
461 break; 465 break;
462 case GNUNET_SET_OPERATION_UNION: 466 case GNUNET_SET_OPERATION_UNION:
463 union_accept (eo, incoming); 467 _GSS_union_accept (msg, set, incoming);
464 break; 468 break;
465 default: 469 default:
466 GNUNET_assert (0); 470 GNUNET_assert (0);
467 break; 471 break;
468 } 472 }
473 /* FIXME: destroy incoming */
474
475 GNUNET_SERVER_receive_done (client, GNUNET_OK);
469} 476}
470 477
471 478
@@ -522,7 +529,7 @@ shutdown_task (void *cls,
522 { 529 {
523 GNUNET_STREAM_listen_close (stream_listen_socket); 530 GNUNET_STREAM_listen_close (stream_listen_socket);
524 stream_listen_socket = NULL; 531 stream_listen_socket = NULL;
525 } 532 }
526 533
527 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n"); 534 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
528} 535}
@@ -575,4 +582,3 @@ main (int argc, char *const *argv)
575 return (GNUNET_OK == ret) ? 0 : 1; 582 return (GNUNET_OK == ret) ? 0 : 1;
576} 583}
577 584
578
diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h
index 685fc47a4..2d0d6595d 100644
--- a/src/set/gnunet-service-set.h
+++ b/src/set/gnunet-service-set.h
@@ -86,83 +86,7 @@ struct Set
86 union { 86 union {
87 struct IntersectionState *i; 87 struct IntersectionState *i;
88 struct UnionState *u; 88 struct UnionState *u;
89 } extra; 89 } state;
90};
91
92
93/**
94 * State for an evaluate operation for a set that
95 * supports set union.
96 */
97struct UnionEvaluateOperation;
98
99
100/* FIXME: cfuchs */
101struct IntersectionEvaluateOperation
102{
103 /* FIXME: cfuchs */
104};
105
106
107/**
108 * State of evaluation a set operation with
109 * another peer
110 */
111struct EvaluateOperation
112{
113 /**
114 * Local set the operation is evaluated on
115 */
116 struct Set *set;
117
118 /**
119 * Peer with the remote set
120 */
121 struct GNUNET_PeerIdentity peer;
122
123 /**
124 * Application-specific identifier
125 */
126 struct GNUNET_HashCode app_id;
127
128 /**
129 * Context message, given to us
130 * by the client, may be NULL.
131 */
132 struct GNUNET_MessageHeader *context_msg;
133
134 /**
135 * Stream socket connected to the other peer
136 */
137 struct GNUNET_STREAM_Socket *socket;
138
139 /**
140 * Message queue for the peer on the other
141 * end
142 */
143 struct GNUNET_MQ_MessageQueue *mq;
144
145 /**
146 * Type of this operation
147 */
148 enum GNUNET_SET_OperationType operation;
149
150 /**
151 * GNUNET_YES if we started the operation,
152 * GNUNET_NO if the other peer started it.
153 */
154 int is_outgoing;
155
156 /**
157 * Request id, so we can use one client handle
158 * for multiple operations
159 */
160 uint32_t request_id;
161
162 union {
163 struct UnionEvaluateOperation *u;
164 struct IntersectionEvaluateOperation *i;
165 } extra;
166}; 90};
167 91
168 92
@@ -246,6 +170,12 @@ struct Incoming
246 struct GNUNET_MessageHeader *context_msg; 170 struct GNUNET_MessageHeader *context_msg;
247 171
248 /** 172 /**
173 * Salt the peer has requested to use for the
174 * operation
175 */
176 uint16_t salt;
177
178 /**
249 * Operation the other peer wants to do 179 * Operation the other peer wants to do
250 */ 180 */
251 enum GNUNET_SET_OperationType operation; 181 enum GNUNET_SET_OperationType operation;
@@ -271,23 +201,24 @@ extern const struct GNUNET_CONFIGURATION_Handle *configuration;
271 * @param client the client to disconnect 201 * @param client the client to disconnect
272 */ 202 */
273void 203void
274client_disconnect (struct GNUNET_SERVER_Client *client); 204_GSS_client_disconnect (struct GNUNET_SERVER_Client *client);
275 205
276 206
277struct Set * 207struct Set *
278union_set_create (void); 208_GSS_union_set_create (void);
279 209
280 210
281void 211void
282union_evaluate (struct EvaluateOperation *eo); 212_GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set);
283 213
284 214
285void 215void
286union_add (struct Set *set, struct ElementMessage *m); 216_GSS_union_add (struct ElementMessage *m, struct Set *set);
287 217
288 218
289void 219void
290union_accept (struct EvaluateOperation *eo, struct Incoming *incoming); 220_GSS_union_accept (struct AcceptMessage *m, struct Set *set,
221 struct Incoming *incoming);
291 222
292 223
293#endif 224#endif
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index e65452a54..4903ce605 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -26,9 +26,12 @@
26 26
27 27
28#include "gnunet-service-set.h" 28#include "gnunet-service-set.h"
29#include "set_protocol.h" 29#include "gnunet_container_lib.h"
30#include "gnunet_crypto_lib.h"
30#include "ibf.h" 31#include "ibf.h"
31#include "strata_estimator.h" 32#include "strata_estimator.h"
33#include "set_protocol.h"
34#include <gcrypt.h>
32 35
33 36
34/** 37/**
@@ -49,7 +52,6 @@
49 */ 52 */
50#define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE) 53#define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
51 54
52
53/** 55/**
54 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER). 56 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
55 * Choose this value so that computing the IBF is still cheaper 57 * Choose this value so that computing the IBF is still cheaper
@@ -76,6 +78,55 @@ enum UnionOperationState
76 */ 78 */
77struct UnionEvaluateOperation 79struct UnionEvaluateOperation
78{ 80{
81 /**
82 * Local set the operation is evaluated on
83 */
84 struct Set *set;
85
86 /**
87 * Peer with the remote set
88 */
89 struct GNUNET_PeerIdentity peer;
90
91 /**
92 * Application-specific identifier
93 */
94 struct GNUNET_HashCode app_id;
95
96 /**
97 * Context message, given to us
98 * by the client, may be NULL.
99 */
100 struct GNUNET_MessageHeader *context_msg;
101
102 /**
103 * Stream socket connected to the other peer
104 */
105 struct GNUNET_STREAM_Socket *socket;
106
107 /**
108 * Message queue for the peer on the other
109 * end
110 */
111 struct GNUNET_MQ_MessageQueue *mq;
112
113 /**
114 * Type of this operation
115 */
116 enum GNUNET_SET_OperationType operation;
117
118 /**
119 * GNUNET_YES if we started the operation,
120 * GNUNET_NO if the other peer started it.
121 */
122 int is_outgoing;
123
124 /**
125 * Request id, so we can use one client handle
126 * for multiple operations
127 */
128 uint32_t request_id;
129
79 /* last difference estimate */ 130 /* last difference estimate */
80 unsigned int diff; 131 unsigned int diff;
81 132
@@ -95,51 +146,109 @@ struct UnionEvaluateOperation
95 */ 146 */
96 unsigned int ibf_order; 147 unsigned int ibf_order;
97 148
149 struct StrataEstimator *se;
150
98 /** 151 /**
99 * The ibf we currently receive 152 * The ibf we currently receive
100 */ 153 */
101 struct InvertibleBloomFilter *ibf_received; 154 struct InvertibleBloomFilter *remote_ibf;
102 155
103 struct StrataEstimator *se; 156 /**
157 * Array of IBFs, some of them pre-allocated
158 */
159 struct InvertibleBloomFilter *local_ibf;
160
161 /**
162 * Elements we received from the other peer.
163 */
164 struct GNUNET_CONTAINER_MultiHashMap *received_elements;
165
166 /**
167 * Maps IBF-Keys (specific to the current salt) to elements.
168 */
169 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
104 170
105 /** 171 /**
106 * Current state of the operation 172 * Current state of the operation
107 */ 173 */
108 enum UnionOperationState state; 174 enum UnionOperationState state;
175
176 /**
177 * Evaluate operations are held in
178 * a linked list.
179 */
180 struct UnionEvaluateOperation *next;
181
182 /**
183 * Evaluate operations are held in
184 * a linked list.
185 */
186 struct UnionEvaluateOperation *prev;
109}; 187};
110 188
111
112/** 189/**
113 * Element entry, stored in the hash maps from 190 * Information about the element in a set.
114 * partial IBF keys to elements. 191 * All elements are stored in a hash-table
192 * from their hash-code to their 'struct Element',
193 * so that the remove and add operations are reasonably
194 * fast.
115 */ 195 */
116struct ElementEntry 196struct ElementEntry
117{ 197{
118 /** 198 /**
119 * The actual element 199 * The actual element. The data for the element
200 * should be allocated at the end of this struct.
120 */ 201 */
121 struct GNUNET_SET_Element *element; 202 struct GNUNET_SET_Element element;
122 203
123 /** 204 /**
124 * Actual ibf key of the element entry 205 * Hash of the element.
206 * Will be used to derive the different IBF keys
207 * for different salts.
125 */ 208 */
126 struct IBF_Key ibf_key; 209 struct GNUNET_HashCode element_hash;
127 210
128 /** 211 /**
129 * Linked list, note that the next element 212 * Generation the element was added.
130 * has to have an ibf_key that is lexicographically 213 * Operations of earlier generations will not consider the element.
131 * equal or larger.
132 */ 214 */
133 struct ElementEntry *next; 215 int generation_add;
134 216
135 /** 217 /**
136 * GNUNET_YES if the element was received from 218 * Generation this element was removed.
137 * the remote peer, and the local peer did not previously 219 * Operations of later generations will not consider the element.
138 * have it 220 */
221 int generation_remove;
222
223 /**
224 * GNUNET_YES if we received the element from a remote peer, and not
225 * from the local peer. Note that if the local client inserts an
226 * element *after* we got it from a remote peer, the element is
227 * considered local.
139 */ 228 */
140 int remote; 229 int remote;
141}; 230};
142 231
232/**
233 * Information about the element used for
234 * a specific union operation.
235 */
236struct KeyEntry
237{
238 struct IBF_Key ibf_key;
239
240 /**
241 * The actual element associated with the key
242 */
243 struct ElementEntry *element;
244
245 /**
246 * Element that collides with this element
247 * on the ibf key
248 */
249 struct KeyEntry *next_colliding;
250};
251
143 252
144/** 253/**
145 * Extra state required for efficient set union. 254 * Extra state required for efficient set union.
@@ -147,47 +256,72 @@ struct ElementEntry
147struct UnionState 256struct UnionState
148{ 257{
149 /** 258 /**
150 * Strate estimator of the set we currently have, 259 * The strata estimator is only generated once for
151 * used for estimation of the symmetric difference 260 * each set.
152 */ 261 */
153 struct StrataEstimator *se; 262 struct StrataEstimator *se;
154 263
155 /** 264 /**
156 * Array of IBFs, some of them pre-allocated 265 * Maps 'struct GNUNET_HashCode' to 'struct ElementEntry'.
266 */
267 struct GNUNET_CONTAINER_MultiHashMap *elements;
268
269 /**
270 * Evaluate operations are held in
271 * a linked list.
157 */ 272 */
158 struct InvertibleBloomFilter **ibfs; 273 struct UnionEvaluateOperation *ops_head;
159 274
160 /** 275 /**
161 * Maps the first 32 bits of the ibf-key to 276 * Evaluate operations are held in
162 * elements. 277 * a linked list.
163 */ 278 */
164 struct GNUNET_CONTAINER_MultiHashMap32 *elements; 279 struct UnionEvaluateOperation *ops_tail;
165}; 280};
166 281
167 282
283static struct IBF_Key
284get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
285{
286
287 struct IBF_Key key;
288 GNUNET_CRYPTO_hkdf (&key, sizeof (key),
289 GCRY_MD_SHA512, GCRY_MD_SHA256,
290 src, sizeof *src,
291 &salt, sizeof (salt),
292 NULL, 0);
293 return key;
294}
295
296
168static void 297static void
169send_operation_request (struct EvaluateOperation *eo) 298send_operation_request (struct UnionEvaluateOperation *eo)
170{ 299{
171 struct GNUNET_MQ_Message *mqm; 300 struct GNUNET_MQ_Message *mqm;
172 struct OperationRequestMessage *msg; 301 struct OperationRequestMessage *msg;
302 int ret;
173 303
174 mqm = GNUNET_MQ_msg_concat (msg, eo->context_msg, 304 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST);
175 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST); 305 ret = GNUNET_MQ_nest (mqm, eo->context_msg);
176 if (NULL == mqm) 306 if (GNUNET_OK != ret)
177 { 307 {
178 /* the context message is too large */ 308 /* the context message is too large */
179 client_disconnect (eo->set->client); 309 _GSS_client_disconnect (eo->set->client);
310 GNUNET_MQ_discard (mqm);
180 GNUNET_break (0); 311 GNUNET_break (0);
181 return; 312 return;
182 } 313 }
183 msg->operation = eo->operation; 314 msg->operation = eo->operation;
184 msg->app_id = eo->app_id; 315 msg->app_id = eo->app_id;
185 GNUNET_MQ_send (eo->mq, mqm); 316 GNUNET_MQ_send (eo->mq, mqm);
317
318 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n");
186} 319}
187 320
188 321
189/** 322/**
190 * Iterator to insert values into an ibf. 323 * Iterator to create the mapping between ibf keys
324 * and element entries.
191 * 325 *
192 * @param cls closure 326 * @param cls closure
193 * @param key current key code 327 * @param key current key code
@@ -197,54 +331,93 @@ send_operation_request (struct EvaluateOperation *eo)
197 * GNUNET_NO if not. 331 * GNUNET_NO if not.
198 */ 332 */
199static int 333static int
200ibf_insert_iterator (void *cls, 334insert_element_iterator (void *cls,
201 uint32_t key, 335 uint32_t key,
202 void *value) 336 void *value)
203{ 337{
204 struct InvertibleBloomFilter *ibf = cls; 338 struct KeyEntry *const new_k = cls;
205 struct ElementEntry *e = value; 339 struct KeyEntry *old_k = value;
206 struct IBF_Key ibf_key;
207
208 GNUNET_assert (NULL != e);
209 ibf_key = e->ibf_key;
210 ibf_insert (ibf, ibf_key);
211 e = e->next;
212 340
213 while (NULL != e) 341 GNUNET_assert (NULL != old_k);
342 do
214 { 343 {
215 /* only insert keys we haven't seen yet */ 344 if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
216 if (0 != memcmp (&e->ibf_key, &ibf_key, sizeof ibf_key))
217 { 345 {
218 ibf_key = e->ibf_key; 346 new_k->next_colliding = old_k;
219 ibf_insert (ibf, ibf_key); 347 old_k->next_colliding = new_k;
348 return GNUNET_NO;
220 } 349 }
221 e = e->next; 350 old_k = old_k->next_colliding;
351 } while (NULL != old_k);
352 return GNUNET_YES;
353}
354
355
356static void
357insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee)
358{
359 int ret;
360 struct IBF_Key ibf_key;
361 struct KeyEntry *k;
362
363 ibf_key = get_ibf_key (&ee->element_hash, eo->salt);
364 k = GNUNET_new (struct KeyEntry);
365 k->element = ee;
366 k->ibf_key = ibf_key;
367 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val,
368 insert_element_iterator, k);
369 /* was the element inserted into a colliding bucket? */
370 if (GNUNET_SYSERR == ret)
371 {
372 GNUNET_assert (NULL != k->next_colliding);
373 return;
222 } 374 }
375 GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
376 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
377 if (NULL != eo->local_ibf)
378 ibf_insert (eo->local_ibf, ibf_key);
379}
380
381
382static int
383prepare_ibf_iterator (void *cls,
384 uint32_t key,
385 void *value)
386{
387 struct InvertibleBloomFilter *ibf = cls;
388 struct KeyEntry *ke = value;
223 389
390 ibf_insert (ibf, ke->ibf_key);
224 return GNUNET_YES; 391 return GNUNET_YES;
225} 392}
226 393
227 394static int
228/** 395init_key_to_element_iterator (void *cls,
229 * Create and populate an IBF for the specified peer, 396 const struct GNUNET_HashCode *key,
230 * if it does not already exist. 397 void *value)
231 *
232 * @param cpi peer to create the ibf for
233 */
234static struct InvertibleBloomFilter *
235prepare_ibf (struct EvaluateOperation *eo, uint16_t order)
236{ 398{
237 struct UnionState *us = eo->set->extra.u; 399 struct UnionEvaluateOperation *eo = cls;
400 struct ElementEntry *e = value;
401
402 insert_element (eo, e);
403 return GNUNET_YES;
404}
238 405
239 GNUNET_assert (order <= MAX_IBF_ORDER); 406static void
240 if (NULL == us->ibfs) 407prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size)
241 us->ibfs = GNUNET_malloc (MAX_IBF_ORDER * sizeof (struct InvertibleBloomFilter *)); 408{
242 if (NULL == us->ibfs[order]) 409 if (NULL == eo->key_to_element)
243 { 410 {
244 us->ibfs[order] = ibf_create (1 << order, SE_IBF_HASH_NUM); 411 unsigned int len;
245 GNUNET_CONTAINER_multihashmap32_iterate (us->elements, ibf_insert_iterator, us->ibfs[order]); 412 len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements);
413 eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len);
414 GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements,
415 init_key_to_element_iterator, eo);
246 } 416 }
247 return us->ibfs[order]; 417 if (NULL != eo->local_ibf)
418 ibf_destroy (eo->local_ibf);
419 eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
420 GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, prepare_ibf_iterator, eo->local_ibf);
248} 421}
249 422
250 423
@@ -254,12 +427,14 @@ prepare_ibf (struct EvaluateOperation *eo, uint16_t order)
254 * @param cpi the peer 427 * @param cpi the peer
255 */ 428 */
256static void 429static void
257send_ibf (struct EvaluateOperation *eo, uint16_t ibf_order) 430send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
258{ 431{
259 unsigned int buckets_sent = 0; 432 unsigned int buckets_sent = 0;
260 struct InvertibleBloomFilter *ibf; 433 struct InvertibleBloomFilter *ibf;
261 434
262 ibf = prepare_ibf (eo, ibf_order); 435 prepare_ibf (eo, ibf_order);
436
437 ibf = eo->local_ibf;
263 438
264 while (buckets_sent < (1 << ibf_order)) 439 while (buckets_sent < (1 << ibf_order))
265 { 440 {
@@ -282,7 +457,7 @@ send_ibf (struct EvaluateOperation *eo, uint16_t ibf_order)
282 GNUNET_MQ_send (eo->mq, mqm); 457 GNUNET_MQ_send (eo->mq, mqm);
283 } 458 }
284 459
285 eo->extra.u->state = STATE_EXPECT_ELEMENTS_AND_REQUESTS; 460 eo->state = STATE_EXPECT_ELEMENTS_AND_REQUESTS;
286} 461}
287 462
288 463
@@ -292,7 +467,7 @@ send_ibf (struct EvaluateOperation *eo, uint16_t ibf_order)
292 * @param cpi the peer 467 * @param cpi the peer
293 */ 468 */
294static void 469static void
295send_strata_estimator (struct EvaluateOperation *eo) 470send_strata_estimator (struct UnionEvaluateOperation *eo)
296{ 471{
297 struct GNUNET_MQ_Message *mqm; 472 struct GNUNET_MQ_Message *mqm;
298 struct GNUNET_MessageHeader *strata_msg; 473 struct GNUNET_MessageHeader *strata_msg;
@@ -300,31 +475,36 @@ send_strata_estimator (struct EvaluateOperation *eo)
300 mqm = GNUNET_MQ_msg_header_extra (strata_msg, 475 mqm = GNUNET_MQ_msg_header_extra (strata_msg,
301 SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, 476 SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
302 GNUNET_MESSAGE_TYPE_SET_P2P_SE); 477 GNUNET_MESSAGE_TYPE_SET_P2P_SE);
303 strata_estimator_write (eo->set->extra.u->se, &strata_msg[1]); 478 strata_estimator_write (eo->set->state.u->se, &strata_msg[1]);
304 GNUNET_MQ_send (eo->mq, mqm); 479 GNUNET_MQ_send (eo->mq, mqm);
305 480 eo->state = STATE_EXPECT_IBF;
306 eo->extra.u->state = STATE_EXPECT_IBF;
307} 481}
308 482
309 483
310static void 484static void
311handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) 485handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
312{ 486{
313 struct EvaluateOperation *eo = cls; 487 struct UnionEvaluateOperation *eo = cls;
488 struct StrataEstimator *remote_se;
314 int ibf_order; 489 int ibf_order;
315 int diff; 490 int diff;
316 491
317 if (eo->extra.u->state != STATE_EXPECT_SE) 492 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se\n");
493
494 if (eo->state != STATE_EXPECT_SE)
318 { 495 {
319 /* FIXME: handle */ 496 /* FIXME: handle */
320 GNUNET_break (0); 497 GNUNET_break (0);
321 return; 498 return;
322 } 499 }
323 GNUNET_assert (NULL == eo->extra.u->se); 500 remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
324 eo->extra.u->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM); 501 SE_IBF_HASH_NUM);
325 strata_estimator_read (&mh[1], eo->extra.u->se); 502 strata_estimator_read (&mh[1], remote_se);
326 GNUNET_assert (NULL != eo->set->extra.u->se); 503 GNUNET_assert (NULL != eo->se);
327 diff = strata_estimator_difference (eo->set->extra.u->se, eo->extra.u->se); 504 diff = strata_estimator_difference (remote_se, eo->se);
505 strata_estimator_destroy (remote_se);
506 strata_estimator_destroy (eo->se);
507 eo->se = NULL;
328 /* minimum order */ 508 /* minimum order */
329 ibf_order = 2; 509 ibf_order = 2;
330 while ((1<<ibf_order) < (2 * diff)) 510 while ((1<<ibf_order) < (2 * diff))
@@ -341,16 +521,17 @@ handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
341 * @param 521 * @param
342 */ 522 */
343static void 523static void
344decode (struct EvaluateOperation *eo) 524decode (struct UnionEvaluateOperation *eo)
345{ 525{
346 struct IBF_Key key; 526 struct IBF_Key key;
347 int side; 527 int side;
348 struct InvertibleBloomFilter *diff_ibf; 528 struct InvertibleBloomFilter *diff_ibf;
349 529
350 GNUNET_assert (STATE_EXPECT_ELEMENTS == eo->extra.u->state); 530 GNUNET_assert (STATE_EXPECT_ELEMENTS == eo->state);
351 531
352 diff_ibf = ibf_dup (prepare_ibf (eo, eo->extra.u->ibf_order)); 532 prepare_ibf (eo, eo->ibf_order);
353 ibf_subtract (diff_ibf, eo->extra.u->ibf_received); 533 diff_ibf = ibf_dup (eo->local_ibf);
534 ibf_subtract (diff_ibf, eo->remote_ibf);
354 535
355 while (1) 536 while (1)
356 { 537 {
@@ -359,7 +540,8 @@ decode (struct EvaluateOperation *eo)
359 res = ibf_decode (diff_ibf, &side, &key); 540 res = ibf_decode (diff_ibf, &side, &key);
360 if (GNUNET_SYSERR == res) 541 if (GNUNET_SYSERR == res)
361 { 542 {
362 /* decoding failed, we tell the other peer by sending our ibf with a larger order */ 543 /* decoding failed, we tell the other peer by sending our ibf
544 * with a larger order */
363 GNUNET_assert (0); 545 GNUNET_assert (0);
364 return; 546 return;
365 } 547 }
@@ -373,20 +555,11 @@ decode (struct EvaluateOperation *eo)
373 } 555 }
374 if (1 == side) 556 if (1 == side)
375 { 557 {
376 struct ElementEntry *e; 558 //struct ElementEntry *e;
377 /* we have the element(s), send it to the other peer */ 559 /* we have the element(s), send it to the other peer */
378 e = GNUNET_CONTAINER_multihashmap32_get (eo->set->extra.u->elements, (uint32_t) key.key_val); 560 //GNUNET_CONTAINER_multihashmap32_get_multiple (eo->set->state.u->elements,
379 if (NULL == e) 561 // (uint32_t) key.key_val);
380 { 562 /* FIXME */
381 /* FIXME */
382 GNUNET_assert (0);
383 return;
384 }
385 while (NULL != e)
386 {
387 /* FIXME: send element */
388 e = e->next;
389 }
390 } 563 }
391 else 564 else
392 { 565 {
@@ -403,37 +576,35 @@ decode (struct EvaluateOperation *eo)
403} 576}
404 577
405 578
406
407static void 579static void
408handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) 580handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
409{ 581{
410 struct EvaluateOperation *eo = cls; 582 struct UnionEvaluateOperation *eo = cls;
411 struct UnionEvaluateOperation *ueo = eo->extra.u;
412 struct IBFMessage *msg = (struct IBFMessage *) mh; 583 struct IBFMessage *msg = (struct IBFMessage *) mh;
413 unsigned int buckets_in_message; 584 unsigned int buckets_in_message;
414 585
415 if (ueo->state == STATE_EXPECT_ELEMENTS_AND_REQUESTS) 586 if (eo->state == STATE_EXPECT_ELEMENTS_AND_REQUESTS)
416 { 587 {
417 /* check that the ibf is a new one / first part */ 588 /* check that the ibf is a new one / first part */
418 /* clear outgoing messages */ 589 /* clear outgoing messages */
419 GNUNET_assert (0); 590 GNUNET_assert (0);
420 } 591 }
421 else if (ueo->state == STATE_EXPECT_IBF) 592 else if (eo->state == STATE_EXPECT_IBF)
422 { 593 {
423 ueo->state = STATE_EXPECT_IBF_CONT; 594 eo->state = STATE_EXPECT_IBF_CONT;
424 ueo->ibf_order = msg->order; 595 eo->ibf_order = msg->order;
425 GNUNET_assert (NULL == ueo->ibf_received); 596 GNUNET_assert (NULL == eo->remote_ibf);
426 ueo->ibf_received = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); 597 eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
427 if (ntohs (msg->offset) != 0) 598 if (ntohs (msg->offset) != 0)
428 { 599 {
429 /* FIXME: handle */ 600 /* FIXME: handle */
430 GNUNET_assert (0); 601 GNUNET_assert (0);
431 } 602 }
432 } 603 }
433 else if (ueo->state == STATE_EXPECT_IBF_CONT) 604 else if (eo->state == STATE_EXPECT_IBF_CONT)
434 { 605 {
435 if ( (ntohs (msg->offset) != ueo->ibf_buckets_received) || 606 if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
436 (msg->order != ueo->ibf_order) ) 607 (msg->order != eo->ibf_order) )
437 { 608 {
438 /* FIXME: handle */ 609 /* FIXME: handle */
439 GNUNET_assert (0); 610 GNUNET_assert (0);
@@ -448,12 +619,12 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
448 GNUNET_assert (0); 619 GNUNET_assert (0);
449 } 620 }
450 621
451 ibf_read_slice (&msg[1], ueo->ibf_buckets_received, buckets_in_message, ueo->ibf_received); 622 ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
452 ueo->ibf_buckets_received += buckets_in_message; 623 eo->ibf_buckets_received += buckets_in_message;
453 624
454 if (ueo->ibf_buckets_received == (1<<ueo->ibf_order)) 625 if (eo->ibf_buckets_received == (1<<eo->ibf_order))
455 { 626 {
456 ueo->state = STATE_EXPECT_ELEMENTS; 627 eo->state = STATE_EXPECT_ELEMENTS;
457 decode (eo); 628 decode (eo);
458 } 629 }
459} 630}
@@ -462,10 +633,10 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
462static void 633static void
463handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) 634handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
464{ 635{
465 struct EvaluateOperation *eo = cls; 636 struct UnionEvaluateOperation *eo = cls;
466 637
467 if ( (eo->extra.u->state != STATE_EXPECT_ELEMENTS) && 638 if ( (eo->state != STATE_EXPECT_ELEMENTS) &&
468 (eo->extra.u->state != STATE_EXPECT_ELEMENTS_AND_REQUESTS) ) 639 (eo->state != STATE_EXPECT_ELEMENTS_AND_REQUESTS) )
469 { 640 {
470 /* FIXME: handle */ 641 /* FIXME: handle */
471 GNUNET_break (0); 642 GNUNET_break (0);
@@ -477,10 +648,10 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
477static void 648static void
478handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) 649handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
479{ 650{
480 struct EvaluateOperation *eo = cls; 651 struct UnionEvaluateOperation *eo = cls;
481 652
482 /* look up elements and send them */ 653 /* look up elements and send them */
483 if (eo->extra.u->state != STATE_EXPECT_ELEMENTS_AND_REQUESTS) 654 if (eo->state != STATE_EXPECT_ELEMENTS_AND_REQUESTS)
484 { 655 {
485 /* FIXME: handle */ 656 /* FIXME: handle */
486 GNUNET_break (0); 657 GNUNET_break (0);
@@ -508,141 +679,107 @@ static const struct GNUNET_MQ_Handler union_handlers[] = {
508 679
509/** 680/**
510 * Functions of this type will be called when a stream is established 681 * Functions of this type will be called when a stream is established
511 * 682 *
512 * @param cls the closure from GNUNET_STREAM_open 683 * @param cls the closure from GNUNET_STREAM_open
513 * @param socket socket to use to communicate with the other side (read/write) 684 * @param socket socket to use to communicate with the
685 * other side (read/write)
514 */ 686 */
515static void 687static void
516stream_open_cb (void *cls, 688stream_open_cb (void *cls,
517 struct GNUNET_STREAM_Socket *socket) 689 struct GNUNET_STREAM_Socket *socket)
518{ 690{
519 struct EvaluateOperation *eo = cls; 691 struct UnionEvaluateOperation *eo = cls;
520 692
521 GNUNET_assert (NULL == eo->mq); 693 GNUNET_assert (NULL == eo->mq);
522 GNUNET_assert (socket == eo->socket); 694 GNUNET_assert (socket == eo->socket);
523 695
524 eo->mq = GNUNET_MQ_queue_for_stream_socket (eo->socket, union_handlers, eo); 696 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "open cb successful\n");
697
698 eo->mq = GNUNET_MQ_queue_for_stream_socket (eo->socket,
699 union_handlers, eo);
700 /* we started the operation, thus we have to send the operation request */
525 send_operation_request (eo); 701 send_operation_request (eo);
702 eo->state = STATE_EXPECT_SE;
526} 703}
527 704
528 705
529void 706void
530union_evaluate (struct EvaluateOperation *eo) 707_GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set)
531{ 708{
532 GNUNET_assert (GNUNET_SET_OPERATION_UNION == eo->set->operation); 709 struct UnionEvaluateOperation *eo;
533 eo->socket =
534 GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET,
535 stream_open_cb, GNUNET_STREAM_OPTION_END);
536}
537
538
539static void
540insert_ibf_key_unchecked (struct UnionState *us, struct IBF_Key ibf_key)
541{
542 int i;
543
544 strata_estimator_insert (us->se, ibf_key);
545 for (i = 0; i <= MAX_IBF_ORDER; i++)
546 {
547 if (NULL == us->ibfs)
548 break;
549 if (NULL == us->ibfs[i])
550 continue;
551 ibf_insert (us->ibfs[i], ibf_key);
552 }
553}
554 710
711 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "evaluating union operation\n");
555 712
556/** 713 eo = GNUNET_new (struct UnionEvaluateOperation);
557 * Insert an element into the consensus set of the specified session. 714 eo->peer = m->peer;
558 * The element will not be copied, and freed when destroying the session. 715 eo->set = set;
559 * 716 eo->socket =
560 * @param session session for new element 717 GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET,
561 * @param element element to insert 718 stream_open_cb, eo,
562 */ 719 GNUNET_STREAM_OPTION_END);
563static void
564insert_element (struct Set *set, struct GNUNET_SET_Element *element)
565{
566 struct UnionState *us = set->extra.u;
567 struct GNUNET_HashCode hash;
568 struct ElementEntry *e;
569 struct ElementEntry *e_old;
570
571 e = GNUNET_new (struct ElementEntry);
572 e->element = element;
573 GNUNET_CRYPTO_hash (e->element->data, e->element->size, &hash);
574 e->ibf_key = ibf_key_from_hashcode (&hash);
575
576 e_old = GNUNET_CONTAINER_multihashmap32_get (us->elements, (uint32_t) e->ibf_key.key_val);
577 if (NULL == e_old)
578 {
579 GNUNET_CONTAINER_multihashmap32_put (us->elements, (uint32_t) e->ibf_key.key_val, e,
580 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
581 return;
582 }
583
584 while (NULL != e_old)
585 {
586 int cmp = memcmp (&e->ibf_key, &e_old->ibf_key, sizeof (struct IBF_Key));
587 if (cmp < 0)
588 {
589 if (NULL == e_old->next)
590 {
591 e_old->next = e;
592 insert_ibf_key_unchecked (us, e->ibf_key);
593 return;
594 }
595 e_old = e_old->next;
596 }
597 else if (cmp == 0)
598 {
599 e->next = e_old->next;
600 e_old->next = e;
601 return;
602 }
603 else
604 {
605 e->next = e_old;
606 insert_ibf_key_unchecked (us, e->ibf_key);
607 return;
608 }
609 }
610} 720}
611 721
612 722
613void 723void
614union_accept (struct EvaluateOperation *eo, struct Incoming *incoming) 724_GSS_union_accept (struct AcceptMessage *m, struct Set *set,
725 struct Incoming *incoming)
615{ 726{
616 GNUNET_assert (NULL != incoming->mq); 727 struct UnionEvaluateOperation *eo;
728
729 eo = GNUNET_new (struct UnionEvaluateOperation);
730 eo->set = set;
731 eo->peer = incoming->peer;
732 eo->app_id = incoming->app_id;
733 eo->salt = ntohs (incoming->salt);
734 eo->request_id = m->request_id;
735 eo->set = set;
617 eo->mq = incoming->mq; 736 eo->mq = incoming->mq;
737 /* the peer's socket is now ours, we'll receive all messages */
618 GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo); 738 GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo);
619 739 /* kick of the operation */
620 send_strata_estimator (eo); 740 send_strata_estimator (eo);
621} 741}
622 742
623 743
624struct Set * 744struct Set *
625union_set_create () 745_GSS_union_set_create (void)
626{ 746{
627 struct Set *set; 747 struct Set *set;
748
749 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set created\n");
750
628 set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState)); 751 set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState));
629 set->extra.u = (struct UnionState *) &set[1]; 752 set->state.u = (struct UnionState *) &set[1];
630 set->operation = GNUNET_SET_OPERATION_UNION; 753 set->operation = GNUNET_SET_OPERATION_UNION;
631 set->extra.u->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM); 754 set->state.u->se = strata_estimator_create (SE_STRATA_COUNT,
755 SE_IBF_SIZE, SE_IBF_HASH_NUM);
632 return set; 756 return set;
633} 757}
634 758
635 759
760
636void 761void
637union_add (struct Set *set, struct ElementMessage *m) 762_GSS_union_add (struct ElementMessage *m, struct Set *set)
638{ 763{
639 struct GNUNET_SET_Element *element; 764 struct ElementEntry *ee;
765 struct ElementEntry *ee_dup;
640 uint16_t element_size; 766 uint16_t element_size;
767
641 element_size = ntohs (m->header.size) - sizeof *m; 768 element_size = ntohs (m->header.size) - sizeof *m;
642 element = GNUNET_malloc (sizeof *element + element_size); 769 ee = GNUNET_malloc (element_size + sizeof *ee);
643 element->size = element_size; 770 ee->element.size = element_size;
644 element->data = &element[1]; 771 ee->element.data = &ee[1];
645 memcpy (element->data, &m[1], element_size); 772 memcpy (ee->element.data, &m[1], element_size);
646 insert_element (set, element); 773 GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash);
774 ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &ee->element_hash);
775 if (NULL != ee_dup)
776 {
777 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n");
778 GNUNET_free (ee);
779 return;
780 }
781 GNUNET_CONTAINER_multihashmap_put (set->state.u->elements, &ee->element_hash, ee,
782 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
783 strata_estimator_insert (set->state.u->se, get_ibf_key (&ee->element_hash, 0));
647} 784}
648 785
diff --git a/src/set/gnunet-set.c b/src/set/gnunet-set.c
index 7de687611..c49b60dfd 100644
--- a/src/set/gnunet-set.c
+++ b/src/set/gnunet-set.c
@@ -30,6 +30,8 @@
30#include "gnunet_set_service.h" 30#include "gnunet_set_service.h"
31 31
32 32
33static struct GNUNET_PeerIdentity local_id;
34
33static struct GNUNET_HashCode app_id; 35static struct GNUNET_HashCode app_id;
34static struct GNUNET_SET_Handle *set1; 36static struct GNUNET_SET_Handle *set1;
35static struct GNUNET_SET_Handle *set2; 37static struct GNUNET_SET_Handle *set2;
@@ -45,6 +47,13 @@ listen_cb (void *cls,
45 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n"); 47 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n");
46} 48}
47 49
50static void
51result_cb (void *cls, struct GNUNET_SET_Element *element,
52 enum GNUNET_SET_Status status)
53{
54 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got result\n");
55}
56
48 57
49/** 58/**
50 * Main function that will be run. 59 * Main function that will be run.
@@ -60,12 +69,19 @@ run (void *cls, char *const *args,
60 const struct GNUNET_CONFIGURATION_Handle *cfg) 69 const struct GNUNET_CONFIGURATION_Handle *cfg)
61{ 70{
62 static const char* app_str = "gnunet-set"; 71 static const char* app_str = "gnunet-set";
72
63 GNUNET_CRYPTO_hash (app_str, strlen (app_str), &app_id); 73 GNUNET_CRYPTO_hash (app_str, strlen (app_str), &app_id);
64 74
75 GNUNET_CRYPTO_get_host_identity (cfg, &local_id);
76
65 set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); 77 set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
66 set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); 78 set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
67 listen_handle = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION, &app_id, 79 listen_handle = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
68 listen_cb, NULL); 80 &app_id, listen_cb, NULL);
81
82 GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42,
83 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_SET_RESULT_ADDED,
84 result_cb, NULL);
69} 85}
70 86
71 87
diff --git a/src/set/mq.c b/src/set/mq.c
index 236a692d4..92120a607 100644
--- a/src/set/mq.c
+++ b/src/set/mq.c
@@ -215,21 +215,25 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
215} 215}
216 216
217 217
218struct GNUNET_MQ_Message * 218int
219GNUNET_MQ_msg_concat_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, struct GNUNET_MessageHeader *m, uint16_t type) 219GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp,
220 const struct GNUNET_MessageHeader *m)
220{ 221{
221 struct GNUNET_MQ_Message *mq; 222 size_t new_size;
223 size_t old_size;
222 224
223 GNUNET_assert (NULL != mhp);
224 if (NULL == m) 225 if (NULL == m)
225 return GNUNET_MQ_msg_ (mhp, base_size, type); 226 return GNUNET_OK;
226 GNUNET_assert (ntohs (m->size >= sizeof (struct GNUNET_MessageHeader))); 227 GNUNET_assert (NULL != mqmp);
227 /* check for overflow */ 228 old_size = ntohs ((*mqmp)->mh->size);
228 if (base_size + ntohs (m->size) <= base_size) 229 /* message too large to concatenate? */
229 return NULL; 230 if (ntohs ((*mqmp)->mh->size) + ntohs (m->size) < ntohs (m->size))
230 mq = GNUNET_MQ_msg_ (mhp, base_size + ntohs (m->size), type); 231 return GNUNET_SYSERR;
231 memcpy (((void *) *mhp) + base_size, m, ntohs (m->size)); 232 new_size = old_size + ntohs (m->size);
232 return mq; 233 *mqmp = GNUNET_realloc (mqmp, sizeof (struct GNUNET_MQ_Message) + new_size);
234 memcpy ((*mqmp)->mh + old_size, m, new_size - old_size);
235 (*mqmp)->mh->size = htons (new_size);
236 return GNUNET_OK;
233} 237}
234 238
235 239
@@ -274,20 +278,26 @@ stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
274 return; 278 return;
275 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm); 279 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm);
276 mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size), 280 mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
277 GNUNET_TIME_UNIT_FOREVER_REL, stream_write_queued, cls); 281 GNUNET_TIME_UNIT_FOREVER_REL,
282 stream_write_queued, mq);
278 GNUNET_assert (NULL != mss->wh); 283 GNUNET_assert (NULL != mss->wh);
279} 284}
280 285
281 286
282static void 287static void
283stream_socket_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) 288stream_socket_send_impl (struct GNUNET_MQ_MessageQueue *mq,
289 struct GNUNET_MQ_Message *mqm)
284{ 290{
291 struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state;
285 if (NULL != mq->current_msg) 292 if (NULL != mq->current_msg)
286 { 293 {
287 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); 294 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
288 return; 295 return;
289 } 296 }
290 stream_write_queued (mq, GNUNET_STREAM_OK, 0); 297 mq->current_msg = mqm;
298 mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
299 GNUNET_TIME_UNIT_FOREVER_REL,
300 stream_write_queued, mq);
291} 301}
292 302
293 303
@@ -304,7 +314,8 @@ stream_socket_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Mes
304 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing 314 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
305 */ 315 */
306static int 316static int
307stream_mst_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) 317stream_mst_callback (void *cls, void *client,
318 const struct GNUNET_MessageHeader *message)
308{ 319{
309 struct GNUNET_MQ_MessageQueue *mq = cls; 320 struct GNUNET_MQ_MessageQueue *mq = cls;
310 321
@@ -334,12 +345,14 @@ stream_data_processor (void *cls,
334 struct GNUNET_MQ_MessageQueue *mq = cls; 345 struct GNUNET_MQ_MessageQueue *mq = cls;
335 struct MessageStreamState *mss; 346 struct MessageStreamState *mss;
336 int ret; 347 int ret;
348
337 mss = (struct MessageStreamState *) mq->impl_state; 349 mss = (struct MessageStreamState *) mq->impl_state;
338
339 GNUNET_assert (GNUNET_STREAM_OK == status); 350 GNUNET_assert (GNUNET_STREAM_OK == status);
340 ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO); 351 ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO);
341 GNUNET_assert (GNUNET_OK == ret); 352 GNUNET_assert (GNUNET_OK == ret);
342 /* we always read all data */ 353 /* we always read all data */
354 mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL,
355 stream_data_processor, mq);
343 return size; 356 return size;
344} 357}
345 358
@@ -369,8 +382,7 @@ GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket,
369} 382}
370 383
371 384
372/** 385/*** Transmit a queued message to the session's client.
373 * Transmit a queued message to the session's client.
374 * 386 *
375 * @param cls consensus session 387 * @param cls consensus session
376 * @param size number of bytes available in buf 388 * @param size number of bytes available in buf
@@ -474,7 +486,7 @@ connection_client_transmit_queued (void *cls, size_t size,
474 mq->current_msg = mq->msg_head; 486 mq->current_msg = mq->msg_head;
475 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg); 487 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
476 state->th = 488 state->th =
477 GNUNET_CLIENT_notify_transmit_ready (state->connection, msg_size, 489 GNUNET_CLIENT_notify_transmit_ready (state->connection, htons (mq->current_msg->mh->size),
478 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, 490 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
479 &connection_client_transmit_queued, mq); 491 &connection_client_transmit_queued, mq);
480 } 492 }
@@ -483,7 +495,8 @@ connection_client_transmit_queued (void *cls, size_t size,
483 495
484 496
485static void 497static void
486connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) 498connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq,
499 struct GNUNET_MQ_Message *mqm)
487{ 500{
488 struct ClientConnectionState *state = mq->impl_state; 501 struct ClientConnectionState *state = mq->impl_state;
489 int msize; 502 int msize;
@@ -519,7 +532,6 @@ handle_client_message (void *cls,
519 struct GNUNET_MQ_MessageQueue *mq = cls; 532 struct GNUNET_MQ_MessageQueue *mq = cls;
520 533
521 GNUNET_assert (NULL != msg); 534 GNUNET_assert (NULL != msg);
522
523 dispatch_message (mq, msg); 535 dispatch_message (mq, msg);
524} 536}
525 537
diff --git a/src/set/mq.h b/src/set/mq.h
index 3d8be789d..371bb5846 100644
--- a/src/set/mq.h
+++ b/src/set/mq.h
@@ -58,20 +58,7 @@
58 */ 58 */
59#define GNUNET_MQ_msg(mvar, type) GNUNET_MQ_msg_extra(mvar, 0, type) 59#define GNUNET_MQ_msg(mvar, type) GNUNET_MQ_msg_extra(mvar, 0, type)
60 60
61/** 61#define GNUNET_MQ_nest(mqm, mh) GNUNET_MQ_nest_ (&mqm, mh)
62 * Allocate a GNUNET_MQ_Message, and concatenate another message
63 * after the space needed by the message struct.
64 * // nest?
65 *
66 * @param mvar variable to store the allocated message in;
67 * must have a header field
68 * @param mc message to concatenate, can be NULL
69 * @param type type of the message
70 * @return the MQ message, NULL if mc is to large to be concatenated
71 */
72#define GNUNET_MQ_msg_concat(mvar, mc, t) GNUNET_MQ_msg_concat_(((void) mvar->header, (struct GNUNET_MessageHeader **) &(mvar)), \
73 sizeof *mvar, (struct GNUNET_MessageHeader *) mc, t)
74
75 62
76/** 63/**
77 * Allocate a GNUNET_MQ_Message, where the message only consists of a header. 64 * Allocate a GNUNET_MQ_Message, where the message only consists of a header.
@@ -157,14 +144,9 @@ struct GNUNET_MQ_Message *
157GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type); 144GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type);
158 145
159 146
160/** 147int
161 * Create a new message for MQ, by concatenating another message 148GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp,
162 * after a message of the specified type. 149 const struct GNUNET_MessageHeader *m);
163 *
164 * @retrn the allocated MQ message
165 */
166struct GNUNET_MQ_Message *
167GNUNET_MQ_msg_concat_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, struct GNUNET_MessageHeader *m, uint16_t type);
168 150
169 151
170/** 152/**
diff --git a/src/set/set_api.c b/src/set/set_api.c
index b2491afe7..daa15c081 100644
--- a/src/set/set_api.c
+++ b/src/set/set_api.c
@@ -288,6 +288,7 @@ GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set,
288 const struct GNUNET_PeerIdentity *other_peer, 288 const struct GNUNET_PeerIdentity *other_peer,
289 const struct GNUNET_HashCode *app_id, 289 const struct GNUNET_HashCode *app_id,
290 const struct GNUNET_MessageHeader *context_msg, 290 const struct GNUNET_MessageHeader *context_msg,
291 uint16_t salt,
291 struct GNUNET_TIME_Relative timeout, 292 struct GNUNET_TIME_Relative timeout,
292 enum GNUNET_SET_ResultMode result_mode, 293 enum GNUNET_SET_ResultMode result_mode,
293 GNUNET_SET_ResultIterator result_cb, 294 GNUNET_SET_ResultIterator result_cb,
@@ -302,11 +303,14 @@ GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set,
302 oh->result_cls = result_cls; 303 oh->result_cls = result_cls;
303 oh->set = set; 304 oh->set = set;
304 305
305 mqm = GNUNET_MQ_msg_extra (msg, htons(context_msg->size), GNUNET_MESSAGE_TYPE_SET_EVALUATE); 306 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_EVALUATE);
306 msg->request_id = htonl (GNUNET_MQ_assoc_add (set->mq, mqm, oh)); 307 msg->request_id = htonl (GNUNET_MQ_assoc_add (set->mq, mqm, oh));
307 msg->peer = *other_peer; 308 msg->peer = *other_peer;
308 msg->app_id = *app_id; 309 msg->app_id = *app_id;
309 memcpy (&msg[1], context_msg, htons (context_msg->size)); 310
311 if (GNUNET_OK != GNUNET_MQ_nest (mqm, context_msg))
312 GNUNET_assert (0);
313
310 oh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, operation_timeout_task, oh); 314 oh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, operation_timeout_task, oh);
311 GNUNET_MQ_send (set->mq, mqm); 315 GNUNET_MQ_send (set->mq, mqm);
312 316