diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-04-27 14:16:29 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-04-27 14:16:29 +0000 |
commit | 490b07064ed2f8a26ab63d2ea050f5583cccb3d0 (patch) | |
tree | e75db8657a75723760a37a765f9253a3f76e2065 /src/set | |
parent | 00196642a7a8e400a054bd0a9c3b35b24be87a78 (diff) | |
download | gnunet-490b07064ed2f8a26ab63d2ea050f5583cccb3d0.tar.gz gnunet-490b07064ed2f8a26ab63d2ea050f5583cccb3d0.zip |
work on gnunet-set, isolated bug in stream
Diffstat (limited to 'src/set')
-rw-r--r-- | src/set/Makefile.am | 13 | ||||
-rw-r--r-- | src/set/gnunet-service-set.c | 74 | ||||
-rw-r--r-- | src/set/gnunet-service-set.h | 95 | ||||
-rw-r--r-- | src/set/gnunet-service-set_union.c | 561 | ||||
-rw-r--r-- | src/set/gnunet-set.c | 20 | ||||
-rw-r--r-- | src/set/mq.c | 56 | ||||
-rw-r--r-- | src/set/mq.h | 26 | ||||
-rw-r--r-- | src/set/set_api.c | 8 |
8 files changed, 476 insertions, 377 deletions
diff --git a/src/set/Makefile.am b/src/set/Makefile.am index c1639823e..a4c4fa6be 100644 --- a/src/set/Makefile.am +++ b/src/set/Makefile.am | |||
@@ -16,7 +16,7 @@ if USE_COVERAGE | |||
16 | endif | 16 | endif |
17 | 17 | ||
18 | bin_PROGRAMS = \ | 18 | bin_PROGRAMS = \ |
19 | gnunet-set | 19 | gnunet-set gnunet-set-bug |
20 | 20 | ||
21 | libexec_PROGRAMS = \ | 21 | libexec_PROGRAMS = \ |
22 | gnunet-service-set | 22 | gnunet-service-set |
@@ -35,6 +35,17 @@ gnunet_set_LDADD = \ | |||
35 | gnunet_set_DEPENDENCIES = \ | 35 | gnunet_set_DEPENDENCIES = \ |
36 | libgnunetset.la | 36 | libgnunetset.la |
37 | 37 | ||
38 | gnunet_set_bug_SOURCES = \ | ||
39 | gnunet-set-bug.c \ | ||
40 | mq.c | ||
41 | gnunet_set_bug_LDADD = \ | ||
42 | $(top_builddir)/src/util/libgnunetutil.la \ | ||
43 | $(top_builddir)/src/stream/libgnunetstream.la \ | ||
44 | $(GN_LIBINTL) | ||
45 | # hack for mq.c, see automake Objects ‘created with both libtool and without’ | ||
46 | # remove once GNUNET_MQ is in util/ | ||
47 | gnunet_set_bug_CFLAGS = $(AM_CFLAGS) | ||
48 | |||
38 | gnunet_service_set_SOURCES = \ | 49 | gnunet_service_set_SOURCES = \ |
39 | gnunet-service-set.c \ | 50 | gnunet-service-set.c \ |
40 | gnunet-service-set_union.c \ | 51 | gnunet-service-set_union.c \ |
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index d6258aa78..314e7719d 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c | |||
@@ -84,7 +84,7 @@ static uint32_t request_id = 1; | |||
84 | * @param client the client to disconnect | 84 | * @param client the client to disconnect |
85 | */ | 85 | */ |
86 | void | 86 | void |
87 | client_disconnect (struct GNUNET_SERVER_Client *client) | 87 | _GSS_client_disconnect (struct GNUNET_SERVER_Client *client) |
88 | { | 88 | { |
89 | /* FIXME: clean up any data structures belonging to the client */ | 89 | /* FIXME: clean up any data structures belonging to the client */ |
90 | GNUNET_SERVER_client_disconnect (client); | 90 | GNUNET_SERVER_client_disconnect (client); |
@@ -170,6 +170,7 @@ destroy_incoming (struct Incoming *incoming) | |||
170 | * @param cls the incoming socket | 170 | * @param cls the incoming socket |
171 | * @param mh the message | 171 | * @param mh the message |
172 | */ | 172 | */ |
173 | |||
173 | static void | 174 | static void |
174 | handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh) | 175 | handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh) |
175 | { | 176 | { |
@@ -180,6 +181,8 @@ handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh) | |||
180 | struct Listener *listener; | 181 | struct Listener *listener; |
181 | const struct GNUNET_MessageHeader *context_msg; | 182 | const struct GNUNET_MessageHeader *context_msg; |
182 | 183 | ||
184 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got operation request\n"); | ||
185 | |||
183 | if (ntohs (mh->size) < sizeof *msg) | 186 | if (ntohs (mh->size) < sizeof *msg) |
184 | { | 187 | { |
185 | GNUNET_break (0); | 188 | GNUNET_break (0); |
@@ -201,18 +204,28 @@ handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh) | |||
201 | return; | 204 | return; |
202 | } | 205 | } |
203 | } | 206 | } |
204 | 207 | /* find the appropriate listener */ | |
205 | for (listener = listeners_head; listener != NULL; listener = listener->next) | 208 | for (listener = listeners_head; |
209 | listener != NULL; | ||
210 | listener = listener->next) | ||
206 | { | 211 | { |
207 | if ( (0 != GNUNET_CRYPTO_hash_cmp (&msg->app_id, &listener->app_id)) || | 212 | if ( (0 != GNUNET_CRYPTO_hash_cmp (&msg->app_id, &listener->app_id)) || |
208 | (htons (msg->operation) != listener->operation) ) | 213 | (htons (msg->operation) != listener->operation) ) |
209 | continue; | 214 | continue; |
210 | mqm = GNUNET_MQ_msg_concat (cmsg, context_msg, GNUNET_MESSAGE_TYPE_SET_REQUEST); | 215 | mqm = GNUNET_MQ_msg (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST); |
216 | if (GNUNET_OK != | ||
217 | GNUNET_MQ_nest (mqm, context_msg)) | ||
218 | { | ||
219 | /* FIXME: disconnect the peer */ | ||
220 | GNUNET_MQ_discard (mqm); | ||
221 | GNUNET_break (0); | ||
222 | } | ||
211 | incoming->request_id = request_id++; | 223 | incoming->request_id = request_id++; |
212 | cmsg->request_id = htonl (incoming->request_id); | 224 | cmsg->request_id = htonl (incoming->request_id); |
213 | GNUNET_MQ_send (listener->client_mq, mqm); | 225 | GNUNET_MQ_send (listener->client_mq, mqm); |
214 | return; | 226 | return; |
215 | } | 227 | } |
228 | /* FIXME: send a reject message */ | ||
216 | } | 229 | } |
217 | 230 | ||
218 | 231 | ||
@@ -249,7 +262,7 @@ handle_client_create (void *cls, | |||
249 | GNUNET_assert (0); | 262 | GNUNET_assert (0); |
250 | break; | 263 | break; |
251 | case GNUNET_SET_OPERATION_UNION: | 264 | case GNUNET_SET_OPERATION_UNION: |
252 | set = union_set_create (); | 265 | set = _GSS_union_set_create (); |
253 | break; | 266 | break; |
254 | default: | 267 | default: |
255 | GNUNET_free (set); | 268 | GNUNET_free (set); |
@@ -261,7 +274,7 @@ handle_client_create (void *cls, | |||
261 | set->client = client; | 274 | set->client = client; |
262 | set->client_mq = GNUNET_MQ_queue_for_server_client (client); | 275 | set->client_mq = GNUNET_MQ_queue_for_server_client (client); |
263 | GNUNET_CONTAINER_DLL_insert (sets_head, sets_tail, set); | 276 | GNUNET_CONTAINER_DLL_insert (sets_head, sets_tail, set); |
264 | 277 | ||
265 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 278 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
266 | } | 279 | } |
267 | 280 | ||
@@ -292,6 +305,8 @@ handle_client_listen (void *cls, | |||
292 | listener->app_id = msg->app_id; | 305 | listener->app_id = msg->app_id; |
293 | listener->operation = msg->operation; | 306 | listener->operation = msg->operation; |
294 | GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener); | 307 | GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener); |
308 | |||
309 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
295 | } | 310 | } |
296 | 311 | ||
297 | 312 | ||
@@ -313,14 +328,13 @@ handle_client_add (void *cls, | |||
313 | if (NULL == set) | 328 | if (NULL == set) |
314 | { | 329 | { |
315 | GNUNET_break (0); | 330 | GNUNET_break (0); |
316 | client_disconnect (client); | 331 | _GSS_client_disconnect (client); |
317 | return; | 332 | return; |
318 | } | 333 | } |
319 | switch (set->operation) | 334 | switch (set->operation) |
320 | { | 335 | { |
321 | case GNUNET_SET_OPERATION_UNION: | 336 | case GNUNET_SET_OPERATION_UNION: |
322 | union_add (set, (struct ElementMessage *) m); | 337 | _GSS_union_add ((struct ElementMessage *) m, set); |
323 | break; | ||
324 | case GNUNET_SET_OPERATION_INTERSECTION: | 338 | case GNUNET_SET_OPERATION_INTERSECTION: |
325 | /* FIXME: cfuchs */ | 339 | /* FIXME: cfuchs */ |
326 | break; | 340 | break; |
@@ -328,6 +342,8 @@ handle_client_add (void *cls, | |||
328 | GNUNET_assert (0); | 342 | GNUNET_assert (0); |
329 | break; | 343 | break; |
330 | } | 344 | } |
345 | |||
346 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
331 | } | 347 | } |
332 | 348 | ||
333 | 349 | ||
@@ -344,24 +360,15 @@ handle_client_evaluate (void *cls, | |||
344 | const struct GNUNET_MessageHeader *m) | 360 | const struct GNUNET_MessageHeader *m) |
345 | { | 361 | { |
346 | struct Set *set; | 362 | struct Set *set; |
347 | struct EvaluateMessage *msg = (struct EvaluateMessage *) m; | ||
348 | struct EvaluateOperation *eo; | ||
349 | 363 | ||
350 | set = get_set (client); | 364 | set = get_set (client); |
351 | |||
352 | if (NULL == set) | 365 | if (NULL == set) |
353 | { | 366 | { |
354 | GNUNET_break (0); | 367 | GNUNET_break (0); |
355 | client_disconnect (client); | 368 | _GSS_client_disconnect (client); |
356 | return; | 369 | return; |
357 | } | 370 | } |
358 | 371 | ||
359 | eo = GNUNET_new (struct EvaluateOperation); | ||
360 | eo->peer = msg->peer; | ||
361 | eo->app_id = msg->app_id; | ||
362 | eo->request_id = msg->request_id; | ||
363 | eo->context_msg = GNUNET_copy_message (&msg[1].header); | ||
364 | eo->set = set; | ||
365 | 372 | ||
366 | switch (set->operation) | 373 | switch (set->operation) |
367 | { | 374 | { |
@@ -369,12 +376,14 @@ handle_client_evaluate (void *cls, | |||
369 | /* FIXME: cfuchs */ | 376 | /* FIXME: cfuchs */ |
370 | break; | 377 | break; |
371 | case GNUNET_SET_OPERATION_UNION: | 378 | case GNUNET_SET_OPERATION_UNION: |
372 | union_evaluate (eo); | 379 | _GSS_union_evaluate ((struct EvaluateMessage *) m, set); |
373 | break; | 380 | break; |
374 | default: | 381 | default: |
375 | GNUNET_assert (0); | 382 | GNUNET_assert (0); |
376 | break; | 383 | break; |
377 | } | 384 | } |
385 | |||
386 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
378 | } | 387 | } |
379 | 388 | ||
380 | 389 | ||
@@ -391,6 +400,7 @@ handle_client_cancel (void *cls, | |||
391 | const struct GNUNET_MessageHeader *m) | 400 | const struct GNUNET_MessageHeader *m) |
392 | { | 401 | { |
393 | /* FIXME: implement */ | 402 | /* FIXME: implement */ |
403 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
394 | } | 404 | } |
395 | 405 | ||
396 | 406 | ||
@@ -407,6 +417,7 @@ handle_client_ack (void *cls, | |||
407 | const struct GNUNET_MessageHeader *m) | 417 | const struct GNUNET_MessageHeader *m) |
408 | { | 418 | { |
409 | /* FIXME: implement */ | 419 | /* FIXME: implement */ |
420 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
410 | } | 421 | } |
411 | 422 | ||
412 | 423 | ||
@@ -421,19 +432,18 @@ handle_client_ack (void *cls, | |||
421 | static void | 432 | static void |
422 | handle_client_accept (void *cls, | 433 | handle_client_accept (void *cls, |
423 | struct GNUNET_SERVER_Client *client, | 434 | struct GNUNET_SERVER_Client *client, |
424 | const struct GNUNET_MessageHeader *m) | 435 | const struct GNUNET_MessageHeader *mh) |
425 | { | 436 | { |
426 | struct AcceptMessage *msg = (struct AcceptMessage *) m; | ||
427 | struct Set *set; | 437 | struct Set *set; |
428 | struct Incoming *incoming; | 438 | struct Incoming *incoming; |
429 | struct EvaluateOperation *eo; | 439 | struct AcceptMessage *msg = (struct AcceptMessage *) mh; |
430 | 440 | ||
431 | set = get_set (client); | 441 | set = get_set (client); |
432 | 442 | ||
433 | if (NULL == set) | 443 | if (NULL == set) |
434 | { | 444 | { |
435 | GNUNET_break (0); | 445 | GNUNET_break (0); |
436 | client_disconnect (client); | 446 | _GSS_client_disconnect (client); |
437 | return; | 447 | return; |
438 | } | 448 | } |
439 | 449 | ||
@@ -443,16 +453,10 @@ handle_client_accept (void *cls, | |||
443 | (incoming->operation != set->operation) ) | 453 | (incoming->operation != set->operation) ) |
444 | { | 454 | { |
445 | GNUNET_break (0); | 455 | GNUNET_break (0); |
446 | client_disconnect (client); | 456 | _GSS_client_disconnect (client); |
447 | return; | 457 | return; |
448 | } | 458 | } |
449 | 459 | ||
450 | eo = GNUNET_new (struct EvaluateOperation); | ||
451 | eo->peer = incoming->peer; | ||
452 | eo->app_id = incoming->app_id; | ||
453 | eo->request_id = msg->request_id; | ||
454 | eo->set = set; | ||
455 | |||
456 | switch (set->operation) | 460 | switch (set->operation) |
457 | { | 461 | { |
458 | case GNUNET_SET_OPERATION_INTERSECTION: | 462 | case GNUNET_SET_OPERATION_INTERSECTION: |
@@ -460,12 +464,15 @@ handle_client_accept (void *cls, | |||
460 | GNUNET_assert (0); | 464 | GNUNET_assert (0); |
461 | break; | 465 | break; |
462 | case GNUNET_SET_OPERATION_UNION: | 466 | case GNUNET_SET_OPERATION_UNION: |
463 | union_accept (eo, incoming); | 467 | _GSS_union_accept (msg, set, incoming); |
464 | break; | 468 | break; |
465 | default: | 469 | default: |
466 | GNUNET_assert (0); | 470 | GNUNET_assert (0); |
467 | break; | 471 | break; |
468 | } | 472 | } |
473 | /* FIXME: destroy incoming */ | ||
474 | |||
475 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
469 | } | 476 | } |
470 | 477 | ||
471 | 478 | ||
@@ -522,7 +529,7 @@ shutdown_task (void *cls, | |||
522 | { | 529 | { |
523 | GNUNET_STREAM_listen_close (stream_listen_socket); | 530 | GNUNET_STREAM_listen_close (stream_listen_socket); |
524 | stream_listen_socket = NULL; | 531 | stream_listen_socket = NULL; |
525 | } | 532 | } |
526 | 533 | ||
527 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n"); | 534 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n"); |
528 | } | 535 | } |
@@ -575,4 +582,3 @@ main (int argc, char *const *argv) | |||
575 | return (GNUNET_OK == ret) ? 0 : 1; | 582 | return (GNUNET_OK == ret) ? 0 : 1; |
576 | } | 583 | } |
577 | 584 | ||
578 | |||
diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h index 685fc47a4..2d0d6595d 100644 --- a/src/set/gnunet-service-set.h +++ b/src/set/gnunet-service-set.h | |||
@@ -86,83 +86,7 @@ struct Set | |||
86 | union { | 86 | union { |
87 | struct IntersectionState *i; | 87 | struct IntersectionState *i; |
88 | struct UnionState *u; | 88 | struct UnionState *u; |
89 | } extra; | 89 | } state; |
90 | }; | ||
91 | |||
92 | |||
93 | /** | ||
94 | * State for an evaluate operation for a set that | ||
95 | * supports set union. | ||
96 | */ | ||
97 | struct UnionEvaluateOperation; | ||
98 | |||
99 | |||
100 | /* FIXME: cfuchs */ | ||
101 | struct IntersectionEvaluateOperation | ||
102 | { | ||
103 | /* FIXME: cfuchs */ | ||
104 | }; | ||
105 | |||
106 | |||
107 | /** | ||
108 | * State of evaluation a set operation with | ||
109 | * another peer | ||
110 | */ | ||
111 | struct EvaluateOperation | ||
112 | { | ||
113 | /** | ||
114 | * Local set the operation is evaluated on | ||
115 | */ | ||
116 | struct Set *set; | ||
117 | |||
118 | /** | ||
119 | * Peer with the remote set | ||
120 | */ | ||
121 | struct GNUNET_PeerIdentity peer; | ||
122 | |||
123 | /** | ||
124 | * Application-specific identifier | ||
125 | */ | ||
126 | struct GNUNET_HashCode app_id; | ||
127 | |||
128 | /** | ||
129 | * Context message, given to us | ||
130 | * by the client, may be NULL. | ||
131 | */ | ||
132 | struct GNUNET_MessageHeader *context_msg; | ||
133 | |||
134 | /** | ||
135 | * Stream socket connected to the other peer | ||
136 | */ | ||
137 | struct GNUNET_STREAM_Socket *socket; | ||
138 | |||
139 | /** | ||
140 | * Message queue for the peer on the other | ||
141 | * end | ||
142 | */ | ||
143 | struct GNUNET_MQ_MessageQueue *mq; | ||
144 | |||
145 | /** | ||
146 | * Type of this operation | ||
147 | */ | ||
148 | enum GNUNET_SET_OperationType operation; | ||
149 | |||
150 | /** | ||
151 | * GNUNET_YES if we started the operation, | ||
152 | * GNUNET_NO if the other peer started it. | ||
153 | */ | ||
154 | int is_outgoing; | ||
155 | |||
156 | /** | ||
157 | * Request id, so we can use one client handle | ||
158 | * for multiple operations | ||
159 | */ | ||
160 | uint32_t request_id; | ||
161 | |||
162 | union { | ||
163 | struct UnionEvaluateOperation *u; | ||
164 | struct IntersectionEvaluateOperation *i; | ||
165 | } extra; | ||
166 | }; | 90 | }; |
167 | 91 | ||
168 | 92 | ||
@@ -246,6 +170,12 @@ struct Incoming | |||
246 | struct GNUNET_MessageHeader *context_msg; | 170 | struct GNUNET_MessageHeader *context_msg; |
247 | 171 | ||
248 | /** | 172 | /** |
173 | * Salt the peer has requested to use for the | ||
174 | * operation | ||
175 | */ | ||
176 | uint16_t salt; | ||
177 | |||
178 | /** | ||
249 | * Operation the other peer wants to do | 179 | * Operation the other peer wants to do |
250 | */ | 180 | */ |
251 | enum GNUNET_SET_OperationType operation; | 181 | enum GNUNET_SET_OperationType operation; |
@@ -271,23 +201,24 @@ extern const struct GNUNET_CONFIGURATION_Handle *configuration; | |||
271 | * @param client the client to disconnect | 201 | * @param client the client to disconnect |
272 | */ | 202 | */ |
273 | void | 203 | void |
274 | client_disconnect (struct GNUNET_SERVER_Client *client); | 204 | _GSS_client_disconnect (struct GNUNET_SERVER_Client *client); |
275 | 205 | ||
276 | 206 | ||
277 | struct Set * | 207 | struct Set * |
278 | union_set_create (void); | 208 | _GSS_union_set_create (void); |
279 | 209 | ||
280 | 210 | ||
281 | void | 211 | void |
282 | union_evaluate (struct EvaluateOperation *eo); | 212 | _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set); |
283 | 213 | ||
284 | 214 | ||
285 | void | 215 | void |
286 | union_add (struct Set *set, struct ElementMessage *m); | 216 | _GSS_union_add (struct ElementMessage *m, struct Set *set); |
287 | 217 | ||
288 | 218 | ||
289 | void | 219 | void |
290 | union_accept (struct EvaluateOperation *eo, struct Incoming *incoming); | 220 | _GSS_union_accept (struct AcceptMessage *m, struct Set *set, |
221 | struct Incoming *incoming); | ||
291 | 222 | ||
292 | 223 | ||
293 | #endif | 224 | #endif |
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index e65452a54..4903ce605 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c | |||
@@ -26,9 +26,12 @@ | |||
26 | 26 | ||
27 | 27 | ||
28 | #include "gnunet-service-set.h" | 28 | #include "gnunet-service-set.h" |
29 | #include "set_protocol.h" | 29 | #include "gnunet_container_lib.h" |
30 | #include "gnunet_crypto_lib.h" | ||
30 | #include "ibf.h" | 31 | #include "ibf.h" |
31 | #include "strata_estimator.h" | 32 | #include "strata_estimator.h" |
33 | #include "set_protocol.h" | ||
34 | #include <gcrypt.h> | ||
32 | 35 | ||
33 | 36 | ||
34 | /** | 37 | /** |
@@ -49,7 +52,6 @@ | |||
49 | */ | 52 | */ |
50 | #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE) | 53 | #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE) |
51 | 54 | ||
52 | |||
53 | /** | 55 | /** |
54 | * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER). | 56 | * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER). |
55 | * Choose this value so that computing the IBF is still cheaper | 57 | * Choose this value so that computing the IBF is still cheaper |
@@ -76,6 +78,55 @@ enum UnionOperationState | |||
76 | */ | 78 | */ |
77 | struct UnionEvaluateOperation | 79 | struct UnionEvaluateOperation |
78 | { | 80 | { |
81 | /** | ||
82 | * Local set the operation is evaluated on | ||
83 | */ | ||
84 | struct Set *set; | ||
85 | |||
86 | /** | ||
87 | * Peer with the remote set | ||
88 | */ | ||
89 | struct GNUNET_PeerIdentity peer; | ||
90 | |||
91 | /** | ||
92 | * Application-specific identifier | ||
93 | */ | ||
94 | struct GNUNET_HashCode app_id; | ||
95 | |||
96 | /** | ||
97 | * Context message, given to us | ||
98 | * by the client, may be NULL. | ||
99 | */ | ||
100 | struct GNUNET_MessageHeader *context_msg; | ||
101 | |||
102 | /** | ||
103 | * Stream socket connected to the other peer | ||
104 | */ | ||
105 | struct GNUNET_STREAM_Socket *socket; | ||
106 | |||
107 | /** | ||
108 | * Message queue for the peer on the other | ||
109 | * end | ||
110 | */ | ||
111 | struct GNUNET_MQ_MessageQueue *mq; | ||
112 | |||
113 | /** | ||
114 | * Type of this operation | ||
115 | */ | ||
116 | enum GNUNET_SET_OperationType operation; | ||
117 | |||
118 | /** | ||
119 | * GNUNET_YES if we started the operation, | ||
120 | * GNUNET_NO if the other peer started it. | ||
121 | */ | ||
122 | int is_outgoing; | ||
123 | |||
124 | /** | ||
125 | * Request id, so we can use one client handle | ||
126 | * for multiple operations | ||
127 | */ | ||
128 | uint32_t request_id; | ||
129 | |||
79 | /* last difference estimate */ | 130 | /* last difference estimate */ |
80 | unsigned int diff; | 131 | unsigned int diff; |
81 | 132 | ||
@@ -95,51 +146,109 @@ struct UnionEvaluateOperation | |||
95 | */ | 146 | */ |
96 | unsigned int ibf_order; | 147 | unsigned int ibf_order; |
97 | 148 | ||
149 | struct StrataEstimator *se; | ||
150 | |||
98 | /** | 151 | /** |
99 | * The ibf we currently receive | 152 | * The ibf we currently receive |
100 | */ | 153 | */ |
101 | struct InvertibleBloomFilter *ibf_received; | 154 | struct InvertibleBloomFilter *remote_ibf; |
102 | 155 | ||
103 | struct StrataEstimator *se; | 156 | /** |
157 | * Array of IBFs, some of them pre-allocated | ||
158 | */ | ||
159 | struct InvertibleBloomFilter *local_ibf; | ||
160 | |||
161 | /** | ||
162 | * Elements we received from the other peer. | ||
163 | */ | ||
164 | struct GNUNET_CONTAINER_MultiHashMap *received_elements; | ||
165 | |||
166 | /** | ||
167 | * Maps IBF-Keys (specific to the current salt) to elements. | ||
168 | */ | ||
169 | struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element; | ||
104 | 170 | ||
105 | /** | 171 | /** |
106 | * Current state of the operation | 172 | * Current state of the operation |
107 | */ | 173 | */ |
108 | enum UnionOperationState state; | 174 | enum UnionOperationState state; |
175 | |||
176 | /** | ||
177 | * Evaluate operations are held in | ||
178 | * a linked list. | ||
179 | */ | ||
180 | struct UnionEvaluateOperation *next; | ||
181 | |||
182 | /** | ||
183 | * Evaluate operations are held in | ||
184 | * a linked list. | ||
185 | */ | ||
186 | struct UnionEvaluateOperation *prev; | ||
109 | }; | 187 | }; |
110 | 188 | ||
111 | |||
112 | /** | 189 | /** |
113 | * Element entry, stored in the hash maps from | 190 | * Information about the element in a set. |
114 | * partial IBF keys to elements. | 191 | * All elements are stored in a hash-table |
192 | * from their hash-code to their 'struct Element', | ||
193 | * so that the remove and add operations are reasonably | ||
194 | * fast. | ||
115 | */ | 195 | */ |
116 | struct ElementEntry | 196 | struct ElementEntry |
117 | { | 197 | { |
118 | /** | 198 | /** |
119 | * The actual element | 199 | * The actual element. The data for the element |
200 | * should be allocated at the end of this struct. | ||
120 | */ | 201 | */ |
121 | struct GNUNET_SET_Element *element; | 202 | struct GNUNET_SET_Element element; |
122 | 203 | ||
123 | /** | 204 | /** |
124 | * Actual ibf key of the element entry | 205 | * Hash of the element. |
206 | * Will be used to derive the different IBF keys | ||
207 | * for different salts. | ||
125 | */ | 208 | */ |
126 | struct IBF_Key ibf_key; | 209 | struct GNUNET_HashCode element_hash; |
127 | 210 | ||
128 | /** | 211 | /** |
129 | * Linked list, note that the next element | 212 | * Generation the element was added. |
130 | * has to have an ibf_key that is lexicographically | 213 | * Operations of earlier generations will not consider the element. |
131 | * equal or larger. | ||
132 | */ | 214 | */ |
133 | struct ElementEntry *next; | 215 | int generation_add; |
134 | 216 | ||
135 | /** | 217 | /** |
136 | * GNUNET_YES if the element was received from | 218 | * Generation this element was removed. |
137 | * the remote peer, and the local peer did not previously | 219 | * Operations of later generations will not consider the element. |
138 | * have it | 220 | */ |
221 | int generation_remove; | ||
222 | |||
223 | /** | ||
224 | * GNUNET_YES if we received the element from a remote peer, and not | ||
225 | * from the local peer. Note that if the local client inserts an | ||
226 | * element *after* we got it from a remote peer, the element is | ||
227 | * considered local. | ||
139 | */ | 228 | */ |
140 | int remote; | 229 | int remote; |
141 | }; | 230 | }; |
142 | 231 | ||
232 | /** | ||
233 | * Information about the element used for | ||
234 | * a specific union operation. | ||
235 | */ | ||
236 | struct KeyEntry | ||
237 | { | ||
238 | struct IBF_Key ibf_key; | ||
239 | |||
240 | /** | ||
241 | * The actual element associated with the key | ||
242 | */ | ||
243 | struct ElementEntry *element; | ||
244 | |||
245 | /** | ||
246 | * Element that collides with this element | ||
247 | * on the ibf key | ||
248 | */ | ||
249 | struct KeyEntry *next_colliding; | ||
250 | }; | ||
251 | |||
143 | 252 | ||
144 | /** | 253 | /** |
145 | * Extra state required for efficient set union. | 254 | * Extra state required for efficient set union. |
@@ -147,47 +256,72 @@ struct ElementEntry | |||
147 | struct UnionState | 256 | struct UnionState |
148 | { | 257 | { |
149 | /** | 258 | /** |
150 | * Strate estimator of the set we currently have, | 259 | * The strata estimator is only generated once for |
151 | * used for estimation of the symmetric difference | 260 | * each set. |
152 | */ | 261 | */ |
153 | struct StrataEstimator *se; | 262 | struct StrataEstimator *se; |
154 | 263 | ||
155 | /** | 264 | /** |
156 | * Array of IBFs, some of them pre-allocated | 265 | * Maps 'struct GNUNET_HashCode' to 'struct ElementEntry'. |
266 | */ | ||
267 | struct GNUNET_CONTAINER_MultiHashMap *elements; | ||
268 | |||
269 | /** | ||
270 | * Evaluate operations are held in | ||
271 | * a linked list. | ||
157 | */ | 272 | */ |
158 | struct InvertibleBloomFilter **ibfs; | 273 | struct UnionEvaluateOperation *ops_head; |
159 | 274 | ||
160 | /** | 275 | /** |
161 | * Maps the first 32 bits of the ibf-key to | 276 | * Evaluate operations are held in |
162 | * elements. | 277 | * a linked list. |
163 | */ | 278 | */ |
164 | struct GNUNET_CONTAINER_MultiHashMap32 *elements; | 279 | struct UnionEvaluateOperation *ops_tail; |
165 | }; | 280 | }; |
166 | 281 | ||
167 | 282 | ||
283 | static struct IBF_Key | ||
284 | get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt) | ||
285 | { | ||
286 | |||
287 | struct IBF_Key key; | ||
288 | GNUNET_CRYPTO_hkdf (&key, sizeof (key), | ||
289 | GCRY_MD_SHA512, GCRY_MD_SHA256, | ||
290 | src, sizeof *src, | ||
291 | &salt, sizeof (salt), | ||
292 | NULL, 0); | ||
293 | return key; | ||
294 | } | ||
295 | |||
296 | |||
168 | static void | 297 | static void |
169 | send_operation_request (struct EvaluateOperation *eo) | 298 | send_operation_request (struct UnionEvaluateOperation *eo) |
170 | { | 299 | { |
171 | struct GNUNET_MQ_Message *mqm; | 300 | struct GNUNET_MQ_Message *mqm; |
172 | struct OperationRequestMessage *msg; | 301 | struct OperationRequestMessage *msg; |
302 | int ret; | ||
173 | 303 | ||
174 | mqm = GNUNET_MQ_msg_concat (msg, eo->context_msg, | 304 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST); |
175 | GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST); | 305 | ret = GNUNET_MQ_nest (mqm, eo->context_msg); |
176 | if (NULL == mqm) | 306 | if (GNUNET_OK != ret) |
177 | { | 307 | { |
178 | /* the context message is too large */ | 308 | /* the context message is too large */ |
179 | client_disconnect (eo->set->client); | 309 | _GSS_client_disconnect (eo->set->client); |
310 | GNUNET_MQ_discard (mqm); | ||
180 | GNUNET_break (0); | 311 | GNUNET_break (0); |
181 | return; | 312 | return; |
182 | } | 313 | } |
183 | msg->operation = eo->operation; | 314 | msg->operation = eo->operation; |
184 | msg->app_id = eo->app_id; | 315 | msg->app_id = eo->app_id; |
185 | GNUNET_MQ_send (eo->mq, mqm); | 316 | GNUNET_MQ_send (eo->mq, mqm); |
317 | |||
318 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n"); | ||
186 | } | 319 | } |
187 | 320 | ||
188 | 321 | ||
189 | /** | 322 | /** |
190 | * Iterator to insert values into an ibf. | 323 | * Iterator to create the mapping between ibf keys |
324 | * and element entries. | ||
191 | * | 325 | * |
192 | * @param cls closure | 326 | * @param cls closure |
193 | * @param key current key code | 327 | * @param key current key code |
@@ -197,54 +331,93 @@ send_operation_request (struct EvaluateOperation *eo) | |||
197 | * GNUNET_NO if not. | 331 | * GNUNET_NO if not. |
198 | */ | 332 | */ |
199 | static int | 333 | static int |
200 | ibf_insert_iterator (void *cls, | 334 | insert_element_iterator (void *cls, |
201 | uint32_t key, | 335 | uint32_t key, |
202 | void *value) | 336 | void *value) |
203 | { | 337 | { |
204 | struct InvertibleBloomFilter *ibf = cls; | 338 | struct KeyEntry *const new_k = cls; |
205 | struct ElementEntry *e = value; | 339 | struct KeyEntry *old_k = value; |
206 | struct IBF_Key ibf_key; | ||
207 | |||
208 | GNUNET_assert (NULL != e); | ||
209 | ibf_key = e->ibf_key; | ||
210 | ibf_insert (ibf, ibf_key); | ||
211 | e = e->next; | ||
212 | 340 | ||
213 | while (NULL != e) | 341 | GNUNET_assert (NULL != old_k); |
342 | do | ||
214 | { | 343 | { |
215 | /* only insert keys we haven't seen yet */ | 344 | if (old_k->ibf_key.key_val == new_k->ibf_key.key_val) |
216 | if (0 != memcmp (&e->ibf_key, &ibf_key, sizeof ibf_key)) | ||
217 | { | 345 | { |
218 | ibf_key = e->ibf_key; | 346 | new_k->next_colliding = old_k; |
219 | ibf_insert (ibf, ibf_key); | 347 | old_k->next_colliding = new_k; |
348 | return GNUNET_NO; | ||
220 | } | 349 | } |
221 | e = e->next; | 350 | old_k = old_k->next_colliding; |
351 | } while (NULL != old_k); | ||
352 | return GNUNET_YES; | ||
353 | } | ||
354 | |||
355 | |||
356 | static void | ||
357 | insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee) | ||
358 | { | ||
359 | int ret; | ||
360 | struct IBF_Key ibf_key; | ||
361 | struct KeyEntry *k; | ||
362 | |||
363 | ibf_key = get_ibf_key (&ee->element_hash, eo->salt); | ||
364 | k = GNUNET_new (struct KeyEntry); | ||
365 | k->element = ee; | ||
366 | k->ibf_key = ibf_key; | ||
367 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val, | ||
368 | insert_element_iterator, k); | ||
369 | /* was the element inserted into a colliding bucket? */ | ||
370 | if (GNUNET_SYSERR == ret) | ||
371 | { | ||
372 | GNUNET_assert (NULL != k->next_colliding); | ||
373 | return; | ||
222 | } | 374 | } |
375 | GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k, | ||
376 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | ||
377 | if (NULL != eo->local_ibf) | ||
378 | ibf_insert (eo->local_ibf, ibf_key); | ||
379 | } | ||
380 | |||
381 | |||
382 | static int | ||
383 | prepare_ibf_iterator (void *cls, | ||
384 | uint32_t key, | ||
385 | void *value) | ||
386 | { | ||
387 | struct InvertibleBloomFilter *ibf = cls; | ||
388 | struct KeyEntry *ke = value; | ||
223 | 389 | ||
390 | ibf_insert (ibf, ke->ibf_key); | ||
224 | return GNUNET_YES; | 391 | return GNUNET_YES; |
225 | } | 392 | } |
226 | 393 | ||
227 | 394 | static int | |
228 | /** | 395 | init_key_to_element_iterator (void *cls, |
229 | * Create and populate an IBF for the specified peer, | 396 | const struct GNUNET_HashCode *key, |
230 | * if it does not already exist. | 397 | void *value) |
231 | * | ||
232 | * @param cpi peer to create the ibf for | ||
233 | */ | ||
234 | static struct InvertibleBloomFilter * | ||
235 | prepare_ibf (struct EvaluateOperation *eo, uint16_t order) | ||
236 | { | 398 | { |
237 | struct UnionState *us = eo->set->extra.u; | 399 | struct UnionEvaluateOperation *eo = cls; |
400 | struct ElementEntry *e = value; | ||
401 | |||
402 | insert_element (eo, e); | ||
403 | return GNUNET_YES; | ||
404 | } | ||
238 | 405 | ||
239 | GNUNET_assert (order <= MAX_IBF_ORDER); | 406 | static void |
240 | if (NULL == us->ibfs) | 407 | prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size) |
241 | us->ibfs = GNUNET_malloc (MAX_IBF_ORDER * sizeof (struct InvertibleBloomFilter *)); | 408 | { |
242 | if (NULL == us->ibfs[order]) | 409 | if (NULL == eo->key_to_element) |
243 | { | 410 | { |
244 | us->ibfs[order] = ibf_create (1 << order, SE_IBF_HASH_NUM); | 411 | unsigned int len; |
245 | GNUNET_CONTAINER_multihashmap32_iterate (us->elements, ibf_insert_iterator, us->ibfs[order]); | 412 | len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements); |
413 | eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len); | ||
414 | GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements, | ||
415 | init_key_to_element_iterator, eo); | ||
246 | } | 416 | } |
247 | return us->ibfs[order]; | 417 | if (NULL != eo->local_ibf) |
418 | ibf_destroy (eo->local_ibf); | ||
419 | eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); | ||
420 | GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, prepare_ibf_iterator, eo->local_ibf); | ||
248 | } | 421 | } |
249 | 422 | ||
250 | 423 | ||
@@ -254,12 +427,14 @@ prepare_ibf (struct EvaluateOperation *eo, uint16_t order) | |||
254 | * @param cpi the peer | 427 | * @param cpi the peer |
255 | */ | 428 | */ |
256 | static void | 429 | static void |
257 | send_ibf (struct EvaluateOperation *eo, uint16_t ibf_order) | 430 | send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) |
258 | { | 431 | { |
259 | unsigned int buckets_sent = 0; | 432 | unsigned int buckets_sent = 0; |
260 | struct InvertibleBloomFilter *ibf; | 433 | struct InvertibleBloomFilter *ibf; |
261 | 434 | ||
262 | ibf = prepare_ibf (eo, ibf_order); | 435 | prepare_ibf (eo, ibf_order); |
436 | |||
437 | ibf = eo->local_ibf; | ||
263 | 438 | ||
264 | while (buckets_sent < (1 << ibf_order)) | 439 | while (buckets_sent < (1 << ibf_order)) |
265 | { | 440 | { |
@@ -282,7 +457,7 @@ send_ibf (struct EvaluateOperation *eo, uint16_t ibf_order) | |||
282 | GNUNET_MQ_send (eo->mq, mqm); | 457 | GNUNET_MQ_send (eo->mq, mqm); |
283 | } | 458 | } |
284 | 459 | ||
285 | eo->extra.u->state = STATE_EXPECT_ELEMENTS_AND_REQUESTS; | 460 | eo->state = STATE_EXPECT_ELEMENTS_AND_REQUESTS; |
286 | } | 461 | } |
287 | 462 | ||
288 | 463 | ||
@@ -292,7 +467,7 @@ send_ibf (struct EvaluateOperation *eo, uint16_t ibf_order) | |||
292 | * @param cpi the peer | 467 | * @param cpi the peer |
293 | */ | 468 | */ |
294 | static void | 469 | static void |
295 | send_strata_estimator (struct EvaluateOperation *eo) | 470 | send_strata_estimator (struct UnionEvaluateOperation *eo) |
296 | { | 471 | { |
297 | struct GNUNET_MQ_Message *mqm; | 472 | struct GNUNET_MQ_Message *mqm; |
298 | struct GNUNET_MessageHeader *strata_msg; | 473 | struct GNUNET_MessageHeader *strata_msg; |
@@ -300,31 +475,36 @@ send_strata_estimator (struct EvaluateOperation *eo) | |||
300 | mqm = GNUNET_MQ_msg_header_extra (strata_msg, | 475 | mqm = GNUNET_MQ_msg_header_extra (strata_msg, |
301 | SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, | 476 | SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, |
302 | GNUNET_MESSAGE_TYPE_SET_P2P_SE); | 477 | GNUNET_MESSAGE_TYPE_SET_P2P_SE); |
303 | strata_estimator_write (eo->set->extra.u->se, &strata_msg[1]); | 478 | strata_estimator_write (eo->set->state.u->se, &strata_msg[1]); |
304 | GNUNET_MQ_send (eo->mq, mqm); | 479 | GNUNET_MQ_send (eo->mq, mqm); |
305 | 480 | eo->state = STATE_EXPECT_IBF; | |
306 | eo->extra.u->state = STATE_EXPECT_IBF; | ||
307 | } | 481 | } |
308 | 482 | ||
309 | 483 | ||
310 | static void | 484 | static void |
311 | handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) | 485 | handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) |
312 | { | 486 | { |
313 | struct EvaluateOperation *eo = cls; | 487 | struct UnionEvaluateOperation *eo = cls; |
488 | struct StrataEstimator *remote_se; | ||
314 | int ibf_order; | 489 | int ibf_order; |
315 | int diff; | 490 | int diff; |
316 | 491 | ||
317 | if (eo->extra.u->state != STATE_EXPECT_SE) | 492 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se\n"); |
493 | |||
494 | if (eo->state != STATE_EXPECT_SE) | ||
318 | { | 495 | { |
319 | /* FIXME: handle */ | 496 | /* FIXME: handle */ |
320 | GNUNET_break (0); | 497 | GNUNET_break (0); |
321 | return; | 498 | return; |
322 | } | 499 | } |
323 | GNUNET_assert (NULL == eo->extra.u->se); | 500 | remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, |
324 | eo->extra.u->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM); | 501 | SE_IBF_HASH_NUM); |
325 | strata_estimator_read (&mh[1], eo->extra.u->se); | 502 | strata_estimator_read (&mh[1], remote_se); |
326 | GNUNET_assert (NULL != eo->set->extra.u->se); | 503 | GNUNET_assert (NULL != eo->se); |
327 | diff = strata_estimator_difference (eo->set->extra.u->se, eo->extra.u->se); | 504 | diff = strata_estimator_difference (remote_se, eo->se); |
505 | strata_estimator_destroy (remote_se); | ||
506 | strata_estimator_destroy (eo->se); | ||
507 | eo->se = NULL; | ||
328 | /* minimum order */ | 508 | /* minimum order */ |
329 | ibf_order = 2; | 509 | ibf_order = 2; |
330 | while ((1<<ibf_order) < (2 * diff)) | 510 | while ((1<<ibf_order) < (2 * diff)) |
@@ -341,16 +521,17 @@ handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) | |||
341 | * @param | 521 | * @param |
342 | */ | 522 | */ |
343 | static void | 523 | static void |
344 | decode (struct EvaluateOperation *eo) | 524 | decode (struct UnionEvaluateOperation *eo) |
345 | { | 525 | { |
346 | struct IBF_Key key; | 526 | struct IBF_Key key; |
347 | int side; | 527 | int side; |
348 | struct InvertibleBloomFilter *diff_ibf; | 528 | struct InvertibleBloomFilter *diff_ibf; |
349 | 529 | ||
350 | GNUNET_assert (STATE_EXPECT_ELEMENTS == eo->extra.u->state); | 530 | GNUNET_assert (STATE_EXPECT_ELEMENTS == eo->state); |
351 | 531 | ||
352 | diff_ibf = ibf_dup (prepare_ibf (eo, eo->extra.u->ibf_order)); | 532 | prepare_ibf (eo, eo->ibf_order); |
353 | ibf_subtract (diff_ibf, eo->extra.u->ibf_received); | 533 | diff_ibf = ibf_dup (eo->local_ibf); |
534 | ibf_subtract (diff_ibf, eo->remote_ibf); | ||
354 | 535 | ||
355 | while (1) | 536 | while (1) |
356 | { | 537 | { |
@@ -359,7 +540,8 @@ decode (struct EvaluateOperation *eo) | |||
359 | res = ibf_decode (diff_ibf, &side, &key); | 540 | res = ibf_decode (diff_ibf, &side, &key); |
360 | if (GNUNET_SYSERR == res) | 541 | if (GNUNET_SYSERR == res) |
361 | { | 542 | { |
362 | /* decoding failed, we tell the other peer by sending our ibf with a larger order */ | 543 | /* decoding failed, we tell the other peer by sending our ibf |
544 | * with a larger order */ | ||
363 | GNUNET_assert (0); | 545 | GNUNET_assert (0); |
364 | return; | 546 | return; |
365 | } | 547 | } |
@@ -373,20 +555,11 @@ decode (struct EvaluateOperation *eo) | |||
373 | } | 555 | } |
374 | if (1 == side) | 556 | if (1 == side) |
375 | { | 557 | { |
376 | struct ElementEntry *e; | 558 | //struct ElementEntry *e; |
377 | /* we have the element(s), send it to the other peer */ | 559 | /* we have the element(s), send it to the other peer */ |
378 | e = GNUNET_CONTAINER_multihashmap32_get (eo->set->extra.u->elements, (uint32_t) key.key_val); | 560 | //GNUNET_CONTAINER_multihashmap32_get_multiple (eo->set->state.u->elements, |
379 | if (NULL == e) | 561 | // (uint32_t) key.key_val); |
380 | { | 562 | /* FIXME */ |
381 | /* FIXME */ | ||
382 | GNUNET_assert (0); | ||
383 | return; | ||
384 | } | ||
385 | while (NULL != e) | ||
386 | { | ||
387 | /* FIXME: send element */ | ||
388 | e = e->next; | ||
389 | } | ||
390 | } | 563 | } |
391 | else | 564 | else |
392 | { | 565 | { |
@@ -403,37 +576,35 @@ decode (struct EvaluateOperation *eo) | |||
403 | } | 576 | } |
404 | 577 | ||
405 | 578 | ||
406 | |||
407 | static void | 579 | static void |
408 | handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) | 580 | handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) |
409 | { | 581 | { |
410 | struct EvaluateOperation *eo = cls; | 582 | struct UnionEvaluateOperation *eo = cls; |
411 | struct UnionEvaluateOperation *ueo = eo->extra.u; | ||
412 | struct IBFMessage *msg = (struct IBFMessage *) mh; | 583 | struct IBFMessage *msg = (struct IBFMessage *) mh; |
413 | unsigned int buckets_in_message; | 584 | unsigned int buckets_in_message; |
414 | 585 | ||
415 | if (ueo->state == STATE_EXPECT_ELEMENTS_AND_REQUESTS) | 586 | if (eo->state == STATE_EXPECT_ELEMENTS_AND_REQUESTS) |
416 | { | 587 | { |
417 | /* check that the ibf is a new one / first part */ | 588 | /* check that the ibf is a new one / first part */ |
418 | /* clear outgoing messages */ | 589 | /* clear outgoing messages */ |
419 | GNUNET_assert (0); | 590 | GNUNET_assert (0); |
420 | } | 591 | } |
421 | else if (ueo->state == STATE_EXPECT_IBF) | 592 | else if (eo->state == STATE_EXPECT_IBF) |
422 | { | 593 | { |
423 | ueo->state = STATE_EXPECT_IBF_CONT; | 594 | eo->state = STATE_EXPECT_IBF_CONT; |
424 | ueo->ibf_order = msg->order; | 595 | eo->ibf_order = msg->order; |
425 | GNUNET_assert (NULL == ueo->ibf_received); | 596 | GNUNET_assert (NULL == eo->remote_ibf); |
426 | ueo->ibf_received = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); | 597 | eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); |
427 | if (ntohs (msg->offset) != 0) | 598 | if (ntohs (msg->offset) != 0) |
428 | { | 599 | { |
429 | /* FIXME: handle */ | 600 | /* FIXME: handle */ |
430 | GNUNET_assert (0); | 601 | GNUNET_assert (0); |
431 | } | 602 | } |
432 | } | 603 | } |
433 | else if (ueo->state == STATE_EXPECT_IBF_CONT) | 604 | else if (eo->state == STATE_EXPECT_IBF_CONT) |
434 | { | 605 | { |
435 | if ( (ntohs (msg->offset) != ueo->ibf_buckets_received) || | 606 | if ( (ntohs (msg->offset) != eo->ibf_buckets_received) || |
436 | (msg->order != ueo->ibf_order) ) | 607 | (msg->order != eo->ibf_order) ) |
437 | { | 608 | { |
438 | /* FIXME: handle */ | 609 | /* FIXME: handle */ |
439 | GNUNET_assert (0); | 610 | GNUNET_assert (0); |
@@ -448,12 +619,12 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) | |||
448 | GNUNET_assert (0); | 619 | GNUNET_assert (0); |
449 | } | 620 | } |
450 | 621 | ||
451 | ibf_read_slice (&msg[1], ueo->ibf_buckets_received, buckets_in_message, ueo->ibf_received); | 622 | ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf); |
452 | ueo->ibf_buckets_received += buckets_in_message; | 623 | eo->ibf_buckets_received += buckets_in_message; |
453 | 624 | ||
454 | if (ueo->ibf_buckets_received == (1<<ueo->ibf_order)) | 625 | if (eo->ibf_buckets_received == (1<<eo->ibf_order)) |
455 | { | 626 | { |
456 | ueo->state = STATE_EXPECT_ELEMENTS; | 627 | eo->state = STATE_EXPECT_ELEMENTS; |
457 | decode (eo); | 628 | decode (eo); |
458 | } | 629 | } |
459 | } | 630 | } |
@@ -462,10 +633,10 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) | |||
462 | static void | 633 | static void |
463 | handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) | 634 | handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) |
464 | { | 635 | { |
465 | struct EvaluateOperation *eo = cls; | 636 | struct UnionEvaluateOperation *eo = cls; |
466 | 637 | ||
467 | if ( (eo->extra.u->state != STATE_EXPECT_ELEMENTS) && | 638 | if ( (eo->state != STATE_EXPECT_ELEMENTS) && |
468 | (eo->extra.u->state != STATE_EXPECT_ELEMENTS_AND_REQUESTS) ) | 639 | (eo->state != STATE_EXPECT_ELEMENTS_AND_REQUESTS) ) |
469 | { | 640 | { |
470 | /* FIXME: handle */ | 641 | /* FIXME: handle */ |
471 | GNUNET_break (0); | 642 | GNUNET_break (0); |
@@ -477,10 +648,10 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) | |||
477 | static void | 648 | static void |
478 | handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) | 649 | handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) |
479 | { | 650 | { |
480 | struct EvaluateOperation *eo = cls; | 651 | struct UnionEvaluateOperation *eo = cls; |
481 | 652 | ||
482 | /* look up elements and send them */ | 653 | /* look up elements and send them */ |
483 | if (eo->extra.u->state != STATE_EXPECT_ELEMENTS_AND_REQUESTS) | 654 | if (eo->state != STATE_EXPECT_ELEMENTS_AND_REQUESTS) |
484 | { | 655 | { |
485 | /* FIXME: handle */ | 656 | /* FIXME: handle */ |
486 | GNUNET_break (0); | 657 | GNUNET_break (0); |
@@ -508,141 +679,107 @@ static const struct GNUNET_MQ_Handler union_handlers[] = { | |||
508 | 679 | ||
509 | /** | 680 | /** |
510 | * Functions of this type will be called when a stream is established | 681 | * Functions of this type will be called when a stream is established |
511 | * | 682 | * |
512 | * @param cls the closure from GNUNET_STREAM_open | 683 | * @param cls the closure from GNUNET_STREAM_open |
513 | * @param socket socket to use to communicate with the other side (read/write) | 684 | * @param socket socket to use to communicate with the |
685 | * other side (read/write) | ||
514 | */ | 686 | */ |
515 | static void | 687 | static void |
516 | stream_open_cb (void *cls, | 688 | stream_open_cb (void *cls, |
517 | struct GNUNET_STREAM_Socket *socket) | 689 | struct GNUNET_STREAM_Socket *socket) |
518 | { | 690 | { |
519 | struct EvaluateOperation *eo = cls; | 691 | struct UnionEvaluateOperation *eo = cls; |
520 | 692 | ||
521 | GNUNET_assert (NULL == eo->mq); | 693 | GNUNET_assert (NULL == eo->mq); |
522 | GNUNET_assert (socket == eo->socket); | 694 | GNUNET_assert (socket == eo->socket); |
523 | 695 | ||
524 | eo->mq = GNUNET_MQ_queue_for_stream_socket (eo->socket, union_handlers, eo); | 696 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "open cb successful\n"); |
697 | |||
698 | eo->mq = GNUNET_MQ_queue_for_stream_socket (eo->socket, | ||
699 | union_handlers, eo); | ||
700 | /* we started the operation, thus we have to send the operation request */ | ||
525 | send_operation_request (eo); | 701 | send_operation_request (eo); |
702 | eo->state = STATE_EXPECT_SE; | ||
526 | } | 703 | } |
527 | 704 | ||
528 | 705 | ||
529 | void | 706 | void |
530 | union_evaluate (struct EvaluateOperation *eo) | 707 | _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set) |
531 | { | 708 | { |
532 | GNUNET_assert (GNUNET_SET_OPERATION_UNION == eo->set->operation); | 709 | struct UnionEvaluateOperation *eo; |
533 | eo->socket = | ||
534 | GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET, | ||
535 | stream_open_cb, GNUNET_STREAM_OPTION_END); | ||
536 | } | ||
537 | |||
538 | |||
539 | static void | ||
540 | insert_ibf_key_unchecked (struct UnionState *us, struct IBF_Key ibf_key) | ||
541 | { | ||
542 | int i; | ||
543 | |||
544 | strata_estimator_insert (us->se, ibf_key); | ||
545 | for (i = 0; i <= MAX_IBF_ORDER; i++) | ||
546 | { | ||
547 | if (NULL == us->ibfs) | ||
548 | break; | ||
549 | if (NULL == us->ibfs[i]) | ||
550 | continue; | ||
551 | ibf_insert (us->ibfs[i], ibf_key); | ||
552 | } | ||
553 | } | ||
554 | 710 | ||
711 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "evaluating union operation\n"); | ||
555 | 712 | ||
556 | /** | 713 | eo = GNUNET_new (struct UnionEvaluateOperation); |
557 | * Insert an element into the consensus set of the specified session. | 714 | eo->peer = m->peer; |
558 | * The element will not be copied, and freed when destroying the session. | 715 | eo->set = set; |
559 | * | 716 | eo->socket = |
560 | * @param session session for new element | 717 | GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET, |
561 | * @param element element to insert | 718 | stream_open_cb, eo, |
562 | */ | 719 | GNUNET_STREAM_OPTION_END); |
563 | static void | ||
564 | insert_element (struct Set *set, struct GNUNET_SET_Element *element) | ||
565 | { | ||
566 | struct UnionState *us = set->extra.u; | ||
567 | struct GNUNET_HashCode hash; | ||
568 | struct ElementEntry *e; | ||
569 | struct ElementEntry *e_old; | ||
570 | |||
571 | e = GNUNET_new (struct ElementEntry); | ||
572 | e->element = element; | ||
573 | GNUNET_CRYPTO_hash (e->element->data, e->element->size, &hash); | ||
574 | e->ibf_key = ibf_key_from_hashcode (&hash); | ||
575 | |||
576 | e_old = GNUNET_CONTAINER_multihashmap32_get (us->elements, (uint32_t) e->ibf_key.key_val); | ||
577 | if (NULL == e_old) | ||
578 | { | ||
579 | GNUNET_CONTAINER_multihashmap32_put (us->elements, (uint32_t) e->ibf_key.key_val, e, | ||
580 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | ||
581 | return; | ||
582 | } | ||
583 | |||
584 | while (NULL != e_old) | ||
585 | { | ||
586 | int cmp = memcmp (&e->ibf_key, &e_old->ibf_key, sizeof (struct IBF_Key)); | ||
587 | if (cmp < 0) | ||
588 | { | ||
589 | if (NULL == e_old->next) | ||
590 | { | ||
591 | e_old->next = e; | ||
592 | insert_ibf_key_unchecked (us, e->ibf_key); | ||
593 | return; | ||
594 | } | ||
595 | e_old = e_old->next; | ||
596 | } | ||
597 | else if (cmp == 0) | ||
598 | { | ||
599 | e->next = e_old->next; | ||
600 | e_old->next = e; | ||
601 | return; | ||
602 | } | ||
603 | else | ||
604 | { | ||
605 | e->next = e_old; | ||
606 | insert_ibf_key_unchecked (us, e->ibf_key); | ||
607 | return; | ||
608 | } | ||
609 | } | ||
610 | } | 720 | } |
611 | 721 | ||
612 | 722 | ||
613 | void | 723 | void |
614 | union_accept (struct EvaluateOperation *eo, struct Incoming *incoming) | 724 | _GSS_union_accept (struct AcceptMessage *m, struct Set *set, |
725 | struct Incoming *incoming) | ||
615 | { | 726 | { |
616 | GNUNET_assert (NULL != incoming->mq); | 727 | struct UnionEvaluateOperation *eo; |
728 | |||
729 | eo = GNUNET_new (struct UnionEvaluateOperation); | ||
730 | eo->set = set; | ||
731 | eo->peer = incoming->peer; | ||
732 | eo->app_id = incoming->app_id; | ||
733 | eo->salt = ntohs (incoming->salt); | ||
734 | eo->request_id = m->request_id; | ||
735 | eo->set = set; | ||
617 | eo->mq = incoming->mq; | 736 | eo->mq = incoming->mq; |
737 | /* the peer's socket is now ours, we'll receive all messages */ | ||
618 | GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo); | 738 | GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo); |
619 | 739 | /* kick of the operation */ | |
620 | send_strata_estimator (eo); | 740 | send_strata_estimator (eo); |
621 | } | 741 | } |
622 | 742 | ||
623 | 743 | ||
624 | struct Set * | 744 | struct Set * |
625 | union_set_create () | 745 | _GSS_union_set_create (void) |
626 | { | 746 | { |
627 | struct Set *set; | 747 | struct Set *set; |
748 | |||
749 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set created\n"); | ||
750 | |||
628 | set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState)); | 751 | set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState)); |
629 | set->extra.u = (struct UnionState *) &set[1]; | 752 | set->state.u = (struct UnionState *) &set[1]; |
630 | set->operation = GNUNET_SET_OPERATION_UNION; | 753 | set->operation = GNUNET_SET_OPERATION_UNION; |
631 | set->extra.u->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM); | 754 | set->state.u->se = strata_estimator_create (SE_STRATA_COUNT, |
755 | SE_IBF_SIZE, SE_IBF_HASH_NUM); | ||
632 | return set; | 756 | return set; |
633 | } | 757 | } |
634 | 758 | ||
635 | 759 | ||
760 | |||
636 | void | 761 | void |
637 | union_add (struct Set *set, struct ElementMessage *m) | 762 | _GSS_union_add (struct ElementMessage *m, struct Set *set) |
638 | { | 763 | { |
639 | struct GNUNET_SET_Element *element; | 764 | struct ElementEntry *ee; |
765 | struct ElementEntry *ee_dup; | ||
640 | uint16_t element_size; | 766 | uint16_t element_size; |
767 | |||
641 | element_size = ntohs (m->header.size) - sizeof *m; | 768 | element_size = ntohs (m->header.size) - sizeof *m; |
642 | element = GNUNET_malloc (sizeof *element + element_size); | 769 | ee = GNUNET_malloc (element_size + sizeof *ee); |
643 | element->size = element_size; | 770 | ee->element.size = element_size; |
644 | element->data = &element[1]; | 771 | ee->element.data = &ee[1]; |
645 | memcpy (element->data, &m[1], element_size); | 772 | memcpy (ee->element.data, &m[1], element_size); |
646 | insert_element (set, element); | 773 | GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash); |
774 | ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &ee->element_hash); | ||
775 | if (NULL != ee_dup) | ||
776 | { | ||
777 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n"); | ||
778 | GNUNET_free (ee); | ||
779 | return; | ||
780 | } | ||
781 | GNUNET_CONTAINER_multihashmap_put (set->state.u->elements, &ee->element_hash, ee, | ||
782 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | ||
783 | strata_estimator_insert (set->state.u->se, get_ibf_key (&ee->element_hash, 0)); | ||
647 | } | 784 | } |
648 | 785 | ||
diff --git a/src/set/gnunet-set.c b/src/set/gnunet-set.c index 7de687611..c49b60dfd 100644 --- a/src/set/gnunet-set.c +++ b/src/set/gnunet-set.c | |||
@@ -30,6 +30,8 @@ | |||
30 | #include "gnunet_set_service.h" | 30 | #include "gnunet_set_service.h" |
31 | 31 | ||
32 | 32 | ||
33 | static struct GNUNET_PeerIdentity local_id; | ||
34 | |||
33 | static struct GNUNET_HashCode app_id; | 35 | static struct GNUNET_HashCode app_id; |
34 | static struct GNUNET_SET_Handle *set1; | 36 | static struct GNUNET_SET_Handle *set1; |
35 | static struct GNUNET_SET_Handle *set2; | 37 | static struct GNUNET_SET_Handle *set2; |
@@ -45,6 +47,13 @@ listen_cb (void *cls, | |||
45 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n"); | 47 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n"); |
46 | } | 48 | } |
47 | 49 | ||
50 | static void | ||
51 | result_cb (void *cls, struct GNUNET_SET_Element *element, | ||
52 | enum GNUNET_SET_Status status) | ||
53 | { | ||
54 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got result\n"); | ||
55 | } | ||
56 | |||
48 | 57 | ||
49 | /** | 58 | /** |
50 | * Main function that will be run. | 59 | * Main function that will be run. |
@@ -60,12 +69,19 @@ run (void *cls, char *const *args, | |||
60 | const struct GNUNET_CONFIGURATION_Handle *cfg) | 69 | const struct GNUNET_CONFIGURATION_Handle *cfg) |
61 | { | 70 | { |
62 | static const char* app_str = "gnunet-set"; | 71 | static const char* app_str = "gnunet-set"; |
72 | |||
63 | GNUNET_CRYPTO_hash (app_str, strlen (app_str), &app_id); | 73 | GNUNET_CRYPTO_hash (app_str, strlen (app_str), &app_id); |
64 | 74 | ||
75 | GNUNET_CRYPTO_get_host_identity (cfg, &local_id); | ||
76 | |||
65 | set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); | 77 | set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); |
66 | set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); | 78 | set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); |
67 | listen_handle = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION, &app_id, | 79 | listen_handle = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION, |
68 | listen_cb, NULL); | 80 | &app_id, listen_cb, NULL); |
81 | |||
82 | GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42, | ||
83 | GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_SET_RESULT_ADDED, | ||
84 | result_cb, NULL); | ||
69 | } | 85 | } |
70 | 86 | ||
71 | 87 | ||
diff --git a/src/set/mq.c b/src/set/mq.c index 236a692d4..92120a607 100644 --- a/src/set/mq.c +++ b/src/set/mq.c | |||
@@ -215,21 +215,25 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) | |||
215 | } | 215 | } |
216 | 216 | ||
217 | 217 | ||
218 | struct GNUNET_MQ_Message * | 218 | int |
219 | GNUNET_MQ_msg_concat_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, struct GNUNET_MessageHeader *m, uint16_t type) | 219 | GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp, |
220 | const struct GNUNET_MessageHeader *m) | ||
220 | { | 221 | { |
221 | struct GNUNET_MQ_Message *mq; | 222 | size_t new_size; |
223 | size_t old_size; | ||
222 | 224 | ||
223 | GNUNET_assert (NULL != mhp); | ||
224 | if (NULL == m) | 225 | if (NULL == m) |
225 | return GNUNET_MQ_msg_ (mhp, base_size, type); | 226 | return GNUNET_OK; |
226 | GNUNET_assert (ntohs (m->size >= sizeof (struct GNUNET_MessageHeader))); | 227 | GNUNET_assert (NULL != mqmp); |
227 | /* check for overflow */ | 228 | old_size = ntohs ((*mqmp)->mh->size); |
228 | if (base_size + ntohs (m->size) <= base_size) | 229 | /* message too large to concatenate? */ |
229 | return NULL; | 230 | if (ntohs ((*mqmp)->mh->size) + ntohs (m->size) < ntohs (m->size)) |
230 | mq = GNUNET_MQ_msg_ (mhp, base_size + ntohs (m->size), type); | 231 | return GNUNET_SYSERR; |
231 | memcpy (((void *) *mhp) + base_size, m, ntohs (m->size)); | 232 | new_size = old_size + ntohs (m->size); |
232 | return mq; | 233 | *mqmp = GNUNET_realloc (mqmp, sizeof (struct GNUNET_MQ_Message) + new_size); |
234 | memcpy ((*mqmp)->mh + old_size, m, new_size - old_size); | ||
235 | (*mqmp)->mh->size = htons (new_size); | ||
236 | return GNUNET_OK; | ||
233 | } | 237 | } |
234 | 238 | ||
235 | 239 | ||
@@ -274,20 +278,26 @@ stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size) | |||
274 | return; | 278 | return; |
275 | GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm); | 279 | GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm); |
276 | mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size), | 280 | mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size), |
277 | GNUNET_TIME_UNIT_FOREVER_REL, stream_write_queued, cls); | 281 | GNUNET_TIME_UNIT_FOREVER_REL, |
282 | stream_write_queued, mq); | ||
278 | GNUNET_assert (NULL != mss->wh); | 283 | GNUNET_assert (NULL != mss->wh); |
279 | } | 284 | } |
280 | 285 | ||
281 | 286 | ||
282 | static void | 287 | static void |
283 | stream_socket_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) | 288 | stream_socket_send_impl (struct GNUNET_MQ_MessageQueue *mq, |
289 | struct GNUNET_MQ_Message *mqm) | ||
284 | { | 290 | { |
291 | struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state; | ||
285 | if (NULL != mq->current_msg) | 292 | if (NULL != mq->current_msg) |
286 | { | 293 | { |
287 | GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); | 294 | GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); |
288 | return; | 295 | return; |
289 | } | 296 | } |
290 | stream_write_queued (mq, GNUNET_STREAM_OK, 0); | 297 | mq->current_msg = mqm; |
298 | mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size), | ||
299 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
300 | stream_write_queued, mq); | ||
291 | } | 301 | } |
292 | 302 | ||
293 | 303 | ||
@@ -304,7 +314,8 @@ stream_socket_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Mes | |||
304 | * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing | 314 | * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing |
305 | */ | 315 | */ |
306 | static int | 316 | static int |
307 | stream_mst_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) | 317 | stream_mst_callback (void *cls, void *client, |
318 | const struct GNUNET_MessageHeader *message) | ||
308 | { | 319 | { |
309 | struct GNUNET_MQ_MessageQueue *mq = cls; | 320 | struct GNUNET_MQ_MessageQueue *mq = cls; |
310 | 321 | ||
@@ -334,12 +345,14 @@ stream_data_processor (void *cls, | |||
334 | struct GNUNET_MQ_MessageQueue *mq = cls; | 345 | struct GNUNET_MQ_MessageQueue *mq = cls; |
335 | struct MessageStreamState *mss; | 346 | struct MessageStreamState *mss; |
336 | int ret; | 347 | int ret; |
348 | |||
337 | mss = (struct MessageStreamState *) mq->impl_state; | 349 | mss = (struct MessageStreamState *) mq->impl_state; |
338 | |||
339 | GNUNET_assert (GNUNET_STREAM_OK == status); | 350 | GNUNET_assert (GNUNET_STREAM_OK == status); |
340 | ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO); | 351 | ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO); |
341 | GNUNET_assert (GNUNET_OK == ret); | 352 | GNUNET_assert (GNUNET_OK == ret); |
342 | /* we always read all data */ | 353 | /* we always read all data */ |
354 | mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, | ||
355 | stream_data_processor, mq); | ||
343 | return size; | 356 | return size; |
344 | } | 357 | } |
345 | 358 | ||
@@ -369,8 +382,7 @@ GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket, | |||
369 | } | 382 | } |
370 | 383 | ||
371 | 384 | ||
372 | /** | 385 | /*** Transmit a queued message to the session's client. |
373 | * Transmit a queued message to the session's client. | ||
374 | * | 386 | * |
375 | * @param cls consensus session | 387 | * @param cls consensus session |
376 | * @param size number of bytes available in buf | 388 | * @param size number of bytes available in buf |
@@ -474,7 +486,7 @@ connection_client_transmit_queued (void *cls, size_t size, | |||
474 | mq->current_msg = mq->msg_head; | 486 | mq->current_msg = mq->msg_head; |
475 | GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg); | 487 | GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg); |
476 | state->th = | 488 | state->th = |
477 | GNUNET_CLIENT_notify_transmit_ready (state->connection, msg_size, | 489 | GNUNET_CLIENT_notify_transmit_ready (state->connection, htons (mq->current_msg->mh->size), |
478 | GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, | 490 | GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, |
479 | &connection_client_transmit_queued, mq); | 491 | &connection_client_transmit_queued, mq); |
480 | } | 492 | } |
@@ -483,7 +495,8 @@ connection_client_transmit_queued (void *cls, size_t size, | |||
483 | 495 | ||
484 | 496 | ||
485 | static void | 497 | static void |
486 | connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) | 498 | connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, |
499 | struct GNUNET_MQ_Message *mqm) | ||
487 | { | 500 | { |
488 | struct ClientConnectionState *state = mq->impl_state; | 501 | struct ClientConnectionState *state = mq->impl_state; |
489 | int msize; | 502 | int msize; |
@@ -519,7 +532,6 @@ handle_client_message (void *cls, | |||
519 | struct GNUNET_MQ_MessageQueue *mq = cls; | 532 | struct GNUNET_MQ_MessageQueue *mq = cls; |
520 | 533 | ||
521 | GNUNET_assert (NULL != msg); | 534 | GNUNET_assert (NULL != msg); |
522 | |||
523 | dispatch_message (mq, msg); | 535 | dispatch_message (mq, msg); |
524 | } | 536 | } |
525 | 537 | ||
diff --git a/src/set/mq.h b/src/set/mq.h index 3d8be789d..371bb5846 100644 --- a/src/set/mq.h +++ b/src/set/mq.h | |||
@@ -58,20 +58,7 @@ | |||
58 | */ | 58 | */ |
59 | #define GNUNET_MQ_msg(mvar, type) GNUNET_MQ_msg_extra(mvar, 0, type) | 59 | #define GNUNET_MQ_msg(mvar, type) GNUNET_MQ_msg_extra(mvar, 0, type) |
60 | 60 | ||
61 | /** | 61 | #define GNUNET_MQ_nest(mqm, mh) GNUNET_MQ_nest_ (&mqm, mh) |
62 | * Allocate a GNUNET_MQ_Message, and concatenate another message | ||
63 | * after the space needed by the message struct. | ||
64 | * // nest? | ||
65 | * | ||
66 | * @param mvar variable to store the allocated message in; | ||
67 | * must have a header field | ||
68 | * @param mc message to concatenate, can be NULL | ||
69 | * @param type type of the message | ||
70 | * @return the MQ message, NULL if mc is to large to be concatenated | ||
71 | */ | ||
72 | #define GNUNET_MQ_msg_concat(mvar, mc, t) GNUNET_MQ_msg_concat_(((void) mvar->header, (struct GNUNET_MessageHeader **) &(mvar)), \ | ||
73 | sizeof *mvar, (struct GNUNET_MessageHeader *) mc, t) | ||
74 | |||
75 | 62 | ||
76 | /** | 63 | /** |
77 | * Allocate a GNUNET_MQ_Message, where the message only consists of a header. | 64 | * Allocate a GNUNET_MQ_Message, where the message only consists of a header. |
@@ -157,14 +144,9 @@ struct GNUNET_MQ_Message * | |||
157 | GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type); | 144 | GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type); |
158 | 145 | ||
159 | 146 | ||
160 | /** | 147 | int |
161 | * Create a new message for MQ, by concatenating another message | 148 | GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp, |
162 | * after a message of the specified type. | 149 | const struct GNUNET_MessageHeader *m); |
163 | * | ||
164 | * @retrn the allocated MQ message | ||
165 | */ | ||
166 | struct GNUNET_MQ_Message * | ||
167 | GNUNET_MQ_msg_concat_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, struct GNUNET_MessageHeader *m, uint16_t type); | ||
168 | 150 | ||
169 | 151 | ||
170 | /** | 152 | /** |
diff --git a/src/set/set_api.c b/src/set/set_api.c index b2491afe7..daa15c081 100644 --- a/src/set/set_api.c +++ b/src/set/set_api.c | |||
@@ -288,6 +288,7 @@ GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set, | |||
288 | const struct GNUNET_PeerIdentity *other_peer, | 288 | const struct GNUNET_PeerIdentity *other_peer, |
289 | const struct GNUNET_HashCode *app_id, | 289 | const struct GNUNET_HashCode *app_id, |
290 | const struct GNUNET_MessageHeader *context_msg, | 290 | const struct GNUNET_MessageHeader *context_msg, |
291 | uint16_t salt, | ||
291 | struct GNUNET_TIME_Relative timeout, | 292 | struct GNUNET_TIME_Relative timeout, |
292 | enum GNUNET_SET_ResultMode result_mode, | 293 | enum GNUNET_SET_ResultMode result_mode, |
293 | GNUNET_SET_ResultIterator result_cb, | 294 | GNUNET_SET_ResultIterator result_cb, |
@@ -302,11 +303,14 @@ GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set, | |||
302 | oh->result_cls = result_cls; | 303 | oh->result_cls = result_cls; |
303 | oh->set = set; | 304 | oh->set = set; |
304 | 305 | ||
305 | mqm = GNUNET_MQ_msg_extra (msg, htons(context_msg->size), GNUNET_MESSAGE_TYPE_SET_EVALUATE); | 306 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_EVALUATE); |
306 | msg->request_id = htonl (GNUNET_MQ_assoc_add (set->mq, mqm, oh)); | 307 | msg->request_id = htonl (GNUNET_MQ_assoc_add (set->mq, mqm, oh)); |
307 | msg->peer = *other_peer; | 308 | msg->peer = *other_peer; |
308 | msg->app_id = *app_id; | 309 | msg->app_id = *app_id; |
309 | memcpy (&msg[1], context_msg, htons (context_msg->size)); | 310 | |
311 | if (GNUNET_OK != GNUNET_MQ_nest (mqm, context_msg)) | ||
312 | GNUNET_assert (0); | ||
313 | |||
310 | oh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, operation_timeout_task, oh); | 314 | oh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, operation_timeout_task, oh); |
311 | GNUNET_MQ_send (set->mq, mqm); | 315 | GNUNET_MQ_send (set->mq, mqm); |
312 | 316 | ||