diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-06-03 10:53:49 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-06-03 10:53:49 +0000 |
commit | 68403fa780bf94ace2ebc13c2c09463cbbc0b57c (patch) | |
tree | 3442e4f25de90eab67c4f9813cb6e433c50b7482 | |
parent | fae7f583f2e11cac15fefcbefef64287ab6915d3 (diff) | |
download | gnunet-68403fa780bf94ace2ebc13c2c09463cbbc0b57c.tar.gz gnunet-68403fa780bf94ace2ebc13c2c09463cbbc0b57c.zip |
- conclude for SET
- consensus with SET
-rw-r--r-- | src/Makefile.am | 2 | ||||
-rw-r--r-- | src/consensus/Makefile.am | 4 | ||||
-rw-r--r-- | src/consensus/consensus_api.c | 6 | ||||
-rw-r--r-- | src/consensus/consensus_protocol.h | 11 | ||||
-rw-r--r-- | src/consensus/gnunet-consensus.c | 6 | ||||
-rw-r--r-- | src/consensus/gnunet-service-consensus.c | 407 | ||||
-rw-r--r-- | src/consensus/test_consensus.conf | 8 | ||||
-rw-r--r-- | src/dv/gnunet-service-dv.c | 4 | ||||
-rw-r--r-- | src/include/gnunet_consensus_service.h | 27 | ||||
-rw-r--r-- | src/include/gnunet_mq_lib.h | 89 | ||||
-rw-r--r-- | src/include/gnunet_protocols.h | 34 | ||||
-rw-r--r-- | src/include/gnunet_set_service.h | 27 | ||||
-rw-r--r-- | src/set/gnunet-service-set.c | 166 | ||||
-rw-r--r-- | src/set/gnunet-service-set.h | 8 | ||||
-rw-r--r-- | src/set/gnunet-service-set_union.c | 116 | ||||
-rw-r--r-- | src/set/gnunet-set.c | 14 | ||||
-rw-r--r-- | src/set/set.h | 62 | ||||
-rw-r--r-- | src/set/set_api.c | 156 | ||||
-rw-r--r-- | src/set/test_set.conf | 2 | ||||
-rw-r--r-- | src/set/test_set_api.c | 23 | ||||
-rw-r--r-- | src/stream/stream_api.c | 69 | ||||
-rw-r--r-- | src/util/crypto_hash.c | 2 | ||||
-rw-r--r-- | src/util/mq.c | 81 | ||||
-rw-r--r-- | src/util/test_mq.c | 30 |
24 files changed, 773 insertions, 581 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 6819ec90b..2f4397dbe 100644 --- a/src/Makefile.am +++ b/src/Makefile.am | |||
@@ -3,7 +3,7 @@ | |||
3 | #endif | 3 | #endif |
4 | 4 | ||
5 | if HAVE_EXPERIMENTAL | 5 | if HAVE_EXPERIMENTAL |
6 | EXP_DIR = gns consensus dv set experimentation | 6 | EXP_DIR = gns set dv consensus experimentation |
7 | endif | 7 | endif |
8 | 8 | ||
9 | if LINUX | 9 | if LINUX |
diff --git a/src/consensus/Makefile.am b/src/consensus/Makefile.am index a0edb1d65..914fbdef8 100644 --- a/src/consensus/Makefile.am +++ b/src/consensus/Makefile.am | |||
@@ -61,6 +61,8 @@ gnunet_service_consensus_LDADD = \ | |||
61 | $(top_builddir)/src/mesh/libgnunetmesh.la \ | 61 | $(top_builddir)/src/mesh/libgnunetmesh.la \ |
62 | $(top_builddir)/src/set/libgnunetset.la \ | 62 | $(top_builddir)/src/set/libgnunetset.la \ |
63 | $(GN_LIBINTL) | 63 | $(GN_LIBINTL) |
64 | gnunet_service_consensus_DEPENDENCIES = \ | ||
65 | $(top_builddir)/src/set/libgnunetset.la | ||
64 | 66 | ||
65 | gnunet_service_evil_consensus_SOURCES = \ | 67 | gnunet_service_evil_consensus_SOURCES = \ |
66 | gnunet-service-consensus.c | 68 | gnunet-service-consensus.c |
@@ -71,6 +73,8 @@ gnunet_service_evil_consensus_LDADD = \ | |||
71 | $(top_builddir)/src/mesh/libgnunetmesh.la \ | 73 | $(top_builddir)/src/mesh/libgnunetmesh.la \ |
72 | $(top_builddir)/src/set/libgnunetset.la \ | 74 | $(top_builddir)/src/set/libgnunetset.la \ |
73 | $(GN_LIBINTL) | 75 | $(GN_LIBINTL) |
76 | gnunet_service_evil_consensus_DEPENDENCIES = \ | ||
77 | $(top_builddir)/src/set/libgnunetset.la | ||
74 | gnunet_service_evil_consensus_CFLAGS = -DEVIL | 78 | gnunet_service_evil_consensus_CFLAGS = -DEVIL |
75 | 79 | ||
76 | libgnunetconsensus_la_SOURCES = \ | 80 | libgnunetconsensus_la_SOURCES = \ |
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index 635a610ca..e3ddb4913 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c | |||
@@ -236,9 +236,9 @@ send_next (struct GNUNET_CONSENSUS_Handle *consensus) | |||
236 | */ | 236 | */ |
237 | static void | 237 | static void |
238 | handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, | 238 | handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, |
239 | struct GNUNET_CONSENSUS_ElementMessage *msg) | 239 | struct GNUNET_CONSENSUS_ElementMessage *msg) |
240 | { | 240 | { |
241 | struct GNUNET_CONSENSUS_Element element; | 241 | struct GNUNET_SET_Element element; |
242 | 242 | ||
243 | LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n"); | 243 | LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n"); |
244 | 244 | ||
@@ -424,7 +424,7 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
424 | */ | 424 | */ |
425 | void | 425 | void |
426 | GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, | 426 | GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, |
427 | const struct GNUNET_CONSENSUS_Element *element, | 427 | const struct GNUNET_SET_Element *element, |
428 | GNUNET_CONSENSUS_InsertDoneCallback idc, | 428 | GNUNET_CONSENSUS_InsertDoneCallback idc, |
429 | void *idc_cls) | 429 | void *idc_cls) |
430 | { | 430 | { |
diff --git a/src/consensus/consensus_protocol.h b/src/consensus/consensus_protocol.h index af4d74100..4e0673c7d 100644 --- a/src/consensus/consensus_protocol.h +++ b/src/consensus/consensus_protocol.h | |||
@@ -38,12 +38,15 @@ GNUNET_NETWORK_STRUCT_BEGIN | |||
38 | /** | 38 | /** |
39 | * Sent as context message for set reconciliation. | 39 | * Sent as context message for set reconciliation. |
40 | */ | 40 | */ |
41 | struct ConsensusRoundMessage | 41 | struct GNUNET_CONSENSUS_RoundContextMessage |
42 | { | 42 | { |
43 | /** | ||
44 | * Type: GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT | ||
45 | */ | ||
43 | struct GNUNET_MessageHeader header; | 46 | struct GNUNET_MessageHeader header; |
44 | uint8_t round; | 47 | uint32_t round; |
45 | uint8_t exp_round; | 48 | uint32_t exp_round; |
46 | uint8_t exp_subround; | 49 | uint32_t exp_subround; |
47 | }; | 50 | }; |
48 | 51 | ||
49 | GNUNET_NETWORK_STRUCT_END | 52 | GNUNET_NETWORK_STRUCT_END |
diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c index d8c1b14ee..60db8c61a 100644 --- a/src/consensus/gnunet-consensus.c +++ b/src/consensus/gnunet-consensus.c | |||
@@ -133,7 +133,7 @@ do_consensus () | |||
133 | { | 133 | { |
134 | int j; | 134 | int j; |
135 | struct GNUNET_HashCode *val; | 135 | struct GNUNET_HashCode *val; |
136 | struct GNUNET_CONSENSUS_Element *element; | 136 | struct GNUNET_SET_Element *element; |
137 | generate_indices(unique_indices); | 137 | generate_indices(unique_indices); |
138 | 138 | ||
139 | val = GNUNET_malloc (sizeof *val); | 139 | val = GNUNET_malloc (sizeof *val); |
@@ -151,6 +151,8 @@ do_consensus () | |||
151 | } | 151 | } |
152 | } | 152 | } |
153 | 153 | ||
154 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "all elements inserted, calling conclude\n"); | ||
155 | |||
154 | for (i = 0; i < num_peers; i++) | 156 | for (i = 0; i < num_peers; i++) |
155 | GNUNET_CONSENSUS_conclude (consensus_handles[i], conclude_timeout, conclude_cb, consensus_handles[i]); | 157 | GNUNET_CONSENSUS_conclude (consensus_handles[i], conclude_timeout, conclude_cb, consensus_handles[i]); |
156 | } | 158 | } |
@@ -194,7 +196,7 @@ connect_complete (void *cls, | |||
194 | 196 | ||
195 | static void | 197 | static void |
196 | new_element_cb (void *cls, | 198 | new_element_cb (void *cls, |
197 | const struct GNUNET_CONSENSUS_Element *element) | 199 | const struct GNUNET_SET_Element *element) |
198 | { | 200 | { |
199 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n"); | 201 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n"); |
200 | } | 202 | } |
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index 44edeb215..21123b24f 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet | 2 | This file is part of GNUnet |
3 | (C) 2012 Christian Grothoff (and other contributing authors) | 3 | (C) 2012, 2013 Christian Grothoff (and other contributing authors) |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -43,9 +43,9 @@ | |||
43 | 43 | ||
44 | 44 | ||
45 | /** | 45 | /** |
46 | * Number of exponential rounds, used in the inventory and completion round. | 46 | * Number of exponential rounds, used in the exp and completion round. |
47 | */ | 47 | */ |
48 | #define NUM_EXP_ROUNDS (4) | 48 | #define NUM_EXP_ROUNDS 4 |
49 | 49 | ||
50 | /* forward declarations */ | 50 | /* forward declarations */ |
51 | 51 | ||
@@ -155,17 +155,17 @@ struct ConsensusSession | |||
155 | * Permutation of peers for the current round, | 155 | * Permutation of peers for the current round, |
156 | * maps logical index (for current round) to physical index (location in info array) | 156 | * maps logical index (for current round) to physical index (location in info array) |
157 | */ | 157 | */ |
158 | int *shuffle; | 158 | uint32_t *shuffle; |
159 | 159 | ||
160 | /** | 160 | /** |
161 | * Current round of the exponential scheme. | 161 | * Current round of the exponential scheme. |
162 | */ | 162 | */ |
163 | int exp_round; | 163 | uint32_t exp_round; |
164 | 164 | ||
165 | /** | 165 | /** |
166 | * Current sub-round of the exponential scheme. | 166 | * Current sub-round of the exponential scheme. |
167 | */ | 167 | */ |
168 | int exp_subround; | 168 | uint32_t exp_subround; |
169 | 169 | ||
170 | /** | 170 | /** |
171 | * The partner for the current exp-round | 171 | * The partner for the current exp-round |
@@ -201,17 +201,6 @@ struct ConsensusPeerInformation | |||
201 | struct GNUNET_PeerIdentity peer_id; | 201 | struct GNUNET_PeerIdentity peer_id; |
202 | 202 | ||
203 | /** | 203 | /** |
204 | * Do we connect to the peer, or does the peer connect to us? | ||
205 | * Only valid for all-to-all phases | ||
206 | */ | ||
207 | int is_outgoing; | ||
208 | |||
209 | /** | ||
210 | * Did we receive/send a consensus hello? | ||
211 | */ | ||
212 | int hello; | ||
213 | |||
214 | /** | ||
215 | * Back-reference to the consensus session, | 204 | * Back-reference to the consensus session, |
216 | * to that ConsensusPeerInformation can be used as a closure | 205 | * to that ConsensusPeerInformation can be used as a closure |
217 | */ | 206 | */ |
@@ -223,22 +212,14 @@ struct ConsensusPeerInformation | |||
223 | int exp_subround_finished; | 212 | int exp_subround_finished; |
224 | 213 | ||
225 | /** | 214 | /** |
226 | * GNUNET_YES if we synced inventory with this peer; | 215 | * Set operation we are currently executing with this peer. |
227 | * GNUNET_NO otherwise. | ||
228 | */ | ||
229 | int inventory_synced; | ||
230 | |||
231 | /** | ||
232 | * Round this peer seems to be in, according to the last SE we got. | ||
233 | * Necessary to store this, as we sometimes need to respond to a request from an | ||
234 | * older round, while we are already in the next round. | ||
235 | */ | 216 | */ |
236 | enum ConsensusRound apparent_round; | 217 | struct GNUNET_SET_OperationHandle *set_op; |
237 | 218 | ||
238 | /** | 219 | /** |
239 | * Set operation we are currently executing with this peer. | 220 | * Has conclude been called on the set_op? |
240 | */ | 221 | */ |
241 | struct GNUNET_SET_OperationHandle *set_op; | 222 | int set_op_concluded; |
242 | }; | 223 | }; |
243 | 224 | ||
244 | 225 | ||
@@ -268,9 +249,8 @@ static struct GNUNET_SERVER_Handle *srv; | |||
268 | static struct GNUNET_PeerIdentity my_peer; | 249 | static struct GNUNET_PeerIdentity my_peer; |
269 | 250 | ||
270 | 251 | ||
271 | /* | ||
272 | static int | 252 | static int |
273 | exp_subround_finished (const struct ConsensusSession *session) | 253 | have_exp_subround_finished (const struct ConsensusSession *session) |
274 | { | 254 | { |
275 | int not_finished; | 255 | int not_finished; |
276 | not_finished = 0; | 256 | not_finished = 0; |
@@ -284,25 +264,9 @@ exp_subround_finished (const struct ConsensusSession *session) | |||
284 | return GNUNET_YES; | 264 | return GNUNET_YES; |
285 | return GNUNET_NO; | 265 | return GNUNET_NO; |
286 | } | 266 | } |
287 | */ | ||
288 | 267 | ||
289 | 268 | ||
290 | 269 | ||
291 | /* | ||
292 | static int | ||
293 | inventory_round_finished (struct ConsensusSession *session) | ||
294 | { | ||
295 | int i; | ||
296 | int finished; | ||
297 | finished = 0; | ||
298 | for (i = 0; i < session->num_peers; i++) | ||
299 | if (GNUNET_YES == session->info[i].inventory_synced) | ||
300 | finished++; | ||
301 | if (finished >= (session->num_peers / 2)) | ||
302 | return GNUNET_YES; | ||
303 | return GNUNET_NO; | ||
304 | } | ||
305 | */ | ||
306 | 270 | ||
307 | /** | 271 | /** |
308 | * Destroy a session, free all resources associated with it. | 272 | * Destroy a session, free all resources associated with it. |
@@ -341,39 +305,6 @@ destroy_session (struct ConsensusSession *session) | |||
341 | 305 | ||
342 | 306 | ||
343 | /** | 307 | /** |
344 | * Start the inventory round, contact all peers we are supposed to contact. | ||
345 | * | ||
346 | * @param session the current session | ||
347 | */ | ||
348 | static void | ||
349 | start_inventory (struct ConsensusSession *session) | ||
350 | { | ||
351 | int i; | ||
352 | int last; | ||
353 | |||
354 | for (i = 0; i < session->num_peers; i++) | ||
355 | session->info[i].is_outgoing = GNUNET_NO; | ||
356 | |||
357 | last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; | ||
358 | i = (session->local_peer_idx + 1) % session->num_peers; | ||
359 | while (i != last) | ||
360 | { | ||
361 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i); | ||
362 | session->info[i].is_outgoing = GNUNET_YES; | ||
363 | // embrace_peer (&session->info[i]); | ||
364 | i = (i + 1) % session->num_peers; | ||
365 | } | ||
366 | // tie-breaker for even number of peers | ||
367 | if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last)) | ||
368 | { | ||
369 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i); | ||
370 | session->info[last].is_outgoing = GNUNET_YES; | ||
371 | // embrace_peer (&session->info[last]); | ||
372 | } | ||
373 | } | ||
374 | |||
375 | |||
376 | /** | ||
377 | * Start the next round. | 308 | * Start the next round. |
378 | * This function can be invoked as a timeout task, or called manually (tc will be NULL then). | 309 | * This function can be invoked as a timeout task, or called manually (tc will be NULL then). |
379 | * | 310 | * |
@@ -407,27 +338,8 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
407 | subround_over (session, NULL); | 338 | subround_over (session, NULL); |
408 | break; | 339 | break; |
409 | case CONSENSUS_ROUND_EXCHANGE: | 340 | case CONSENSUS_ROUND_EXCHANGE: |
410 | /* handle two peers specially */ | 341 | /* FIXME: send all elements to client */ |
411 | if (session->num_peers <= 2) | ||
412 | { | ||
413 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: 2-peer consensus done\n", session->local_peer_idx); | ||
414 | //GNUNET_CONTAINER_multihashmap_iterate (session->values, send_client_elements_iter, session); | ||
415 | //send_client_conclude_done (session); | ||
416 | session->current_round = CONSENSUS_ROUND_FINISH; | ||
417 | return; | ||
418 | } | ||
419 | session->current_round = CONSENSUS_ROUND_INVENTORY; | ||
420 | start_inventory (session); | ||
421 | break; | ||
422 | case CONSENSUS_ROUND_INVENTORY: | ||
423 | session->current_round = CONSENSUS_ROUND_COMPLETION; | ||
424 | session->exp_round = 0; | ||
425 | subround_over (session, NULL); | ||
426 | break; | ||
427 | case CONSENSUS_ROUND_COMPLETION: | ||
428 | session->current_round = CONSENSUS_ROUND_FINISH; | 342 | session->current_round = CONSENSUS_ROUND_FINISH; |
429 | //send_client_conclude_done (session); | ||
430 | break; | ||
431 | default: | 343 | default: |
432 | GNUNET_assert (0); | 344 | GNUNET_assert (0); |
433 | } | 345 | } |
@@ -435,68 +347,91 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
435 | 347 | ||
436 | 348 | ||
437 | /** | 349 | /** |
438 | * Adapt the shuffle of the session for the current round. | 350 | * Create a new permutation for the session's peers in session->shuffle. |
351 | * Uses a Fisher-Yates shuffle with pseudo-randomness coming from | ||
352 | * both the global session id and the current round index. | ||
353 | * | ||
354 | * @param session the session to create the new permutation for | ||
439 | */ | 355 | */ |
440 | static void | 356 | static void |
441 | shuffle (struct ConsensusSession *session) | 357 | shuffle (struct ConsensusSession *session) |
442 | { | 358 | { |
443 | /* adapted from random_permute in util/crypto_random.c */ | 359 | uint32_t i; |
444 | /* FIXME | 360 | uint32_t randomness[session->num_peers-1]; |
445 | unsigned int *ret; | 361 | |
446 | unsigned int i; | 362 | if (NULL == session->shuffle) |
447 | unsigned int tmp; | 363 | session->shuffle = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle)); |
448 | uint32_t x; | 364 | |
365 | GNUNET_CRYPTO_kdf (randomness, sizeof (randomness), &session->exp_round, sizeof (uint32_t), | ||
366 | &session->global_id, sizeof (struct GNUNET_HashCode)); | ||
367 | |||
368 | for (i = 0; i < session->num_peers; i++) | ||
369 | session->shuffle[i] = i; | ||
449 | 370 | ||
450 | GNUNET_assert (n > 0); | 371 | for (i = session->num_peers - 1; i > 0; i--) |
451 | ret = GNUNET_malloc (n * sizeof (unsigned int)); | ||
452 | for (i = 0; i < n; i++) | ||
453 | ret[i] = i; | ||
454 | for (i = n - 1; i > 0; i--) | ||
455 | { | 372 | { |
456 | x = GNUNET_CRYPTO_random_u32 (mode, i + 1); | 373 | uint32_t x; |
457 | tmp = ret[x]; | 374 | uint32_t tmp; |
458 | ret[x] = ret[i]; | 375 | x = randomness[i-1]; |
459 | ret[i] = tmp; | 376 | tmp = session->shuffle[x]; |
377 | session->shuffle[x] = session->shuffle[i]; | ||
378 | session->shuffle[i] = tmp; | ||
460 | } | 379 | } |
461 | */ | ||
462 | } | 380 | } |
463 | 381 | ||
464 | 382 | ||
465 | /** | 383 | /** |
466 | * Find and set the partner_incoming and partner_outgoing of our peer, | 384 | * Find and set the partner_incoming and partner_outgoing of our peer, |
467 | * one of them may not exist in most cases. | 385 | * one of them may not exist (and thus set to NULL) if the number of peers |
386 | * in the session is not a power of two. | ||
468 | * | 387 | * |
469 | * @param session the consensus session | 388 | * @param session the consensus session |
470 | */ | 389 | */ |
471 | static void | 390 | static void |
472 | find_partners (struct ConsensusSession *session) | 391 | find_partners (struct ConsensusSession *session) |
473 | { | 392 | { |
474 | int mark[session->num_peers]; | 393 | int arc; |
475 | int i; | 394 | int partner_idx; |
476 | 395 | int largest_arc; | |
477 | memset (mark, 0, session->num_peers * sizeof (int)); | 396 | int num_ghosts; |
478 | session->partner_incoming = session->partner_outgoing = NULL; | 397 | |
479 | for (i = 0; i < session->num_peers; i++) | 398 | /* distance to neighboring peer in current subround */ |
399 | arc = 1 << session->exp_subround; | ||
400 | partner_idx = (session->local_peer_idx + arc) % session->num_peers; | ||
401 | largest_arc = 1; | ||
402 | while (largest_arc < session->num_peers) | ||
403 | largest_arc <<= 1; | ||
404 | num_ghosts = largest_arc - session->num_peers; | ||
405 | |||
406 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "num ghosts: %d\n", num_ghosts); | ||
407 | |||
408 | if (0 == (session->local_peer_idx & arc)) | ||
480 | { | 409 | { |
481 | int arc; | 410 | /* we are outgoing */ |
482 | if (0 != mark[i]) | 411 | session->partner_outgoing = &session->info[session->shuffle[partner_idx]]; |
483 | continue; | 412 | /* are we a 'ghost' of a peer that would exist if |
484 | arc = (i + (1 << session->exp_subround)) % session->num_peers; | 413 | * the number of peers was a power of two, and thus have to partner |
485 | mark[i] = mark[arc] = 1; | 414 | * with an additional peer? |
486 | GNUNET_assert (i != arc); | 415 | */ |
487 | if (i == session->local_peer_idx) | 416 | if (session->local_peer_idx < num_ghosts) |
488 | { | 417 | { |
489 | GNUNET_assert (NULL == session->partner_outgoing); | 418 | int ghost_partner_idx; |
490 | session->partner_outgoing = &session->info[session->shuffle[arc]]; | 419 | ghost_partner_idx = (session->local_peer_idx - arc) % session->num_peers; |
491 | session->partner_outgoing->exp_subround_finished = GNUNET_NO; | 420 | /* platform dependent; modulo sometimes returns negative values */ |
421 | if (ghost_partner_idx < 0) | ||
422 | ghost_partner_idx += arc; | ||
423 | session->partner_incoming = &session->info[session->shuffle[ghost_partner_idx]]; | ||
492 | } | 424 | } |
493 | if (arc == session->local_peer_idx) | 425 | else |
494 | { | 426 | { |
495 | GNUNET_assert (NULL == session->partner_incoming); | 427 | session->partner_incoming = NULL; |
496 | session->partner_incoming = &session->info[session->shuffle[i]]; | ||
497 | session->partner_incoming->exp_subround_finished = GNUNET_NO; | ||
498 | } | 428 | } |
499 | } | 429 | } |
430 | else | ||
431 | { | ||
432 | session->partner_outgoing = NULL; | ||
433 | session->partner_incoming = &session->info[session->shuffle[partner_idx]]; | ||
434 | } | ||
500 | } | 435 | } |
501 | 436 | ||
502 | 437 | ||
@@ -508,11 +443,42 @@ find_partners (struct ConsensusSession *session) | |||
508 | * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK | 443 | * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK |
509 | * @param status see enum GNUNET_SET_Status | 444 | * @param status see enum GNUNET_SET_Status |
510 | */ | 445 | */ |
511 | static void set_result_cb (void *cls, | 446 | static void |
512 | const struct GNUNET_SET_Element *element, | 447 | set_result_cb (void *cls, |
513 | enum GNUNET_SET_Status status) | 448 | const struct GNUNET_SET_Element *element, |
449 | enum GNUNET_SET_Status status) | ||
514 | { | 450 | { |
515 | /* FIXME */ | 451 | struct ConsensusPeerInformation *cpi = cls; |
452 | |||
453 | switch (status) | ||
454 | { | ||
455 | case GNUNET_SET_STATUS_OK: | ||
456 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: element\n"); | ||
457 | break; | ||
458 | case GNUNET_SET_STATUS_FAILURE: | ||
459 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: failure\n"); | ||
460 | break; | ||
461 | case GNUNET_SET_STATUS_HALF_DONE: | ||
462 | case GNUNET_SET_STATUS_DONE: | ||
463 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: done\n"); | ||
464 | cpi->exp_subround_finished = GNUNET_YES; | ||
465 | if (have_exp_subround_finished (cpi->session) == GNUNET_YES) | ||
466 | subround_over (cpi->session, NULL); | ||
467 | return; | ||
468 | default: | ||
469 | GNUNET_break (0); | ||
470 | return; | ||
471 | } | ||
472 | |||
473 | switch (cpi->session->current_round) | ||
474 | { | ||
475 | case CONSENSUS_ROUND_EXCHANGE: | ||
476 | GNUNET_SET_add_element (cpi->session->element_set, element, NULL, NULL); | ||
477 | break; | ||
478 | default: | ||
479 | GNUNET_break (0); | ||
480 | return; | ||
481 | } | ||
516 | } | 482 | } |
517 | 483 | ||
518 | 484 | ||
@@ -540,14 +506,6 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
540 | GNUNET_SCHEDULER_cancel (session->round_timeout_tid); | 506 | GNUNET_SCHEDULER_cancel (session->round_timeout_tid); |
541 | session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; | 507 | session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; |
542 | } | 508 | } |
543 | /* check if we are done with the log phase, 2-peer consensus only does one log round */ | ||
544 | if ( (session->exp_round == NUM_EXP_ROUNDS) || | ||
545 | ((session->num_peers == 2) && (session->exp_round == 1))) | ||
546 | { | ||
547 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: exp-round over\n", session->local_peer_idx); | ||
548 | round_over (session, NULL); | ||
549 | return; | ||
550 | } | ||
551 | if (session->exp_round == 0) | 509 | if (session->exp_round == 0) |
552 | { | 510 | { |
553 | /* initialize everything for the log-rounds */ | 511 | /* initialize everything for the log-rounds */ |
@@ -575,18 +533,27 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
575 | 533 | ||
576 | if (NULL != session->partner_outgoing) | 534 | if (NULL != session->partner_outgoing) |
577 | { | 535 | { |
536 | struct GNUNET_CONSENSUS_RoundContextMessage *msg; | ||
537 | msg = GNUNET_new (struct GNUNET_CONSENSUS_RoundContextMessage); | ||
538 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT); | ||
539 | msg->header.size = htons (sizeof *msg); | ||
540 | msg->round = htonl (session->current_round); | ||
541 | msg->exp_round = htonl (session->exp_round); | ||
542 | msg->exp_subround = htonl (session->exp_subround); | ||
543 | |||
578 | if (NULL != session->partner_outgoing->set_op) | 544 | if (NULL != session->partner_outgoing->set_op) |
545 | { | ||
579 | GNUNET_SET_operation_cancel (session->partner_outgoing->set_op); | 546 | GNUNET_SET_operation_cancel (session->partner_outgoing->set_op); |
547 | } | ||
580 | session->partner_outgoing->set_op = | 548 | session->partner_outgoing->set_op = |
581 | GNUNET_SET_evaluate (session->element_set, | 549 | GNUNET_SET_evaluate (&session->partner_outgoing->peer_id, |
582 | &session->partner_outgoing->peer_id, | ||
583 | &session->global_id, | 550 | &session->global_id, |
584 | NULL, /* FIXME */ | 551 | (struct GNUNET_MessageHeader *) msg, |
585 | 0, /* FIXME */ | 552 | 0, /* FIXME */ |
586 | GNUNET_SET_RESULT_ADDED, | 553 | GNUNET_SET_RESULT_ADDED, |
587 | set_result_cb, session); | 554 | set_result_cb, session->partner_outgoing); |
588 | 555 | GNUNET_SET_conclude (session->partner_outgoing->set_op, session->element_set); | |
589 | 556 | session->partner_outgoing->set_op_concluded = GNUNET_YES; | |
590 | } | 557 | } |
591 | 558 | ||
592 | #ifdef GNUNET_EXTRA_LOGGING | 559 | #ifdef GNUNET_EXTRA_LOGGING |
@@ -642,6 +609,8 @@ compute_global_id (struct ConsensusSession *session, const struct GNUNET_HashCod | |||
642 | int i; | 609 | int i; |
643 | struct GNUNET_HashCode tmp; | 610 | struct GNUNET_HashCode tmp; |
644 | 611 | ||
612 | /* FIXME: use kdf? */ | ||
613 | |||
645 | session->global_id = *session_id; | 614 | session->global_id = *session_id; |
646 | for (i = 0; i < session->num_peers; ++i) | 615 | for (i = 0; i < session->num_peers; ++i) |
647 | { | 616 | { |
@@ -727,9 +696,6 @@ initialize_session_peer_list (struct ConsensusSession *session, | |||
727 | } | 696 | } |
728 | 697 | ||
729 | 698 | ||
730 | |||
731 | |||
732 | |||
733 | /** | 699 | /** |
734 | * Called when another peer wants to do a set operation with the | 700 | * Called when another peer wants to do a set operation with the |
735 | * local peer. | 701 | * local peer. |
@@ -750,7 +716,67 @@ set_listen_cb (void *cls, | |||
750 | const struct GNUNET_MessageHeader *context_msg, | 716 | const struct GNUNET_MessageHeader *context_msg, |
751 | struct GNUNET_SET_Request *request) | 717 | struct GNUNET_SET_Request *request) |
752 | { | 718 | { |
753 | /* FIXME */ | 719 | struct ConsensusSession *session = cls; |
720 | struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg; | ||
721 | struct ConsensusPeerInformation *cpi; | ||
722 | int index; | ||
723 | |||
724 | if (NULL == context_msg) | ||
725 | { | ||
726 | GNUNET_break_op (0); | ||
727 | return; | ||
728 | } | ||
729 | |||
730 | index = get_peer_idx (other_peer, session); | ||
731 | |||
732 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "result from %s\n", GNUNET_h2s (&other_peer->hashPubKey)); | ||
733 | |||
734 | if (index < 0) | ||
735 | { | ||
736 | GNUNET_break_op (0); | ||
737 | return; | ||
738 | } | ||
739 | |||
740 | cpi = &session->info[index]; | ||
741 | |||
742 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d got result from P%d\n", session->local_peer_idx, index); | ||
743 | |||
744 | switch (session->current_round) | ||
745 | { | ||
746 | case CONSENSUS_ROUND_EXCHANGE: | ||
747 | if (ntohl (msg->round) != CONSENSUS_ROUND_EXCHANGE) | ||
748 | { | ||
749 | GNUNET_break_op (0); | ||
750 | return; | ||
751 | } | ||
752 | if (ntohl (msg->exp_round) < session->exp_round) | ||
753 | { | ||
754 | GNUNET_break_op (0); | ||
755 | return; | ||
756 | } | ||
757 | if (ntohl (msg->exp_subround) < session->exp_subround) | ||
758 | { | ||
759 | GNUNET_break_op (0); | ||
760 | return; | ||
761 | } | ||
762 | if (NULL != cpi->set_op) | ||
763 | GNUNET_SET_operation_cancel (cpi->set_op); | ||
764 | cpi->set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, | ||
765 | set_result_cb, &session->info[index]); | ||
766 | if (ntohl (msg->exp_subround) == session->exp_subround) | ||
767 | { | ||
768 | cpi->set_op_concluded = GNUNET_YES; | ||
769 | GNUNET_SET_conclude (cpi->set_op, session->element_set); | ||
770 | } | ||
771 | else | ||
772 | { | ||
773 | cpi->set_op_concluded = GNUNET_NO; | ||
774 | } | ||
775 | break; | ||
776 | default: | ||
777 | GNUNET_break_op (0); | ||
778 | return; | ||
779 | } | ||
754 | } | 780 | } |
755 | 781 | ||
756 | 782 | ||
@@ -769,7 +795,9 @@ initialize_session (struct ConsensusSession *session, | |||
769 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers); | 795 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers); |
770 | compute_global_id (session, &join_msg->session_id); | 796 | compute_global_id (session, &join_msg->session_id); |
771 | 797 | ||
772 | /* check if some local client already owns the session. */ | 798 | /* check if some local client already owns the session. |
799 | * it is only legal to have a session with an existing global id | ||
800 | * if all other sessions with this global id are finished.*/ | ||
773 | other_session = sessions_head; | 801 | other_session = sessions_head; |
774 | while (NULL != other_session) | 802 | while (NULL != other_session) |
775 | { | 803 | { |
@@ -789,6 +817,8 @@ initialize_session (struct ConsensusSession *session, | |||
789 | 817 | ||
790 | session->local_peer_idx = get_peer_idx (&my_peer, session); | 818 | session->local_peer_idx = get_peer_idx (&my_peer, session); |
791 | GNUNET_assert (-1 != session->local_peer_idx); | 819 | GNUNET_assert (-1 != session->local_peer_idx); |
820 | session->element_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); | ||
821 | GNUNET_assert (NULL != session->element_set); | ||
792 | session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION, | 822 | session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION, |
793 | &session->global_id, | 823 | &session->global_id, |
794 | set_listen_cb, session); | 824 | set_listen_cb, session); |
@@ -827,6 +857,8 @@ client_join (void *cls, | |||
827 | { | 857 | { |
828 | struct ConsensusSession *session; | 858 | struct ConsensusSession *session; |
829 | 859 | ||
860 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join message sent by client\n"); | ||
861 | |||
830 | session = get_session_by_client (client); | 862 | session = get_session_by_client (client); |
831 | if (NULL != session) | 863 | if (NULL != session) |
832 | { | 864 | { |
@@ -835,9 +867,13 @@ client_join (void *cls, | |||
835 | return; | 867 | return; |
836 | } | 868 | } |
837 | session = GNUNET_new (struct ConsensusSession); | 869 | session = GNUNET_new (struct ConsensusSession); |
870 | session->client = client; | ||
838 | GNUNET_SERVER_client_keep (client); | 871 | GNUNET_SERVER_client_keep (client); |
839 | GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); | 872 | GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); |
840 | initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m); | 873 | initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m); |
874 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
875 | |||
876 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join done\n"); | ||
841 | } | 877 | } |
842 | 878 | ||
843 | 879 | ||
@@ -858,12 +894,7 @@ client_insert (void *cls, | |||
858 | struct GNUNET_SET_Element *element; | 894 | struct GNUNET_SET_Element *element; |
859 | ssize_t element_size; | 895 | ssize_t element_size; |
860 | 896 | ||
861 | session = sessions_head; | 897 | session = get_session_by_client (client); |
862 | while (NULL != session) | ||
863 | { | ||
864 | if (session->client == client) | ||
865 | break; | ||
866 | } | ||
867 | 898 | ||
868 | if (NULL == session) | 899 | if (NULL == session) |
869 | { | 900 | { |
@@ -886,6 +917,7 @@ client_insert (void *cls, | |||
886 | GNUNET_break (0); | 917 | GNUNET_break (0); |
887 | return; | 918 | return; |
888 | } | 919 | } |
920 | |||
889 | element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size); | 921 | element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size); |
890 | element->type = msg->element_type; | 922 | element->type = msg->element_type; |
891 | element->size = element_size; | 923 | element->size = element_size; |
@@ -893,6 +925,8 @@ client_insert (void *cls, | |||
893 | element->data = &element[1]; | 925 | element->data = &element[1]; |
894 | GNUNET_SET_add_element (session->element_set, element, NULL, NULL); | 926 | GNUNET_SET_add_element (session->element_set, element, NULL, NULL); |
895 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 927 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
928 | |||
929 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: element added\n", session->local_peer_idx); | ||
896 | } | 930 | } |
897 | 931 | ||
898 | 932 | ||
@@ -911,10 +945,10 @@ client_conclude (void *cls, | |||
911 | struct ConsensusSession *session; | 945 | struct ConsensusSession *session; |
912 | struct GNUNET_CONSENSUS_ConcludeMessage *cmsg; | 946 | struct GNUNET_CONSENSUS_ConcludeMessage *cmsg; |
913 | 947 | ||
914 | cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message; | ||
915 | 948 | ||
949 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n"); | ||
950 | cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message; | ||
916 | session = get_session_by_client (client); | 951 | session = get_session_by_client (client); |
917 | |||
918 | if (NULL == session) | 952 | if (NULL == session) |
919 | { | 953 | { |
920 | /* client not found */ | 954 | /* client not found */ |
@@ -922,14 +956,12 @@ client_conclude (void *cls, | |||
922 | GNUNET_SERVER_client_disconnect (client); | 956 | GNUNET_SERVER_client_disconnect (client); |
923 | return; | 957 | return; |
924 | } | 958 | } |
925 | |||
926 | if (CONSENSUS_ROUND_BEGIN != session->current_round) | 959 | if (CONSENSUS_ROUND_BEGIN != session->current_round) |
927 | { | 960 | { |
928 | /* client requested conclude twice */ | 961 | /* client requested conclude twice */ |
929 | GNUNET_break (0); | 962 | GNUNET_break (0); |
930 | return; | 963 | return; |
931 | } | 964 | } |
932 | |||
933 | if (session->num_peers <= 1) | 965 | if (session->num_peers <= 1) |
934 | { | 966 | { |
935 | //send_client_conclude_done (session); | 967 | //send_client_conclude_done (session); |
@@ -937,7 +969,7 @@ client_conclude (void *cls, | |||
937 | else | 969 | else |
938 | { | 970 | { |
939 | session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout); | 971 | session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout); |
940 | /* the 'begin' round is over, start with the next, real round */ | 972 | /* the 'begin' round is over, start with the next, actual round */ |
941 | round_over (session, NULL); | 973 | round_over (session, NULL); |
942 | } | 974 | } |
943 | 975 | ||
@@ -964,6 +996,29 @@ shutdown_task (void *cls, | |||
964 | 996 | ||
965 | 997 | ||
966 | /** | 998 | /** |
999 | * Clean up after a client after it is | ||
1000 | * disconnected (either by us or by itself) | ||
1001 | * | ||
1002 | * @param cls closure, unused | ||
1003 | * @param client the client to clean up after | ||
1004 | */ | ||
1005 | void | ||
1006 | handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | ||
1007 | { | ||
1008 | struct ConsensusSession *session; | ||
1009 | |||
1010 | session = get_session_by_client (client); | ||
1011 | if (NULL == session) | ||
1012 | return; | ||
1013 | if ((CONSENSUS_ROUND_BEGIN == session->current_round) || | ||
1014 | (CONSENSUS_ROUND_FINISH == session->current_round)) | ||
1015 | destroy_session (session); | ||
1016 | else | ||
1017 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, but waiting for consensus to finish\n"); | ||
1018 | } | ||
1019 | |||
1020 | |||
1021 | /** | ||
967 | * Start processing consensus requests. | 1022 | * Start processing consensus requests. |
968 | * | 1023 | * |
969 | * @param cls closure | 1024 | * @param cls closure |
@@ -971,13 +1026,14 @@ shutdown_task (void *cls, | |||
971 | * @param c configuration to use | 1026 | * @param c configuration to use |
972 | */ | 1027 | */ |
973 | static void | 1028 | static void |
974 | run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) | 1029 | run (void *cls, struct GNUNET_SERVER_Handle *server, |
1030 | const struct GNUNET_CONFIGURATION_Handle *c) | ||
975 | { | 1031 | { |
976 | static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { | 1032 | static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { |
977 | {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0}, | ||
978 | {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, | ||
979 | {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, | 1033 | {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, |
980 | sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)}, | 1034 | sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)}, |
1035 | {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, | ||
1036 | {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0}, | ||
981 | {NULL, NULL, 0, 0} | 1037 | {NULL, NULL, 0, 0} |
982 | }; | 1038 | }; |
983 | 1039 | ||
@@ -992,6 +1048,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU | |||
992 | } | 1048 | } |
993 | GNUNET_SERVER_add_handlers (server, server_handlers); | 1049 | GNUNET_SERVER_add_handlers (server, server_handlers); |
994 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); | 1050 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); |
1051 | GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL); | ||
995 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n"); | 1052 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n"); |
996 | } | 1053 | } |
997 | 1054 | ||
diff --git a/src/consensus/test_consensus.conf b/src/consensus/test_consensus.conf index 437636c99..37facf84e 100644 --- a/src/consensus/test_consensus.conf +++ b/src/consensus/test_consensus.conf | |||
@@ -5,7 +5,7 @@ HOSTNAME = localhost | |||
5 | HOME = $SERVICEHOME | 5 | HOME = $SERVICEHOME |
6 | BINARY = gnunet-service-consensus | 6 | BINARY = gnunet-service-consensus |
7 | #PREFIX = gdbserver :12345 | 7 | #PREFIX = gdbserver :12345 |
8 | PREFIX = valgrind --leak-check=full | 8 | PREFIX = valgrind |
9 | ACCEPT_FROM = 127.0.0.1; | 9 | ACCEPT_FROM = 127.0.0.1; |
10 | ACCEPT_FROM6 = ::1; | 10 | ACCEPT_FROM6 = ::1; |
11 | UNIXPATH = /tmp/gnunet-service-consensus.sock | 11 | UNIXPATH = /tmp/gnunet-service-consensus.sock |
@@ -19,7 +19,11 @@ OPTIONS = -LERROR | |||
19 | 19 | ||
20 | 20 | ||
21 | [arm] | 21 | [arm] |
22 | DEFAULTSERVICES = core consensus | 22 | DEFAULTSERVICES = core consensus set |
23 | |||
24 | [set] | ||
25 | OPTIONS = -L INFO | ||
26 | PREFIX = valgrind | ||
23 | 27 | ||
24 | 28 | ||
25 | [testbed] | 29 | [testbed] |
diff --git a/src/dv/gnunet-service-dv.c b/src/dv/gnunet-service-dv.c index 3b0f0783a..c6ba8eec3 100644 --- a/src/dv/gnunet-service-dv.c +++ b/src/dv/gnunet-service-dv.c | |||
@@ -1404,7 +1404,6 @@ listen_set_union (void *cls, | |||
1404 | neighbor->my_set = GNUNET_SET_create (cfg, | 1404 | neighbor->my_set = GNUNET_SET_create (cfg, |
1405 | GNUNET_SET_OPERATION_UNION); | 1405 | GNUNET_SET_OPERATION_UNION); |
1406 | neighbor->set_op = GNUNET_SET_accept (request, | 1406 | neighbor->set_op = GNUNET_SET_accept (request, |
1407 | neighbor->my_set /* FIXME: pass later! */, | ||
1408 | GNUNET_SET_RESULT_ADDED, | 1407 | GNUNET_SET_RESULT_ADDED, |
1409 | &handle_set_union_result, | 1408 | &handle_set_union_result, |
1410 | neighbor); | 1409 | neighbor); |
@@ -1428,8 +1427,7 @@ initiate_set_union (void *cls, | |||
1428 | neighbor->initiate_task = GNUNET_SCHEDULER_NO_TASK; | 1427 | neighbor->initiate_task = GNUNET_SCHEDULER_NO_TASK; |
1429 | neighbor->my_set = GNUNET_SET_create (cfg, | 1428 | neighbor->my_set = GNUNET_SET_create (cfg, |
1430 | GNUNET_SET_OPERATION_UNION); | 1429 | GNUNET_SET_OPERATION_UNION); |
1431 | neighbor->set_op = GNUNET_SET_evaluate (neighbor->my_set /* FIXME: pass later! */, | 1430 | neighbor->set_op = GNUNET_SET_evaluate (&neighbor->peer, |
1432 | &neighbor->peer, | ||
1433 | &neighbor->real_session_id, | 1431 | &neighbor->real_session_id, |
1434 | NULL, | 1432 | NULL, |
1435 | 0 /* FIXME: salt */, | 1433 | 0 /* FIXME: salt */, |
diff --git a/src/include/gnunet_consensus_service.h b/src/include/gnunet_consensus_service.h index 66d48e0e2..db7509976 100644 --- a/src/include/gnunet_consensus_service.h +++ b/src/include/gnunet_consensus_service.h | |||
@@ -39,28 +39,7 @@ extern "C" | |||
39 | #include "gnunet_common.h" | 39 | #include "gnunet_common.h" |
40 | #include "gnunet_time_lib.h" | 40 | #include "gnunet_time_lib.h" |
41 | #include "gnunet_configuration_lib.h" | 41 | #include "gnunet_configuration_lib.h" |
42 | 42 | #include "gnunet_set_service.h" | |
43 | |||
44 | /** | ||
45 | * An element of the consensus set. | ||
46 | */ | ||
47 | struct GNUNET_CONSENSUS_Element | ||
48 | { | ||
49 | /** | ||
50 | * The actual data of the element. | ||
51 | */ | ||
52 | const void *data; | ||
53 | |||
54 | /** | ||
55 | * Size of the element's data. | ||
56 | */ | ||
57 | uint16_t size; | ||
58 | |||
59 | /** | ||
60 | * Application specific element type | ||
61 | */ | ||
62 | uint16_t type; | ||
63 | }; | ||
64 | 43 | ||
65 | 44 | ||
66 | /** | 45 | /** |
@@ -73,7 +52,7 @@ struct GNUNET_CONSENSUS_Element | |||
73 | * @param element new element, NULL on error | 52 | * @param element new element, NULL on error |
74 | */ | 53 | */ |
75 | typedef void (*GNUNET_CONSENSUS_ElementCallback) (void *cls, | 54 | typedef void (*GNUNET_CONSENSUS_ElementCallback) (void *cls, |
76 | const struct GNUNET_CONSENSUS_Element *element); | 55 | const struct GNUNET_SET_Element *element); |
77 | 56 | ||
78 | 57 | ||
79 | 58 | ||
@@ -138,7 +117,7 @@ typedef void (*GNUNET_CONSENSUS_InsertDoneCallback) (void *cls, | |||
138 | */ | 117 | */ |
139 | void | 118 | void |
140 | GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, | 119 | GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, |
141 | const struct GNUNET_CONSENSUS_Element *element, | 120 | const struct GNUNET_SET_Element *element, |
142 | GNUNET_CONSENSUS_InsertDoneCallback idc, | 121 | GNUNET_CONSENSUS_InsertDoneCallback idc, |
143 | void *idc_cls); | 122 | void *idc_cls); |
144 | 123 | ||
diff --git a/src/include/gnunet_mq_lib.h b/src/include/gnunet_mq_lib.h index 59b692cf0..54ea806a5 100644 --- a/src/include/gnunet_mq_lib.h +++ b/src/include/gnunet_mq_lib.h | |||
@@ -53,36 +53,6 @@ | |||
53 | */ | 53 | */ |
54 | #define GNUNET_MQ_msg(mvar, type) GNUNET_MQ_msg_extra(mvar, 0, type) | 54 | #define GNUNET_MQ_msg(mvar, type) GNUNET_MQ_msg_extra(mvar, 0, type) |
55 | 55 | ||
56 | /** | ||
57 | * Append data to the end of an existing MQ message. | ||
58 | * If the operation is successful, mqm is changed to point to the new MQ message, | ||
59 | * and GNUNET_OK is returned. | ||
60 | * On failure, GNUNET_SYSERR is returned, and the pointer mqm is not changed, | ||
61 | * the user of this API must take care of disposing the already allocated message | ||
62 | * (either by sending it, or by using GNUNET_MQ_discard) | ||
63 | * | ||
64 | * @param mqm MQ message to augment with additional data | ||
65 | * @param src source buffer for the additional data | ||
66 | * @param len length of the additional data | ||
67 | * @return GNUNET_SYSERR if nesting the message failed, | ||
68 | * GNUNET_OK on success | ||
69 | */ | ||
70 | #define GNUNET_MQ_nest(mqm, src, len) GNUNET_MQ_nest_ (&mqm, src, len) | ||
71 | |||
72 | |||
73 | /** | ||
74 | * Append a message to the end of an existing MQ message. | ||
75 | * If the operation is successful, mqm is changed to point to the new MQ message, | ||
76 | * and GNUNET_OK is returned. | ||
77 | * On failure, GNUNET_SYSERR is returned, and the pointer mqm is not changed, | ||
78 | * the user of this API must take care of disposing the already allocated message | ||
79 | * (either by sending it, or by using GNUNET_MQ_discard) | ||
80 | * | ||
81 | * @param mqm MQ message to augment with additional data | ||
82 | * @param mh the message to append, must be of type 'struct GNUNET_MessageHeader *' | ||
83 | */ | ||
84 | #define GNUNET_MQ_nest_mh(mqm, mh) ((NULL == mh) ? (GNUNET_OK) : GNUNET_MQ_nest((mqm), (mh), ntohs ((mh)->size))) | ||
85 | |||
86 | 56 | ||
87 | /** | 57 | /** |
88 | * Allocate a GNUNET_MQ_Message, where the message only consists of a header. | 58 | * Allocate a GNUNET_MQ_Message, where the message only consists of a header. |
@@ -105,6 +75,40 @@ | |||
105 | 75 | ||
106 | 76 | ||
107 | /** | 77 | /** |
78 | * Allocate a GNUNET_MQ_Message, and append a payload message after the given | ||
79 | * message struct. | ||
80 | * | ||
81 | * @param mvar pointer to a message struct, will be changed to point at the newly allocated message, | ||
82 | * whose size is 'sizeof(*mvar) + ntohs (mh->size)' | ||
83 | * @param type message type of the allocated message, has no effect on the nested message | ||
84 | * @param mh message to nest | ||
85 | * @return a newly allocated 'struct GNUNET_MQ_Message *' | ||
86 | */ | ||
87 | #define GNUNET_MQ_msg_nested_mh(mvar, type, mh) GNUNET_MQ_msg_nested_mh_((((void)(mvar)->header), (struct GNUNET_MessageHeader**) &(mvar)), sizeof (*(mvar)), (type), mh) | ||
88 | |||
89 | |||
90 | /** | ||
91 | * Return a pointer to the message at the end of the given message. | ||
92 | * | ||
93 | * @param var pointer to a message struct, the type of the expression determines the base size, | ||
94 | * the space after the base size is the nested message | ||
95 | * @return a 'struct GNUNET_MessageHeader *' that points at the nested message of the given message, | ||
96 | * or NULL if the given message in 'var' does not have any space after the message struct | ||
97 | */ | ||
98 | #define GNUNET_MQ_extract_nested_mh(var) GNUNET_MQ_extract_nested_mh_ ((struct GNUNET_MessageHeader *) (var), sizeof (*(var))) | ||
99 | |||
100 | |||
101 | struct GNUNET_MessageHeader * | ||
102 | GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size); | ||
103 | |||
104 | |||
105 | struct GNUNET_MQ_Message * | ||
106 | GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type, | ||
107 | const struct GNUNET_MessageHeader *nested_mh); | ||
108 | |||
109 | |||
110 | |||
111 | /** | ||
108 | * End-marker for the handlers array | 112 | * End-marker for the handlers array |
109 | */ | 113 | */ |
110 | #define GNUNET_MQ_HANDLERS_END {NULL, 0, 0} | 114 | #define GNUNET_MQ_HANDLERS_END {NULL, 0, 0} |
@@ -128,7 +132,8 @@ enum GNUNET_MQ_Error | |||
128 | * @param cls closure | 132 | * @param cls closure |
129 | * @param msg the received message | 133 | * @param msg the received message |
130 | */ | 134 | */ |
131 | typedef void (*GNUNET_MQ_MessageCallback) (void *cls, const struct GNUNET_MessageHeader *msg); | 135 | typedef void |
136 | (*GNUNET_MQ_MessageCallback) (void *cls, const struct GNUNET_MessageHeader *msg); | ||
132 | 137 | ||
133 | 138 | ||
134 | /** | 139 | /** |
@@ -151,10 +156,12 @@ typedef void | |||
151 | * | 156 | * |
152 | * @param cls closure | 157 | * @param cls closure |
153 | */ | 158 | */ |
154 | typedef void (*GNUNET_MQ_NotifyCallback) (void *cls); | 159 | typedef void |
160 | (*GNUNET_MQ_NotifyCallback) (void *cls); | ||
155 | 161 | ||
156 | 162 | ||
157 | typedef void (*GNUNET_MQ_ErrorHandler) (void *cls, enum GNUNET_MQ_Error error); | 163 | typedef void |
164 | (*GNUNET_MQ_ErrorHandler) (void *cls, enum GNUNET_MQ_Error error); | ||
158 | 165 | ||
159 | 166 | ||
160 | struct GNUNET_MQ_Message | 167 | struct GNUNET_MQ_Message |
@@ -287,6 +294,7 @@ struct GNUNET_MQ_Handler | |||
287 | }; | 294 | }; |
288 | 295 | ||
289 | 296 | ||
297 | |||
290 | /** | 298 | /** |
291 | * Create a new message for MQ. | 299 | * Create a new message for MQ. |
292 | * | 300 | * |
@@ -300,21 +308,6 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) | |||
300 | 308 | ||
301 | 309 | ||
302 | /** | 310 | /** |
303 | * Resize the the mq message pointed to by mqmp, | ||
304 | * and append the given data to it. | ||
305 | * | ||
306 | * @param mqmp pointer to a mq message pointer | ||
307 | * @param src source of the data to append | ||
308 | * @param len length of the data to append | ||
309 | * @return GNUNET_OK on success, | ||
310 | * GNUNET_SYSERR on error (e.g. if len is too large) | ||
311 | */ | ||
312 | int | ||
313 | GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp, | ||
314 | const void *src, uint16_t len); | ||
315 | |||
316 | |||
317 | /** | ||
318 | * Discard the message queue message, free all | 311 | * Discard the message queue message, free all |
319 | * allocated resources. Must be called in the event | 312 | * allocated resources. Must be called in the event |
320 | * that a message is created but should not actually be sent. | 313 | * that a message is created but should not actually be sent. |
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 15bbae2e8..85c643f7d 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h | |||
@@ -1754,11 +1754,18 @@ extern "C" | |||
1754 | */ | 1754 | */ |
1755 | #define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ABORT 548 | 1755 | #define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ABORT 548 |
1756 | 1756 | ||
1757 | /** | ||
1758 | * Abort a round, don't send requested elements anymore | ||
1759 | */ | ||
1760 | #define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT 547 | ||
1761 | |||
1757 | 1762 | ||
1758 | /******************************************************************************* | 1763 | /******************************************************************************* |
1759 | * SET message types | 1764 | * SET message types |
1760 | ******************************************************************************/ | 1765 | ******************************************************************************/ |
1761 | 1766 | ||
1767 | #define GNUNET_MESSAGE_TYPE_SET_REJECT 569 | ||
1768 | |||
1762 | /** | 1769 | /** |
1763 | * Cancel a set operation | 1770 | * Cancel a set operation |
1764 | */ | 1771 | */ |
@@ -1800,44 +1807,49 @@ extern "C" | |||
1800 | #define GNUNET_MESSAGE_TYPE_SET_EVALUATE 577 | 1807 | #define GNUNET_MESSAGE_TYPE_SET_EVALUATE 577 |
1801 | 1808 | ||
1802 | /** | 1809 | /** |
1803 | * Evaluate a set operation | 1810 | * Start a set operation with the given set |
1811 | */ | ||
1812 | #define GNUNET_MESSAGE_TYPE_SET_CONCLUDE 578 | ||
1813 | |||
1814 | /** | ||
1815 | * Notify the client of a request from a remote peer | ||
1804 | */ | 1816 | */ |
1805 | #define GNUNET_MESSAGE_TYPE_SET_REQUEST 578 | 1817 | #define GNUNET_MESSAGE_TYPE_SET_REQUEST 579 |
1806 | 1818 | ||
1807 | /** | 1819 | /** |
1808 | * Evaluate a set operation. | 1820 | * Create a new local set |
1809 | */ | 1821 | */ |
1810 | #define GNUNET_MESSAGE_TYPE_SET_CREATE 579 | 1822 | #define GNUNET_MESSAGE_TYPE_SET_CREATE 580 |
1811 | 1823 | ||
1812 | /** | 1824 | /** |
1813 | * Evaluate a set operation. | 1825 | * Request a set operation from a remote peer. |
1814 | */ | 1826 | */ |
1815 | #define GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST 580 | 1827 | #define GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST 581 |
1816 | 1828 | ||
1817 | /** | 1829 | /** |
1818 | * Strata estimator. | 1830 | * Strata estimator. |
1819 | */ | 1831 | */ |
1820 | #define GNUNET_MESSAGE_TYPE_SET_P2P_SE 581 | 1832 | #define GNUNET_MESSAGE_TYPE_SET_P2P_SE 582 |
1821 | 1833 | ||
1822 | /** | 1834 | /** |
1823 | * Invertible bloom filter. | 1835 | * Invertible bloom filter. |
1824 | */ | 1836 | */ |
1825 | #define GNUNET_MESSAGE_TYPE_SET_P2P_IBF 582 | 1837 | #define GNUNET_MESSAGE_TYPE_SET_P2P_IBF 583 |
1826 | 1838 | ||
1827 | /** | 1839 | /** |
1828 | * Actual set elements. | 1840 | * Actual set elements. |
1829 | */ | 1841 | */ |
1830 | #define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS 583 | 1842 | #define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS 584 |
1831 | 1843 | ||
1832 | /** | 1844 | /** |
1833 | * Requests for the elements with the given hashes. | 1845 | * Requests for the elements with the given hashes. |
1834 | */ | 1846 | */ |
1835 | #define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS 584 | 1847 | #define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS 585 |
1836 | 1848 | ||
1837 | /** | 1849 | /** |
1838 | * Operation is done. | 1850 | * Operation is done. |
1839 | */ | 1851 | */ |
1840 | #define GNUNET_MESSAGE_TYPE_SET_P2P_DONE 585 | 1852 | #define GNUNET_MESSAGE_TYPE_SET_P2P_DONE 586 |
1841 | 1853 | ||
1842 | 1854 | ||
1843 | 1855 | ||
diff --git a/src/include/gnunet_set_service.h b/src/include/gnunet_set_service.h index ce413c0de..2684df00a 100644 --- a/src/include/gnunet_set_service.h +++ b/src/include/gnunet_set_service.h | |||
@@ -257,11 +257,10 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set); | |||
257 | 257 | ||
258 | 258 | ||
259 | /** | 259 | /** |
260 | * Evaluate a set operation with our set and the set of another peer. | 260 | * Create a set operation for evaluation with another peer. |
261 | * The evaluation will not start until the client provides | ||
262 | * a local set with GNUNET_SET_conclude. | ||
261 | * | 263 | * |
262 | * @param set set to use -- FIXME: remove | ||
263 | * this argument, use GNUNET_SET_conclude instead! | ||
264 | * @param salt salt for HKDF (explain more here) | ||
265 | * @param other_peer peer with the other set | 264 | * @param other_peer peer with the other set |
266 | * @param app_id hash for the application using the set | 265 | * @param app_id hash for the application using the set |
267 | * @param context_msg additional information for the request | 266 | * @param context_msg additional information for the request |
@@ -275,8 +274,7 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set); | |||
275 | * @return a handle to cancel the operation | 274 | * @return a handle to cancel the operation |
276 | */ | 275 | */ |
277 | struct GNUNET_SET_OperationHandle * | 276 | struct GNUNET_SET_OperationHandle * |
278 | GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set, | 277 | GNUNET_SET_evaluate (const struct GNUNET_PeerIdentity *other_peer, |
279 | const struct GNUNET_PeerIdentity *other_peer, | ||
280 | const struct GNUNET_HashCode *app_id, | 278 | const struct GNUNET_HashCode *app_id, |
281 | const struct GNUNET_MessageHeader *context_msg, | 279 | const struct GNUNET_MessageHeader *context_msg, |
282 | uint16_t salt, | 280 | uint16_t salt, |
@@ -315,13 +313,13 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh); | |||
315 | 313 | ||
316 | 314 | ||
317 | /** | 315 | /** |
318 | * Accept a request we got via GNUNET_SET_listen. Must be called | 316 | * Accept a request we got via GNUNET_SET_listen. Must be called during |
319 | * during GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' | 317 | * GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid |
320 | * becomes invalid afterwards. | 318 | * afterwards. |
319 | * Call GNUNET_SET_conclude to provide the local set to use for the operation, | ||
320 | * and to begin the exchange with the remote peer. | ||
321 | * | 321 | * |
322 | * @param request request to accept | 322 | * @param request request to accept |
323 | * @param set set used for the requested operation -- FIXME: remove | ||
324 | * this argument, use GNUNET_SET_conclude instead! | ||
325 | * @param result_mode specified how results will be returned, | 323 | * @param result_mode specified how results will be returned, |
326 | * see 'GNUNET_SET_ResultMode'. | 324 | * see 'GNUNET_SET_ResultMode'. |
327 | * @param result_cb callback for the results | 325 | * @param result_cb callback for the results |
@@ -330,7 +328,6 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh); | |||
330 | */ | 328 | */ |
331 | struct GNUNET_SET_OperationHandle * | 329 | struct GNUNET_SET_OperationHandle * |
332 | GNUNET_SET_accept (struct GNUNET_SET_Request *request, | 330 | GNUNET_SET_accept (struct GNUNET_SET_Request *request, |
333 | struct GNUNET_SET_Handle *set, | ||
334 | enum GNUNET_SET_ResultMode result_mode, | 331 | enum GNUNET_SET_ResultMode result_mode, |
335 | GNUNET_SET_ResultIterator result_cb, | 332 | GNUNET_SET_ResultIterator result_cb, |
336 | void *cls); | 333 | void *cls); |
@@ -353,9 +350,9 @@ GNUNET_SET_conclude (struct GNUNET_SET_OperationHandle *oh, | |||
353 | 350 | ||
354 | 351 | ||
355 | /** | 352 | /** |
356 | * Cancel the given set operation. FIXME: do clients have | 353 | * Cancel the given set operation. |
357 | * to cancel the operatino if the GNUNET_SET_ResultIterator | 354 | * May not be called after the operation's GNUNET_SET_ResultIterator has been |
358 | * has been called with timeout/error/done? | 355 | * called with a status that indicates error, timeout or done. |
359 | * | 356 | * |
360 | * @param oh set operation to cancel | 357 | * @param oh set operation to cancel |
361 | */ | 358 | */ |
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index 2aea50365..4da718879 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c | |||
@@ -226,6 +226,23 @@ destroy_incoming (struct Incoming *incoming) | |||
226 | GNUNET_free (incoming); | 226 | GNUNET_free (incoming); |
227 | } | 227 | } |
228 | 228 | ||
229 | static struct Listener * | ||
230 | get_listener_by_target (enum GNUNET_SET_OperationType op, | ||
231 | const struct GNUNET_HashCode *app_id) | ||
232 | { | ||
233 | struct Listener *l; | ||
234 | |||
235 | for (l = listeners_head; NULL != l; l = l->next) | ||
236 | { | ||
237 | if (l->operation != op) | ||
238 | continue; | ||
239 | if (0 != GNUNET_CRYPTO_hash_cmp (app_id, &l->app_id)) | ||
240 | continue; | ||
241 | return l; | ||
242 | } | ||
243 | return NULL; | ||
244 | } | ||
245 | |||
229 | 246 | ||
230 | /** | 247 | /** |
231 | * Handle a request for a set operation from | 248 | * Handle a request for a set operation from |
@@ -240,62 +257,33 @@ handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh) | |||
240 | struct Incoming *incoming = cls; | 257 | struct Incoming *incoming = cls; |
241 | const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh; | 258 | const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh; |
242 | struct GNUNET_MQ_Message *mqm; | 259 | struct GNUNET_MQ_Message *mqm; |
243 | struct RequestMessage *cmsg; | 260 | struct GNUNET_SET_RequestMessage *cmsg; |
244 | struct Listener *listener; | 261 | struct Listener *listener; |
245 | const struct GNUNET_MessageHeader *context_msg; | 262 | const struct GNUNET_MessageHeader *context_msg; |
246 | 263 | ||
247 | if (ntohs (mh->size) < sizeof *msg) | 264 | context_msg = GNUNET_MQ_extract_nested_mh (msg); |
265 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, app %s)\n", | ||
266 | ntohs (msg->operation), GNUNET_h2s (&msg->app_id)); | ||
267 | listener = get_listener_by_target (ntohs (msg->operation), &msg->app_id); | ||
268 | if (NULL == listener) | ||
248 | { | 269 | { |
249 | /* message is to small for its type */ | 270 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
250 | GNUNET_break (0); | 271 | "set operation request from peer failed: " |
251 | destroy_incoming (incoming); | 272 | "no set with matching application ID and operation type\n"); |
252 | return; | 273 | return; |
253 | } | 274 | } |
254 | else if (ntohs (mh->size) == sizeof *msg) | 275 | mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, context_msg); |
255 | { | 276 | if (NULL == mqm) |
256 | /* there is no context message */ | ||
257 | context_msg = NULL; | ||
258 | } | ||
259 | else | ||
260 | { | 277 | { |
261 | context_msg = &msg[1].header; | 278 | /* FIXME: disconnect the peer */ |
262 | if ((ntohs (context_msg->size) + sizeof *msg) != ntohs (msg->header.size)) | 279 | GNUNET_break_op (0); |
263 | { | ||
264 | /* size of context message is invalid */ | ||
265 | GNUNET_break (0); | ||
266 | destroy_incoming (incoming); | ||
267 | return; | ||
268 | } | ||
269 | } | ||
270 | |||
271 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, app %s)\n", | ||
272 | ntohs (msg->operation), GNUNET_h2s (&msg->app_id)); | ||
273 | |||
274 | /* find the appropriate listener */ | ||
275 | for (listener = listeners_head; | ||
276 | listener != NULL; | ||
277 | listener = listener->next) | ||
278 | { | ||
279 | if ( (0 != GNUNET_CRYPTO_hash_cmp (&msg->app_id, &listener->app_id)) || | ||
280 | (ntohs (msg->operation) != listener->operation) ) | ||
281 | continue; | ||
282 | mqm = GNUNET_MQ_msg (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST); | ||
283 | if (GNUNET_OK != GNUNET_MQ_nest_mh (mqm, context_msg)) | ||
284 | { | ||
285 | /* FIXME: disconnect the peer */ | ||
286 | GNUNET_MQ_discard (mqm); | ||
287 | GNUNET_break (0); | ||
288 | return; | ||
289 | } | ||
290 | incoming->accept_id = accept_id++; | ||
291 | cmsg->accept_id = htonl (incoming->accept_id); | ||
292 | GNUNET_MQ_send (listener->client_mq, mqm); | ||
293 | return; | 280 | return; |
294 | } | 281 | } |
295 | 282 | incoming->accept_id = accept_id++; | |
296 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 283 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending request with accept id %u\n", incoming->accept_id); |
297 | "set operation request from peer failed: " | 284 | cmsg->accept_id = htonl (incoming->accept_id); |
298 | "no set with matching application ID and operation type\n"); | 285 | cmsg->peer_id = incoming->peer; |
286 | GNUNET_MQ_send (listener->client_mq, mqm); | ||
299 | } | 287 | } |
300 | 288 | ||
301 | 289 | ||
@@ -311,7 +299,7 @@ handle_client_create (void *cls, | |||
311 | struct GNUNET_SERVER_Client *client, | 299 | struct GNUNET_SERVER_Client *client, |
312 | const struct GNUNET_MessageHeader *m) | 300 | const struct GNUNET_MessageHeader *m) |
313 | { | 301 | { |
314 | struct SetCreateMessage *msg = (struct SetCreateMessage *) m; | 302 | struct GNUNET_SET_CreateMessage *msg = (struct GNUNET_SET_CreateMessage *) m; |
315 | struct Set *set; | 303 | struct Set *set; |
316 | 304 | ||
317 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client created new set (operation %u)\n", | 305 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client created new set (operation %u)\n", |
@@ -363,7 +351,7 @@ handle_client_listen (void *cls, | |||
363 | struct GNUNET_SERVER_Client *client, | 351 | struct GNUNET_SERVER_Client *client, |
364 | const struct GNUNET_MessageHeader *m) | 352 | const struct GNUNET_MessageHeader *m) |
365 | { | 353 | { |
366 | struct ListenMessage *msg = (struct ListenMessage *) m; | 354 | struct GNUNET_SET_ListenMessage *msg = (struct GNUNET_SET_ListenMessage *) m; |
367 | struct Listener *listener; | 355 | struct Listener *listener; |
368 | 356 | ||
369 | if (NULL != get_listener (client)) | 357 | if (NULL != get_listener (client)) |
@@ -410,7 +398,7 @@ handle_client_remove (void *cls, | |||
410 | switch (set->operation) | 398 | switch (set->operation) |
411 | { | 399 | { |
412 | case GNUNET_SET_OPERATION_UNION: | 400 | case GNUNET_SET_OPERATION_UNION: |
413 | _GSS_union_remove ((struct ElementMessage *) m, set); | 401 | _GSS_union_remove ((struct GNUNET_SET_ElementMessage *) m, set); |
414 | case GNUNET_SET_OPERATION_INTERSECTION: | 402 | case GNUNET_SET_OPERATION_INTERSECTION: |
415 | /* FIXME: cfuchs */ | 403 | /* FIXME: cfuchs */ |
416 | break; | 404 | break; |
@@ -423,6 +411,38 @@ handle_client_remove (void *cls, | |||
423 | } | 411 | } |
424 | 412 | ||
425 | 413 | ||
414 | |||
415 | /** | ||
416 | * Called when the client wants to reject an operation | ||
417 | * request from another peer. | ||
418 | * | ||
419 | * @param cls unused | ||
420 | * @param client client that sent the message | ||
421 | * @param m message sent by the client | ||
422 | */ | ||
423 | static void | ||
424 | handle_client_reject (void *cls, | ||
425 | struct GNUNET_SERVER_Client *client, | ||
426 | const struct GNUNET_MessageHeader *m) | ||
427 | { | ||
428 | struct Incoming *incoming; | ||
429 | struct GNUNET_SET_AcceptRejectMessage *msg = (struct GNUNET_SET_AcceptRejectMessage *) m; | ||
430 | |||
431 | GNUNET_break (0 == ntohl (msg->request_id)); | ||
432 | |||
433 | incoming = get_incoming (ntohl (msg->accept_reject_id)); | ||
434 | if (NULL == incoming) | ||
435 | { | ||
436 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | ||
437 | return; | ||
438 | } | ||
439 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n"); | ||
440 | destroy_incoming (incoming); | ||
441 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
442 | } | ||
443 | |||
444 | |||
445 | |||
426 | /** | 446 | /** |
427 | * Called when a client wants to add an element to a | 447 | * Called when a client wants to add an element to a |
428 | * set it inhabits. | 448 | * set it inhabits. |
@@ -448,7 +468,7 @@ handle_client_add (void *cls, | |||
448 | switch (set->operation) | 468 | switch (set->operation) |
449 | { | 469 | { |
450 | case GNUNET_SET_OPERATION_UNION: | 470 | case GNUNET_SET_OPERATION_UNION: |
451 | _GSS_union_add ((struct ElementMessage *) m, set); | 471 | _GSS_union_add ((struct GNUNET_SET_ElementMessage *) m, set); |
452 | case GNUNET_SET_OPERATION_INTERSECTION: | 472 | case GNUNET_SET_OPERATION_INTERSECTION: |
453 | /* FIXME: cfuchs */ | 473 | /* FIXME: cfuchs */ |
454 | break; | 474 | break; |
@@ -490,7 +510,7 @@ handle_client_evaluate (void *cls, | |||
490 | /* FIXME: cfuchs */ | 510 | /* FIXME: cfuchs */ |
491 | break; | 511 | break; |
492 | case GNUNET_SET_OPERATION_UNION: | 512 | case GNUNET_SET_OPERATION_UNION: |
493 | _GSS_union_evaluate ((struct EvaluateMessage *) m, set); | 513 | _GSS_union_evaluate ((struct GNUNET_SET_EvaluateMessage *) m, set); |
494 | break; | 514 | break; |
495 | default: | 515 | default: |
496 | GNUNET_assert (0); | 516 | GNUNET_assert (0); |
@@ -502,23 +522,6 @@ handle_client_evaluate (void *cls, | |||
502 | 522 | ||
503 | 523 | ||
504 | /** | 524 | /** |
505 | * Handle a cancel request from a client. | ||
506 | * | ||
507 | * @param cls unused | ||
508 | * @param client the client | ||
509 | * @param m the cancel message | ||
510 | */ | ||
511 | static void | ||
512 | handle_client_cancel (void *cls, | ||
513 | struct GNUNET_SERVER_Client *client, | ||
514 | const struct GNUNET_MessageHeader *m) | ||
515 | { | ||
516 | /* FIXME: implement */ | ||
517 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
518 | } | ||
519 | |||
520 | |||
521 | /** | ||
522 | * Handle an ack from a client. | 525 | * Handle an ack from a client. |
523 | * | 526 | * |
524 | * @param cls unused | 527 | * @param cls unused |
@@ -550,25 +553,20 @@ handle_client_accept (void *cls, | |||
550 | { | 553 | { |
551 | struct Set *set; | 554 | struct Set *set; |
552 | struct Incoming *incoming; | 555 | struct Incoming *incoming; |
553 | struct AcceptMessage *msg = (struct AcceptMessage *) mh; | 556 | struct GNUNET_SET_AcceptRejectMessage *msg = (struct GNUNET_SET_AcceptRejectMessage *) mh; |
554 | 557 | ||
558 | incoming = get_incoming (ntohl (msg->accept_reject_id)); | ||
555 | 559 | ||
556 | incoming = get_incoming (ntohl (msg->accept_id)); | 560 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client accepting %u\n", ntohl (msg->accept_reject_id)); |
557 | 561 | ||
558 | if (NULL == incoming) | 562 | if (NULL == incoming) |
559 | { | 563 | { |
564 | |||
560 | GNUNET_break (0); | 565 | GNUNET_break (0); |
561 | GNUNET_SERVER_client_disconnect (client); | 566 | GNUNET_SERVER_client_disconnect (client); |
562 | return; | 567 | return; |
563 | } | 568 | } |
564 | 569 | ||
565 | if (0 == ntohl (msg->request_id)) | ||
566 | { | ||
567 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n"); | ||
568 | destroy_incoming (incoming); | ||
569 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
570 | return; | ||
571 | } | ||
572 | 570 | ||
573 | set = get_set (client); | 571 | set = get_set (client); |
574 | 572 | ||
@@ -687,14 +685,14 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, | |||
687 | const struct GNUNET_CONFIGURATION_Handle *cfg) | 685 | const struct GNUNET_CONFIGURATION_Handle *cfg) |
688 | { | 686 | { |
689 | static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { | 687 | static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { |
688 | {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT, 0}, | ||
689 | {handle_client_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ACK, 0}, | ||
690 | {handle_client_add, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0}, | ||
690 | {handle_client_create, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE, 0}, | 691 | {handle_client_create, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE, 0}, |
692 | {handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0}, | ||
691 | {handle_client_listen, NULL, GNUNET_MESSAGE_TYPE_SET_LISTEN, 0}, | 693 | {handle_client_listen, NULL, GNUNET_MESSAGE_TYPE_SET_LISTEN, 0}, |
692 | {handle_client_add, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0}, | 694 | {handle_client_reject, NULL, GNUNET_MESSAGE_TYPE_SET_REJECT, 0}, |
693 | {handle_client_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0}, | 695 | {handle_client_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0}, |
694 | {handle_client_cancel, NULL, GNUNET_MESSAGE_TYPE_SET_CANCEL, 0}, | ||
695 | {handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0}, | ||
696 | {handle_client_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ACK, 0}, | ||
697 | {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT, 0}, | ||
698 | {NULL, NULL, 0, 0} | 696 | {NULL, NULL, 0, 0} |
699 | }; | 697 | }; |
700 | 698 | ||
@@ -705,6 +703,8 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, | |||
705 | stream_listen_socket = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_SET, | 703 | stream_listen_socket = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_SET, |
706 | &stream_listen_cb, NULL, | 704 | &stream_listen_cb, NULL, |
707 | GNUNET_STREAM_OPTION_END); | 705 | GNUNET_STREAM_OPTION_END); |
706 | |||
707 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set service running\n"); | ||
708 | } | 708 | } |
709 | 709 | ||
710 | 710 | ||
diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h index bea77416e..15199eba4 100644 --- a/src/set/gnunet-service-set.h +++ b/src/set/gnunet-service-set.h | |||
@@ -217,7 +217,7 @@ _GSS_union_set_create (void); | |||
217 | * @parem set the set to evaluate the operation with | 217 | * @parem set the set to evaluate the operation with |
218 | */ | 218 | */ |
219 | void | 219 | void |
220 | _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set); | 220 | _GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set); |
221 | 221 | ||
222 | 222 | ||
223 | /** | 223 | /** |
@@ -227,7 +227,7 @@ _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set); | |||
227 | * @param set set to add the element to | 227 | * @param set set to add the element to |
228 | */ | 228 | */ |
229 | void | 229 | void |
230 | _GSS_union_add (struct ElementMessage *m, struct Set *set); | 230 | _GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set); |
231 | 231 | ||
232 | 232 | ||
233 | /** | 233 | /** |
@@ -238,7 +238,7 @@ _GSS_union_add (struct ElementMessage *m, struct Set *set); | |||
238 | * @param set set to remove the element from | 238 | * @param set set to remove the element from |
239 | */ | 239 | */ |
240 | void | 240 | void |
241 | _GSS_union_remove (struct ElementMessage *m, struct Set *set); | 241 | _GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set); |
242 | 242 | ||
243 | 243 | ||
244 | /** | 244 | /** |
@@ -258,7 +258,7 @@ _GSS_union_set_destroy (struct Set *set); | |||
258 | * @param incoming information about the requesting remote peer | 258 | * @param incoming information about the requesting remote peer |
259 | */ | 259 | */ |
260 | void | 260 | void |
261 | _GSS_union_accept (struct AcceptMessage *m, struct Set *set, | 261 | _GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set, |
262 | struct Incoming *incoming); | 262 | struct Incoming *incoming); |
263 | 263 | ||
264 | 264 | ||
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index c651a0381..6d9658ee5 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c | |||
@@ -245,8 +245,7 @@ struct ElementEntry | |||
245 | 245 | ||
246 | 246 | ||
247 | /** | 247 | /** |
248 | * Information about the element used for | 248 | * Entries in the key-to-element map of the union set. |
249 | * a specific union operation. | ||
250 | */ | 249 | */ |
251 | struct KeyEntry | 250 | struct KeyEntry |
252 | { | 251 | { |
@@ -401,11 +400,14 @@ destroy_key_to_element_iter (void *cls, | |||
401 | static void | 400 | static void |
402 | destroy_union_operation (struct UnionEvaluateOperation *eo) | 401 | destroy_union_operation (struct UnionEvaluateOperation *eo) |
403 | { | 402 | { |
403 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n"); | ||
404 | |||
404 | if (NULL != eo->mq) | 405 | if (NULL != eo->mq) |
405 | { | 406 | { |
406 | GNUNET_MQ_destroy (eo->mq); | 407 | GNUNET_MQ_destroy (eo->mq); |
407 | eo->mq = NULL; | 408 | eo->mq = NULL; |
408 | } | 409 | } |
410 | |||
409 | if (NULL != eo->socket) | 411 | if (NULL != eo->socket) |
410 | { | 412 | { |
411 | GNUNET_STREAM_close (eo->socket); | 413 | GNUNET_STREAM_close (eo->socket); |
@@ -433,12 +435,16 @@ destroy_union_operation (struct UnionEvaluateOperation *eo) | |||
433 | eo->key_to_element = NULL; | 435 | eo->key_to_element = NULL; |
434 | } | 436 | } |
435 | 437 | ||
436 | |||
437 | GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head, | 438 | GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head, |
438 | eo->set->state.u->ops_tail, | 439 | eo->set->state.u->ops_tail, |
439 | eo); | 440 | eo); |
440 | GNUNET_free (eo); | 441 | GNUNET_free (eo); |
441 | /* FIXME: free and destroy everything else */ | 442 | |
443 | |||
444 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n"); | ||
445 | |||
446 | |||
447 | /* FIXME: do a garbage collection of the set generations */ | ||
442 | } | 448 | } |
443 | 449 | ||
444 | 450 | ||
@@ -452,7 +458,7 @@ static void | |||
452 | fail_union_operation (struct UnionEvaluateOperation *eo) | 458 | fail_union_operation (struct UnionEvaluateOperation *eo) |
453 | { | 459 | { |
454 | struct GNUNET_MQ_Message *mqm; | 460 | struct GNUNET_MQ_Message *mqm; |
455 | struct ResultMessage *msg; | 461 | struct GNUNET_SET_ResultMessage *msg; |
456 | 462 | ||
457 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); | 463 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); |
458 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); | 464 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); |
@@ -495,20 +501,25 @@ send_operation_request (struct UnionEvaluateOperation *eo) | |||
495 | struct GNUNET_MQ_Message *mqm; | 501 | struct GNUNET_MQ_Message *mqm; |
496 | struct OperationRequestMessage *msg; | 502 | struct OperationRequestMessage *msg; |
497 | 503 | ||
498 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST); | 504 | mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, eo->context_msg); |
499 | if (NULL != eo->context_msg) | 505 | |
500 | if (GNUNET_OK != GNUNET_MQ_nest (mqm, eo->context_msg, ntohs (eo->context_msg->size))) | 506 | if (NULL == mqm) |
501 | { | 507 | { |
502 | /* the context message is too large */ | 508 | /* the context message is too large */ |
503 | GNUNET_break (0); | 509 | GNUNET_break (0); |
504 | GNUNET_SERVER_client_disconnect (eo->set->client); | 510 | GNUNET_SERVER_client_disconnect (eo->set->client); |
505 | GNUNET_MQ_discard (mqm); | 511 | return; |
506 | return; | 512 | } |
507 | } | ||
508 | msg->operation = htons (GNUNET_SET_OPERATION_UNION); | 513 | msg->operation = htons (GNUNET_SET_OPERATION_UNION); |
509 | msg->app_id = eo->app_id; | 514 | msg->app_id = eo->app_id; |
510 | GNUNET_MQ_send (eo->mq, mqm); | 515 | GNUNET_MQ_send (eo->mq, mqm); |
511 | 516 | ||
517 | if (NULL != eo->context_msg) | ||
518 | { | ||
519 | GNUNET_free (eo->context_msg); | ||
520 | eo->context_msg = NULL; | ||
521 | } | ||
522 | |||
512 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n"); | 523 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n"); |
513 | } | 524 | } |
514 | 525 | ||
@@ -537,7 +548,7 @@ insert_element_iterator (void *cls, | |||
537 | { | 548 | { |
538 | if (old_k->ibf_key.key_val == new_k->ibf_key.key_val) | 549 | if (old_k->ibf_key.key_val == new_k->ibf_key.key_val) |
539 | { | 550 | { |
540 | new_k->next_colliding = old_k; | 551 | new_k->next_colliding = old_k->next_colliding; |
541 | old_k->next_colliding = new_k; | 552 | old_k->next_colliding = new_k; |
542 | return GNUNET_NO; | 553 | return GNUNET_NO; |
543 | } | 554 | } |
@@ -568,12 +579,11 @@ insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee) | |||
568 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, | 579 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, |
569 | (uint32_t) ibf_key.key_val, | 580 | (uint32_t) ibf_key.key_val, |
570 | insert_element_iterator, k); | 581 | insert_element_iterator, k); |
582 | |||
571 | /* was the element inserted into a colliding bucket? */ | 583 | /* was the element inserted into a colliding bucket? */ |
572 | if (GNUNET_SYSERR == ret) | 584 | if (GNUNET_SYSERR == ret) |
573 | { | ||
574 | GNUNET_assert (NULL != k->next_colliding); | ||
575 | return; | 585 | return; |
576 | } | 586 | |
577 | GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k, | 587 | GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k, |
578 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | 588 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); |
579 | } | 589 | } |
@@ -781,8 +791,8 @@ handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) | |||
781 | */ | 791 | */ |
782 | static int | 792 | static int |
783 | send_element_iterator (void *cls, | 793 | send_element_iterator (void *cls, |
784 | uint32_t key, | 794 | uint32_t key, |
785 | void *value) | 795 | void *value) |
786 | { | 796 | { |
787 | struct SendElementClosure *sec = cls; | 797 | struct SendElementClosure *sec = cls; |
788 | struct IBF_Key ibf_key = sec->ibf_key; | 798 | struct IBF_Key ibf_key = sec->ibf_key; |
@@ -795,15 +805,18 @@ send_element_iterator (void *cls, | |||
795 | { | 805 | { |
796 | const struct GNUNET_SET_Element *const element = &ke->element->element; | 806 | const struct GNUNET_SET_Element *const element = &ke->element->element; |
797 | struct GNUNET_MQ_Message *mqm; | 807 | struct GNUNET_MQ_Message *mqm; |
808 | struct GNUNET_MessageHeader *mh; | ||
798 | 809 | ||
799 | GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); | 810 | GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); |
800 | mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); | 811 | mqm = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); |
801 | if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size)) | 812 | if (NULL == mqm) |
802 | { | 813 | { |
814 | /* element too large */ | ||
803 | GNUNET_break (0); | 815 | GNUNET_break (0); |
804 | GNUNET_MQ_discard (mqm); | ||
805 | continue; | 816 | continue; |
806 | } | 817 | } |
818 | memcpy (&mh[1], element->data, element->size); | ||
819 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n"); | ||
807 | GNUNET_MQ_send (eo->mq, mqm); | 820 | GNUNET_MQ_send (eo->mq, mqm); |
808 | ke = ke->next_colliding; | 821 | ke = ke->next_colliding; |
809 | } | 822 | } |
@@ -975,34 +988,42 @@ send_client_element (struct UnionEvaluateOperation *eo, | |||
975 | struct GNUNET_SET_Element *element) | 988 | struct GNUNET_SET_Element *element) |
976 | { | 989 | { |
977 | struct GNUNET_MQ_Message *mqm; | 990 | struct GNUNET_MQ_Message *mqm; |
978 | struct ResultMessage *rm; | 991 | struct GNUNET_SET_ResultMessage *rm; |
979 | 992 | ||
980 | GNUNET_assert (0 != eo->request_id); | 993 | GNUNET_assert (0 != eo->request_id); |
981 | mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); | 994 | mqm = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); |
982 | rm->result_status = htons (GNUNET_SET_STATUS_OK); | 995 | if (NULL == mqm) |
983 | rm->request_id = htonl (eo->request_id); | ||
984 | if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size)) | ||
985 | { | 996 | { |
986 | GNUNET_MQ_discard (mqm); | 997 | GNUNET_MQ_discard (mqm); |
987 | GNUNET_break (0); | 998 | GNUNET_break (0); |
988 | return; | 999 | return; |
989 | } | 1000 | } |
990 | 1001 | rm->result_status = htons (GNUNET_SET_STATUS_OK); | |
1002 | rm->request_id = htonl (eo->request_id); | ||
1003 | memcpy (&rm[1], element->data, element->size); | ||
991 | GNUNET_MQ_send (eo->set->client_mq, mqm); | 1004 | GNUNET_MQ_send (eo->set->client_mq, mqm); |
992 | } | 1005 | } |
993 | 1006 | ||
994 | 1007 | ||
995 | /** | 1008 | /** |
996 | * Callback used for notifications | 1009 | * Completion callback for shutdown |
997 | * | 1010 | * |
998 | * @param cls closure | 1011 | * @param cls the closure from GNUNET_STREAM_shutdown call |
1012 | * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR, | ||
1013 | * SHUT_RDWR) | ||
999 | */ | 1014 | */ |
1000 | static void | 1015 | /* |
1001 | client_done_sent_cb (void *cls) | 1016 | static void |
1017 | stream_shutdown_cb (void *cls, | ||
1018 | int operation) | ||
1002 | { | 1019 | { |
1003 | //struct UnionEvaluateOperation *eo = cls; | 1020 | //struct UnionEvaluateOperation *eo = cls; |
1004 | /* FIXME: destroy eo */ | 1021 | |
1022 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "stream shutdown\n"); | ||
1023 | |||
1024 | // destroy_union_operation (eo); | ||
1005 | } | 1025 | } |
1026 | */ | ||
1006 | 1027 | ||
1007 | 1028 | ||
1008 | /** | 1029 | /** |
@@ -1018,16 +1039,15 @@ static void | |||
1018 | send_client_done_and_destroy (struct UnionEvaluateOperation *eo) | 1039 | send_client_done_and_destroy (struct UnionEvaluateOperation *eo) |
1019 | { | 1040 | { |
1020 | struct GNUNET_MQ_Message *mqm; | 1041 | struct GNUNET_MQ_Message *mqm; |
1021 | struct ResultMessage *rm; | 1042 | struct GNUNET_SET_ResultMessage *rm; |
1022 | 1043 | ||
1023 | GNUNET_assert (0 != eo->request_id); | 1044 | GNUNET_assert (0 != eo->request_id); |
1024 | mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); | 1045 | mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); |
1025 | rm->request_id = htonl (eo->request_id); | 1046 | rm->request_id = htonl (eo->request_id); |
1026 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); | 1047 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); |
1027 | GNUNET_MQ_notify_sent (mqm, client_done_sent_cb, eo); | ||
1028 | GNUNET_MQ_send (eo->set->client_mq, mqm); | 1048 | GNUNET_MQ_send (eo->set->client_mq, mqm); |
1029 | 1049 | ||
1030 | /* FIXME: destroy the eo */ | 1050 | // GNUNET_STREAM_shutdown (eo->socket, SHUT_RDWR, stream_shutdown_cb, eo); |
1031 | } | 1051 | } |
1032 | 1052 | ||
1033 | 1053 | ||
@@ -1199,18 +1219,25 @@ stream_open_cb (void *cls, | |||
1199 | * @parem set the set to evaluate the operation with | 1219 | * @parem set the set to evaluate the operation with |
1200 | */ | 1220 | */ |
1201 | void | 1221 | void |
1202 | _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set) | 1222 | _GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set) |
1203 | { | 1223 | { |
1204 | struct UnionEvaluateOperation *eo; | 1224 | struct UnionEvaluateOperation *eo; |
1225 | struct GNUNET_MessageHeader *context_msg; | ||
1205 | 1226 | ||
1206 | eo = GNUNET_new (struct UnionEvaluateOperation); | 1227 | eo = GNUNET_new (struct UnionEvaluateOperation); |
1207 | eo->peer = m->peer; | 1228 | eo->peer = m->target_peer; |
1208 | eo->set = set; | 1229 | eo->set = set; |
1209 | eo->request_id = htonl (m->request_id); | 1230 | eo->request_id = htonl (m->request_id); |
1210 | GNUNET_assert (0 != eo->request_id); | 1231 | GNUNET_assert (0 != eo->request_id); |
1211 | eo->se = strata_estimator_dup (set->state.u->se); | 1232 | eo->se = strata_estimator_dup (set->state.u->se); |
1212 | eo->salt = ntohs (m->salt); | 1233 | eo->salt = ntohs (m->salt); |
1213 | eo->app_id = m->app_id; | 1234 | eo->app_id = m->app_id; |
1235 | |||
1236 | context_msg = GNUNET_MQ_extract_nested_mh (m); | ||
1237 | if (NULL != context_msg) | ||
1238 | { | ||
1239 | eo->context_msg = GNUNET_copy_message (context_msg); | ||
1240 | } | ||
1214 | 1241 | ||
1215 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | 1242 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, |
1216 | "evaluating union operation, (app %s)\n", | 1243 | "evaluating union operation, (app %s)\n", |
@@ -1235,7 +1262,7 @@ _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set) | |||
1235 | * @param incoming information about the requesting remote peer | 1262 | * @param incoming information about the requesting remote peer |
1236 | */ | 1263 | */ |
1237 | void | 1264 | void |
1238 | _GSS_union_accept (struct AcceptMessage *m, struct Set *set, | 1265 | _GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set, |
1239 | struct Incoming *incoming) | 1266 | struct Incoming *incoming) |
1240 | { | 1267 | { |
1241 | struct UnionEvaluateOperation *eo; | 1268 | struct UnionEvaluateOperation *eo; |
@@ -1250,7 +1277,6 @@ _GSS_union_accept (struct AcceptMessage *m, struct Set *set, | |||
1250 | GNUNET_assert (0 != ntohl (m->request_id)); | 1277 | GNUNET_assert (0 != ntohl (m->request_id)); |
1251 | eo->request_id = ntohl (m->request_id); | 1278 | eo->request_id = ntohl (m->request_id); |
1252 | eo->se = strata_estimator_dup (set->state.u->se); | 1279 | eo->se = strata_estimator_dup (set->state.u->se); |
1253 | eo->set = set; // FIXME: redundant!? | ||
1254 | eo->mq = incoming->mq; | 1280 | eo->mq = incoming->mq; |
1255 | /* transfer ownership of mq and socket from incoming to eo */ | 1281 | /* transfer ownership of mq and socket from incoming to eo */ |
1256 | incoming->mq = NULL; | 1282 | incoming->mq = NULL; |
@@ -1299,7 +1325,7 @@ _GSS_union_set_create (void) | |||
1299 | * @param set set to add the element to | 1325 | * @param set set to add the element to |
1300 | */ | 1326 | */ |
1301 | void | 1327 | void |
1302 | _GSS_union_add (struct ElementMessage *m, struct Set *set) | 1328 | _GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set) |
1303 | { | 1329 | { |
1304 | struct ElementEntry *ee; | 1330 | struct ElementEntry *ee; |
1305 | struct ElementEntry *ee_dup; | 1331 | struct ElementEntry *ee_dup; |
@@ -1357,7 +1383,9 @@ _GSS_union_set_destroy (struct Set *set) | |||
1357 | destroy_elements (set->state.u); | 1383 | destroy_elements (set->state.u); |
1358 | 1384 | ||
1359 | while (NULL != set->state.u->ops_head) | 1385 | while (NULL != set->state.u->ops_head) |
1386 | { | ||
1360 | destroy_union_operation (set->state.u->ops_head); | 1387 | destroy_union_operation (set->state.u->ops_head); |
1388 | } | ||
1361 | } | 1389 | } |
1362 | 1390 | ||
1363 | /** | 1391 | /** |
@@ -1368,7 +1396,7 @@ _GSS_union_set_destroy (struct Set *set) | |||
1368 | * @param set set to remove the element from | 1396 | * @param set set to remove the element from |
1369 | */ | 1397 | */ |
1370 | void | 1398 | void |
1371 | _GSS_union_remove (struct ElementMessage *m, struct Set *set) | 1399 | _GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set) |
1372 | { | 1400 | { |
1373 | struct GNUNET_HashCode hash; | 1401 | struct GNUNET_HashCode hash; |
1374 | struct ElementEntry *ee; | 1402 | struct ElementEntry *ee; |
diff --git a/src/set/gnunet-set.c b/src/set/gnunet-set.c index 5f2d1c976..ae84610fc 100644 --- a/src/set/gnunet-set.c +++ b/src/set/gnunet-set.c | |||
@@ -91,11 +91,12 @@ listen_cb (void *cls, | |||
91 | const struct GNUNET_MessageHeader *context_msg, | 91 | const struct GNUNET_MessageHeader *context_msg, |
92 | struct GNUNET_SET_Request *request) | 92 | struct GNUNET_SET_Request *request) |
93 | { | 93 | { |
94 | struct GNUNET_SET_OperationHandle *oh; | ||
94 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n"); | 95 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n"); |
95 | GNUNET_SET_listen_cancel (listen_handle); | 96 | GNUNET_SET_listen_cancel (listen_handle); |
96 | 97 | ||
97 | GNUNET_SET_accept (request, set2, | 98 | oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL); |
98 | GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL); | 99 | GNUNET_SET_conclude (oh, set2); |
99 | } | 100 | } |
100 | 101 | ||
101 | 102 | ||
@@ -107,11 +108,14 @@ listen_cb (void *cls, | |||
107 | static void | 108 | static void |
108 | start (void *cls) | 109 | start (void *cls) |
109 | { | 110 | { |
111 | struct GNUNET_SET_OperationHandle *oh; | ||
112 | |||
110 | listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, | 113 | listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, |
111 | &app_id, listen_cb, NULL); | 114 | &app_id, listen_cb, NULL); |
112 | GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42, | 115 | oh = GNUNET_SET_evaluate (&local_id, &app_id, NULL, 42, |
113 | GNUNET_SET_RESULT_ADDED, | 116 | GNUNET_SET_RESULT_ADDED, |
114 | result_cb_set1, NULL); | 117 | result_cb_set1, NULL); |
118 | GNUNET_SET_conclude (oh, set1); | ||
115 | } | 119 | } |
116 | 120 | ||
117 | 121 | ||
diff --git a/src/set/set.h b/src/set/set.h index ad2200de9..7ec3e6cb2 100644 --- a/src/set/set.h +++ b/src/set/set.h | |||
@@ -29,17 +29,12 @@ | |||
29 | #include "platform.h" | 29 | #include "platform.h" |
30 | #include "gnunet_common.h" | 30 | #include "gnunet_common.h" |
31 | 31 | ||
32 | 32 | #define GNUNET_SET_ACK_WINDOW 10 | |
33 | /** | ||
34 | * The service sends up to GNUNET_SET_ACK_WINDOW messages per client handle, | ||
35 | * the client should send an ack every GNUNET_SET_ACK_WINDOW/2 messages. | ||
36 | */ | ||
37 | #define GNUNET_SET_ACK_WINDOW 8 | ||
38 | 33 | ||
39 | 34 | ||
40 | GNUNET_NETWORK_STRUCT_BEGIN | 35 | GNUNET_NETWORK_STRUCT_BEGIN |
41 | 36 | ||
42 | struct SetCreateMessage | 37 | struct GNUNET_SET_CreateMessage |
43 | { | 38 | { |
44 | /** | 39 | /** |
45 | * Type: GNUNET_MESSAGE_TYPE_SET_CREATE | 40 | * Type: GNUNET_MESSAGE_TYPE_SET_CREATE |
@@ -54,7 +49,7 @@ struct SetCreateMessage | |||
54 | }; | 49 | }; |
55 | 50 | ||
56 | 51 | ||
57 | struct ListenMessage | 52 | struct GNUNET_SET_ListenMessage |
58 | { | 53 | { |
59 | /** | 54 | /** |
60 | * Type: GNUNET_MESSAGE_TYPE_SET_LISTEN | 55 | * Type: GNUNET_MESSAGE_TYPE_SET_LISTEN |
@@ -74,32 +69,31 @@ struct ListenMessage | |||
74 | }; | 69 | }; |
75 | 70 | ||
76 | 71 | ||
77 | struct AcceptMessage | 72 | struct GNUNET_SET_AcceptRejectMessage |
78 | { | 73 | { |
79 | /** | 74 | /** |
80 | * Type: GNUNET_MESSAGE_TYPE_SET_ACCEPT | 75 | * Type: GNUNET_MESSAGE_TYPE_SET_ACCEPT or |
76 | * GNUNET_MESSAGE_TYPE_SET_REJECT | ||
81 | */ | 77 | */ |
82 | struct GNUNET_MessageHeader header; | 78 | struct GNUNET_MessageHeader header; |
83 | 79 | ||
84 | /** | 80 | /** |
85 | * Request id that will be sent along with | 81 | * ID of the incoming request we want to accept / reject. |
86 | * results for the accepted operation. | ||
87 | * Chosen by the client. | ||
88 | * Must be 0 if the request has been rejected. | ||
89 | */ | 82 | */ |
90 | uint32_t request_id GNUNET_PACKED; | 83 | uint32_t accept_reject_id GNUNET_PACKED; |
91 | 84 | ||
92 | /** | 85 | /** |
93 | * ID of the incoming request we want to accept / reject. | 86 | * Request ID to identify responses, |
87 | * must be 0 if we don't accept the request. | ||
94 | */ | 88 | */ |
95 | uint32_t accept_id GNUNET_PACKED; | 89 | uint32_t request_id GNUNET_PACKED; |
96 | }; | 90 | }; |
97 | 91 | ||
98 | 92 | ||
99 | /** | 93 | /** |
100 | * A request for an operation with another client. | 94 | * A request for an operation with another client. |
101 | */ | 95 | */ |
102 | struct RequestMessage | 96 | struct GNUNET_SET_RequestMessage |
103 | { | 97 | { |
104 | /** | 98 | /** |
105 | * Type: GNUNET_MESSAGE_TYPE_SET_Request. | 99 | * Type: GNUNET_MESSAGE_TYPE_SET_Request. |
@@ -107,21 +101,21 @@ struct RequestMessage | |||
107 | struct GNUNET_MessageHeader header; | 101 | struct GNUNET_MessageHeader header; |
108 | 102 | ||
109 | /** | 103 | /** |
110 | * ID of the request we want to accept, | 104 | * Identity of the requesting peer. |
111 | * chosen by the service. | ||
112 | */ | 105 | */ |
113 | uint32_t accept_id GNUNET_PACKED; | 106 | struct GNUNET_PeerIdentity peer_id; |
114 | 107 | ||
115 | /** | 108 | /** |
116 | * Identity of the requesting peer. | 109 | * ID of the to identify the request when accepting or |
110 | * rejecting it. | ||
117 | */ | 111 | */ |
118 | struct GNUNET_PeerIdentity peer_id; | 112 | uint32_t accept_id GNUNET_PACKED; |
119 | 113 | ||
120 | /* rest: nested context message */ | 114 | /* rest: nested context message */ |
121 | }; | 115 | }; |
122 | 116 | ||
123 | 117 | ||
124 | struct EvaluateMessage | 118 | struct GNUNET_SET_EvaluateMessage |
125 | { | 119 | { |
126 | /** | 120 | /** |
127 | * Type: GNUNET_MESSAGE_TYPE_SET_EVALUATE | 121 | * Type: GNUNET_MESSAGE_TYPE_SET_EVALUATE |
@@ -136,7 +130,7 @@ struct EvaluateMessage | |||
136 | /** | 130 | /** |
137 | * Peer to evaluate the operation with | 131 | * Peer to evaluate the operation with |
138 | */ | 132 | */ |
139 | struct GNUNET_PeerIdentity peer; | 133 | struct GNUNET_PeerIdentity target_peer; |
140 | 134 | ||
141 | /** | 135 | /** |
142 | * Application id | 136 | * Application id |
@@ -157,7 +151,7 @@ struct EvaluateMessage | |||
157 | }; | 151 | }; |
158 | 152 | ||
159 | 153 | ||
160 | struct ResultMessage | 154 | struct GNUNET_SET_ResultMessage |
161 | { | 155 | { |
162 | /** | 156 | /** |
163 | * Type: GNUNET_MESSAGE_TYPE_SET_RESULT | 157 | * Type: GNUNET_MESSAGE_TYPE_SET_RESULT |
@@ -184,7 +178,7 @@ struct ResultMessage | |||
184 | }; | 178 | }; |
185 | 179 | ||
186 | 180 | ||
187 | struct ElementMessage | 181 | struct GNUNET_SET_ElementMessage |
188 | { | 182 | { |
189 | /** | 183 | /** |
190 | * Type: GNUNET_MESSAGE_TYPE_SET_ADD or | 184 | * Type: GNUNET_MESSAGE_TYPE_SET_ADD or |
@@ -200,20 +194,6 @@ struct ElementMessage | |||
200 | }; | 194 | }; |
201 | 195 | ||
202 | 196 | ||
203 | struct CancelMessage | ||
204 | { | ||
205 | /** | ||
206 | * Type: GNUNET_MESSAGE_TYPE_SET_CANCEL | ||
207 | */ | ||
208 | struct GNUNET_MessageHeader header; | ||
209 | |||
210 | /** | ||
211 | * id we want to cancel result belongs to | ||
212 | */ | ||
213 | uint32_t request_id GNUNET_PACKED; | ||
214 | }; | ||
215 | |||
216 | |||
217 | GNUNET_NETWORK_STRUCT_END | 197 | GNUNET_NETWORK_STRUCT_END |
218 | 198 | ||
219 | #endif | 199 | #endif |
diff --git a/src/set/set_api.c b/src/set/set_api.c index 5838680b9..c74933aa0 100644 --- a/src/set/set_api.c +++ b/src/set/set_api.c | |||
@@ -33,6 +33,7 @@ | |||
33 | 33 | ||
34 | #define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__) | 34 | #define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__) |
35 | 35 | ||
36 | |||
36 | /** | 37 | /** |
37 | * Opaque handle to a set. | 38 | * Opaque handle to a set. |
38 | */ | 39 | */ |
@@ -52,13 +53,33 @@ struct GNUNET_SET_Request | |||
52 | int accepted; | 53 | int accepted; |
53 | }; | 54 | }; |
54 | 55 | ||
55 | |||
56 | struct GNUNET_SET_OperationHandle | 56 | struct GNUNET_SET_OperationHandle |
57 | { | 57 | { |
58 | GNUNET_SET_ResultIterator result_cb; | 58 | GNUNET_SET_ResultIterator result_cb; |
59 | void *result_cls; | 59 | void *result_cls; |
60 | |||
61 | /** | ||
62 | * Local set used for the operation, | ||
63 | * NULL if no set has been provided by conclude yet. | ||
64 | */ | ||
60 | struct GNUNET_SET_Handle *set; | 65 | struct GNUNET_SET_Handle *set; |
66 | |||
67 | /** | ||
68 | * Request ID to identify the operation within the set. | ||
69 | */ | ||
61 | uint32_t request_id; | 70 | uint32_t request_id; |
71 | |||
72 | /** | ||
73 | * Message sent to the server on calling conclude, | ||
74 | * NULL if conclude has been called. | ||
75 | */ | ||
76 | struct GNUNET_MQ_Message *conclude_mqm; | ||
77 | |||
78 | /** | ||
79 | * Address of the request if in the conclude message, | ||
80 | * used to patch the request id into the message when the set is known. | ||
81 | */ | ||
82 | uint32_t *request_id_addr; | ||
62 | }; | 83 | }; |
63 | 84 | ||
64 | 85 | ||
@@ -83,18 +104,21 @@ struct GNUNET_SET_ListenHandle | |||
83 | static void | 104 | static void |
84 | handle_result (void *cls, const struct GNUNET_MessageHeader *mh) | 105 | handle_result (void *cls, const struct GNUNET_MessageHeader *mh) |
85 | { | 106 | { |
86 | struct ResultMessage *msg = (struct ResultMessage *) mh; | 107 | struct GNUNET_SET_ResultMessage *msg = (struct GNUNET_SET_ResultMessage *) mh; |
87 | struct GNUNET_SET_Handle *set = cls; | 108 | struct GNUNET_SET_Handle *set = cls; |
88 | struct GNUNET_SET_OperationHandle *oh; | 109 | struct GNUNET_SET_OperationHandle *oh; |
89 | struct GNUNET_SET_Element e; | 110 | struct GNUNET_SET_Element e; |
90 | 111 | ||
112 | |||
113 | GNUNET_assert (NULL != set); | ||
114 | GNUNET_assert (NULL != set->mq); | ||
115 | |||
91 | if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2) | 116 | if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2) |
92 | { | 117 | { |
93 | struct GNUNET_MQ_Message *mqm; | 118 | struct GNUNET_MQ_Message *mqm; |
94 | mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ACK); | 119 | mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ACK); |
95 | GNUNET_MQ_send (set->mq, mqm); | 120 | GNUNET_MQ_send (set->mq, mqm); |
96 | } | 121 | } |
97 | |||
98 | oh = GNUNET_MQ_assoc_get (set->mq, ntohl (msg->request_id)); | 122 | oh = GNUNET_MQ_assoc_get (set->mq, ntohl (msg->request_id)); |
99 | GNUNET_assert (NULL != oh); | 123 | GNUNET_assert (NULL != oh); |
100 | /* status is not STATUS_OK => there's no attached element, | 124 | /* status is not STATUS_OK => there's no attached element, |
@@ -109,7 +133,7 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh) | |||
109 | } | 133 | } |
110 | 134 | ||
111 | e.data = &msg[1]; | 135 | e.data = &msg[1]; |
112 | e.size = ntohs (mh->size) - sizeof (struct ResultMessage); | 136 | e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage); |
113 | e.type = msg->element_type; | 137 | e.type = msg->element_type; |
114 | if (NULL != oh->result_cb) | 138 | if (NULL != oh->result_cb) |
115 | oh->result_cb (oh->result_cls, &e, htons (msg->result_status)); | 139 | oh->result_cb (oh->result_cls, &e, htons (msg->result_status)); |
@@ -124,28 +148,34 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh) | |||
124 | static void | 148 | static void |
125 | handle_request (void *cls, const struct GNUNET_MessageHeader *mh) | 149 | handle_request (void *cls, const struct GNUNET_MessageHeader *mh) |
126 | { | 150 | { |
127 | struct RequestMessage *msg = (struct RequestMessage *) mh; | 151 | struct GNUNET_SET_RequestMessage *msg = (struct GNUNET_SET_RequestMessage *) mh; |
128 | struct GNUNET_SET_ListenHandle *lh = cls; | 152 | struct GNUNET_SET_ListenHandle *lh = cls; |
129 | struct GNUNET_SET_Request *req; | 153 | struct GNUNET_SET_Request *req; |
154 | struct GNUNET_MessageHeader *context_msg; | ||
130 | 155 | ||
156 | LOG (GNUNET_ERROR_TYPE_INFO, "processing request\n"); | ||
131 | req = GNUNET_new (struct GNUNET_SET_Request); | 157 | req = GNUNET_new (struct GNUNET_SET_Request); |
132 | req->accept_id = ntohl (msg->accept_id); | 158 | req->accept_id = ntohl (msg->accept_id); |
159 | context_msg = GNUNET_MQ_extract_nested_mh (msg); | ||
133 | /* calling GNUNET_SET_accept in the listen cb will set req->accepted */ | 160 | /* calling GNUNET_SET_accept in the listen cb will set req->accepted */ |
134 | lh->listen_cb (lh->listen_cls, &msg->peer_id, &mh[1], req); | 161 | lh->listen_cb (lh->listen_cls, &msg->peer_id, context_msg, req); |
135 | 162 | ||
136 | if (GNUNET_NO == req->accepted) | 163 | if (GNUNET_NO == req->accepted) |
137 | { | 164 | { |
138 | struct GNUNET_MQ_Message *mqm; | 165 | struct GNUNET_MQ_Message *mqm; |
139 | struct AcceptMessage *amsg; | 166 | struct GNUNET_SET_AcceptRejectMessage *amsg; |
140 | 167 | ||
141 | mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_ACCEPT); | 168 | mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_REJECT); |
142 | /* no request id, as we refused */ | 169 | /* no request id, as we refused */ |
143 | amsg->request_id = htonl (0); | 170 | amsg->request_id = htonl (0); |
144 | amsg->accept_id = msg->accept_id; | 171 | amsg->accept_reject_id = msg->accept_id; |
145 | GNUNET_MQ_send (lh->mq, mqm); | 172 | GNUNET_MQ_send (lh->mq, mqm); |
146 | GNUNET_free (req); | 173 | GNUNET_free (req); |
174 | LOG (GNUNET_ERROR_TYPE_INFO, "rejecting request\n"); | ||
147 | } | 175 | } |
148 | 176 | ||
177 | LOG (GNUNET_ERROR_TYPE_INFO, "processed op request from service\n"); | ||
178 | |||
149 | /* the accept-case is handled in GNUNET_SET_accept, | 179 | /* the accept-case is handled in GNUNET_SET_accept, |
150 | * as we have the accept message available there */ | 180 | * as we have the accept message available there */ |
151 | } | 181 | } |
@@ -168,7 +198,7 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
168 | { | 198 | { |
169 | struct GNUNET_SET_Handle *set; | 199 | struct GNUNET_SET_Handle *set; |
170 | struct GNUNET_MQ_Message *mqm; | 200 | struct GNUNET_MQ_Message *mqm; |
171 | struct SetCreateMessage *msg; | 201 | struct GNUNET_SET_CreateMessage *msg; |
172 | static const struct GNUNET_MQ_Handler mq_handlers[] = { | 202 | static const struct GNUNET_MQ_Handler mq_handlers[] = { |
173 | {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT}, | 203 | {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT}, |
174 | GNUNET_MQ_HANDLERS_END | 204 | GNUNET_MQ_HANDLERS_END |
@@ -179,6 +209,7 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
179 | LOG (GNUNET_ERROR_TYPE_INFO, "set client created\n"); | 209 | LOG (GNUNET_ERROR_TYPE_INFO, "set client created\n"); |
180 | GNUNET_assert (NULL != set->client); | 210 | GNUNET_assert (NULL != set->client); |
181 | set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers, set); | 211 | set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers, set); |
212 | GNUNET_assert (NULL != set->mq); | ||
182 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE); | 213 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE); |
183 | msg->operation = htons (op); | 214 | msg->operation = htons (op); |
184 | GNUNET_MQ_send (set->mq, mqm); | 215 | GNUNET_MQ_send (set->mq, mqm); |
@@ -204,7 +235,7 @@ GNUNET_SET_add_element (struct GNUNET_SET_Handle *set, | |||
204 | void *cont_cls) | 235 | void *cont_cls) |
205 | { | 236 | { |
206 | struct GNUNET_MQ_Message *mqm; | 237 | struct GNUNET_MQ_Message *mqm; |
207 | struct ElementMessage *msg; | 238 | struct GNUNET_SET_ElementMessage *msg; |
208 | 239 | ||
209 | mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_ADD); | 240 | mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_ADD); |
210 | msg->element_type = element->type; | 241 | msg->element_type = element->type; |
@@ -232,7 +263,7 @@ GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set, | |||
232 | void *cont_cls) | 263 | void *cont_cls) |
233 | { | 264 | { |
234 | struct GNUNET_MQ_Message *mqm; | 265 | struct GNUNET_MQ_Message *mqm; |
235 | struct ElementMessage *msg; | 266 | struct GNUNET_SET_ElementMessage *msg; |
236 | 267 | ||
237 | mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_REMOVE); | 268 | mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_REMOVE); |
238 | msg->element_type = element->type; | 269 | msg->element_type = element->type; |
@@ -256,10 +287,10 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set) | |||
256 | 287 | ||
257 | 288 | ||
258 | /** | 289 | /** |
259 | * Evaluate a set operation with our set and the set of another peer. | 290 | * Create a set operation for evaluation with another peer. |
291 | * The evaluation will not start until the client provides | ||
292 | * a local set with GNUNET_SET_conclude. | ||
260 | * | 293 | * |
261 | * @param set set to use | ||
262 | * @param salt salt for HKDF (explain more here) | ||
263 | * @param other_peer peer with the other set | 294 | * @param other_peer peer with the other set |
264 | * @param app_id hash for the application using the set | 295 | * @param app_id hash for the application using the set |
265 | * @param context_msg additional information for the request | 296 | * @param context_msg additional information for the request |
@@ -273,8 +304,7 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set) | |||
273 | * @return a handle to cancel the operation | 304 | * @return a handle to cancel the operation |
274 | */ | 305 | */ |
275 | struct GNUNET_SET_OperationHandle * | 306 | struct GNUNET_SET_OperationHandle * |
276 | GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set, | 307 | GNUNET_SET_evaluate (const struct GNUNET_PeerIdentity *other_peer, |
277 | const struct GNUNET_PeerIdentity *other_peer, | ||
278 | const struct GNUNET_HashCode *app_id, | 308 | const struct GNUNET_HashCode *app_id, |
279 | const struct GNUNET_MessageHeader *context_msg, | 309 | const struct GNUNET_MessageHeader *context_msg, |
280 | uint16_t salt, | 310 | uint16_t salt, |
@@ -283,24 +313,24 @@ GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set, | |||
283 | void *result_cls) | 313 | void *result_cls) |
284 | { | 314 | { |
285 | struct GNUNET_MQ_Message *mqm; | 315 | struct GNUNET_MQ_Message *mqm; |
286 | struct EvaluateMessage *msg; | ||
287 | struct GNUNET_SET_OperationHandle *oh; | 316 | struct GNUNET_SET_OperationHandle *oh; |
317 | struct GNUNET_SET_EvaluateMessage *msg; | ||
288 | 318 | ||
289 | oh = GNUNET_new (struct GNUNET_SET_OperationHandle); | 319 | oh = GNUNET_new (struct GNUNET_SET_OperationHandle); |
290 | oh->result_cb = result_cb; | 320 | oh->result_cb = result_cb; |
291 | oh->result_cls = result_cls; | 321 | oh->result_cls = result_cls; |
292 | oh->set = set; | ||
293 | 322 | ||
294 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_EVALUATE); | 323 | mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_EVALUATE, context_msg); |
295 | msg->request_id = htonl (GNUNET_MQ_assoc_add (set->mq, mqm, oh)); | 324 | |
296 | msg->peer = *other_peer; | ||
297 | msg->app_id = *app_id; | ||
298 | |||
299 | if (NULL != context_msg) | 325 | if (NULL != context_msg) |
300 | if (GNUNET_OK != GNUNET_MQ_nest (mqm, context_msg, ntohs (context_msg->size))) | 326 | LOG (GNUNET_ERROR_TYPE_INFO, "passed context msg\n"); |
301 | GNUNET_assert (0); | 327 | |
302 | 328 | msg->app_id = *app_id; | |
303 | GNUNET_MQ_send (set->mq, mqm); | 329 | msg->target_peer = *other_peer; |
330 | msg->salt = salt; | ||
331 | msg->reserved = 0; | ||
332 | oh->conclude_mqm = mqm; | ||
333 | oh->request_id_addr = &msg->request_id; | ||
304 | 334 | ||
305 | return oh; | 335 | return oh; |
306 | } | 336 | } |
@@ -327,7 +357,7 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
327 | { | 357 | { |
328 | struct GNUNET_SET_ListenHandle *lh; | 358 | struct GNUNET_SET_ListenHandle *lh; |
329 | struct GNUNET_MQ_Message *mqm; | 359 | struct GNUNET_MQ_Message *mqm; |
330 | struct ListenMessage *msg; | 360 | struct GNUNET_SET_ListenMessage *msg; |
331 | static const struct GNUNET_MQ_Handler mq_handlers[] = { | 361 | static const struct GNUNET_MQ_Handler mq_handlers[] = { |
332 | {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST}, | 362 | {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST}, |
333 | GNUNET_MQ_HANDLERS_END | 363 | GNUNET_MQ_HANDLERS_END |
@@ -363,10 +393,13 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh) | |||
363 | 393 | ||
364 | 394 | ||
365 | /** | 395 | /** |
366 | * Accept a request we got via GNUNET_SET_listen. | 396 | * Accept a request we got via GNUNET_SET_listen. Must be called during |
397 | * GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid | ||
398 | * afterwards. | ||
399 | * Call GNUNET_SET_conclude to provide the local set to use for the operation, | ||
400 | * and to begin the exchange with the remote peer. | ||
367 | * | 401 | * |
368 | * @param request request to accept | 402 | * @param request request to accept |
369 | * @param set set used for the requested operation | ||
370 | * @param result_mode specified how results will be returned, | 403 | * @param result_mode specified how results will be returned, |
371 | * see 'GNUNET_SET_ResultMode'. | 404 | * see 'GNUNET_SET_ResultMode'. |
372 | * @param result_cb callback for the results | 405 | * @param result_cb callback for the results |
@@ -375,28 +408,26 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh) | |||
375 | */ | 408 | */ |
376 | struct GNUNET_SET_OperationHandle * | 409 | struct GNUNET_SET_OperationHandle * |
377 | GNUNET_SET_accept (struct GNUNET_SET_Request *request, | 410 | GNUNET_SET_accept (struct GNUNET_SET_Request *request, |
378 | struct GNUNET_SET_Handle *set, | ||
379 | enum GNUNET_SET_ResultMode result_mode, | 411 | enum GNUNET_SET_ResultMode result_mode, |
380 | GNUNET_SET_ResultIterator result_cb, | 412 | GNUNET_SET_ResultIterator result_cb, |
381 | void *result_cls) | 413 | void *cls) |
382 | { | 414 | { |
383 | struct GNUNET_MQ_Message *mqm; | 415 | struct GNUNET_MQ_Message *mqm; |
384 | struct AcceptMessage *msg; | ||
385 | struct GNUNET_SET_OperationHandle *oh; | 416 | struct GNUNET_SET_OperationHandle *oh; |
417 | struct GNUNET_SET_AcceptRejectMessage *msg; | ||
386 | 418 | ||
387 | /* don't accept a request twice! */ | ||
388 | GNUNET_assert (GNUNET_NO == request->accepted); | 419 | GNUNET_assert (GNUNET_NO == request->accepted); |
389 | request->accepted = GNUNET_YES; | 420 | request->accepted = GNUNET_YES; |
390 | 421 | ||
391 | oh = GNUNET_new (struct GNUNET_SET_OperationHandle); | 422 | oh = GNUNET_new (struct GNUNET_SET_OperationHandle); |
392 | oh->result_cb = result_cb; | 423 | oh->result_cb = result_cb; |
393 | oh->result_cls = result_cls; | 424 | oh->result_cls = cls; |
394 | oh->set = set; | ||
395 | 425 | ||
396 | mqm = GNUNET_MQ_msg (msg , GNUNET_MESSAGE_TYPE_SET_ACCEPT); | 426 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_ACCEPT); |
397 | msg->request_id = htonl (GNUNET_MQ_assoc_add (set->mq, NULL, oh)); | 427 | msg->accept_reject_id = htonl (request->accept_id); |
398 | msg->accept_id = htonl (request->accept_id); | 428 | |
399 | GNUNET_MQ_send (set->mq, mqm); | 429 | oh->conclude_mqm = mqm; |
430 | oh->request_id_addr = &msg->request_id; | ||
400 | 431 | ||
401 | return oh; | 432 | return oh; |
402 | } | 433 | } |
@@ -413,10 +444,43 @@ GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh) | |||
413 | struct GNUNET_MQ_Message *mqm; | 444 | struct GNUNET_MQ_Message *mqm; |
414 | struct GNUNET_SET_OperationHandle *h_assoc; | 445 | struct GNUNET_SET_OperationHandle *h_assoc; |
415 | 446 | ||
416 | h_assoc = GNUNET_MQ_assoc_remove (oh->set->mq, oh->request_id); | 447 | if (NULL != oh->set) |
417 | GNUNET_assert (h_assoc == oh); | 448 | { |
418 | mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_CANCEL); | 449 | h_assoc = GNUNET_MQ_assoc_remove (oh->set->mq, oh->request_id); |
419 | GNUNET_MQ_send (oh->set->mq, mqm); | 450 | GNUNET_assert (h_assoc == oh); |
451 | mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_CANCEL); | ||
452 | GNUNET_MQ_send (oh->set->mq, mqm); | ||
453 | } | ||
454 | |||
455 | if (NULL != oh->conclude_mqm) | ||
456 | GNUNET_MQ_discard (oh->conclude_mqm); | ||
457 | |||
420 | GNUNET_free (oh); | 458 | GNUNET_free (oh); |
421 | } | 459 | } |
422 | 460 | ||
461 | |||
462 | /** | ||
463 | * Conclude the given set operation using the given set. | ||
464 | * This function is called once we have fully constructed | ||
465 | * the set that we want to use for the operation. At this | ||
466 | * time, the P2P protocol can then begin to exchange the | ||
467 | * set information and call the result callback with the | ||
468 | * result information. | ||
469 | * | ||
470 | * @param oh handle to the set operation | ||
471 | * @param set the set to use for the operation | ||
472 | */ | ||
473 | void | ||
474 | GNUNET_SET_conclude (struct GNUNET_SET_OperationHandle *oh, | ||
475 | struct GNUNET_SET_Handle *set) | ||
476 | { | ||
477 | GNUNET_assert (NULL == oh->set); | ||
478 | GNUNET_assert (NULL != oh->conclude_mqm); | ||
479 | oh->set = set; | ||
480 | oh->request_id = GNUNET_MQ_assoc_add (oh->set->mq, NULL, oh); | ||
481 | *oh->request_id_addr = htonl (oh->request_id); | ||
482 | GNUNET_MQ_send (oh->set->mq, oh->conclude_mqm); | ||
483 | oh->conclude_mqm = NULL; | ||
484 | oh->request_id_addr = NULL; | ||
485 | } | ||
486 | |||
diff --git a/src/set/test_set.conf b/src/set/test_set.conf index 34b7a8d2f..7bc26ed7e 100644 --- a/src/set/test_set.conf +++ b/src/set/test_set.conf | |||
@@ -8,8 +8,8 @@ PORT = 2106 | |||
8 | HOSTNAME = localhost | 8 | HOSTNAME = localhost |
9 | HOME = $SERVICEHOME | 9 | HOME = $SERVICEHOME |
10 | BINARY = gnunet-service-set | 10 | BINARY = gnunet-service-set |
11 | #PREFIX = gdbserver :12345 | ||
12 | #PREFIX = valgrind --leak-check=full | 11 | #PREFIX = valgrind --leak-check=full |
12 | #PREFIX = gdbserver :1234 | ||
13 | ACCEPT_FROM = 127.0.0.1; | 13 | ACCEPT_FROM = 127.0.0.1; |
14 | ACCEPT_FROM6 = ::1; | 14 | ACCEPT_FROM6 = ::1; |
15 | UNIXPATH = /tmp/gnunet-service-set.sock | 15 | UNIXPATH = /tmp/gnunet-service-set.sock |
diff --git a/src/set/test_set_api.c b/src/set/test_set_api.c index bf0d65697..f773cebdf 100644 --- a/src/set/test_set_api.c +++ b/src/set/test_set_api.c | |||
@@ -20,7 +20,7 @@ | |||
20 | 20 | ||
21 | /** | 21 | /** |
22 | * @file set/test_set_api.c | 22 | * @file set/test_set_api.c |
23 | * @brief testcase for consensus_api.c | 23 | * @brief testcase for set_api.c |
24 | */ | 24 | */ |
25 | #include "platform.h" | 25 | #include "platform.h" |
26 | #include "gnunet_util_lib.h" | 26 | #include "gnunet_util_lib.h" |
@@ -89,11 +89,13 @@ listen_cb (void *cls, | |||
89 | const struct GNUNET_MessageHeader *context_msg, | 89 | const struct GNUNET_MessageHeader *context_msg, |
90 | struct GNUNET_SET_Request *request) | 90 | struct GNUNET_SET_Request *request) |
91 | { | 91 | { |
92 | struct GNUNET_SET_OperationHandle *oh; | ||
93 | |||
92 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n"); | 94 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n"); |
93 | GNUNET_SET_listen_cancel (listen_handle); | 95 | GNUNET_SET_listen_cancel (listen_handle); |
94 | 96 | ||
95 | GNUNET_SET_accept (request, set2, | 97 | oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL); |
96 | GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL); | 98 | GNUNET_SET_conclude (oh, set2); |
97 | } | 99 | } |
98 | 100 | ||
99 | 101 | ||
@@ -105,11 +107,14 @@ listen_cb (void *cls, | |||
105 | static void | 107 | static void |
106 | start (void *cls) | 108 | start (void *cls) |
107 | { | 109 | { |
110 | struct GNUNET_SET_OperationHandle *oh; | ||
111 | |||
108 | listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, | 112 | listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, |
109 | &app_id, listen_cb, NULL); | 113 | &app_id, listen_cb, NULL); |
110 | GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42, | 114 | oh = GNUNET_SET_evaluate (&local_id, &app_id, NULL, 42, |
111 | GNUNET_SET_RESULT_ADDED, | 115 | GNUNET_SET_RESULT_ADDED, |
112 | result_cb_set1, NULL); | 116 | result_cb_set1, NULL); |
117 | GNUNET_SET_conclude (oh, set1); | ||
113 | } | 118 | } |
114 | 119 | ||
115 | 120 | ||
@@ -168,12 +173,14 @@ run (void *cls, | |||
168 | struct GNUNET_TESTING_Peer *peer) | 173 | struct GNUNET_TESTING_Peer *peer) |
169 | { | 174 | { |
170 | 175 | ||
171 | static const char* app_str = "gnunet-set"; | ||
172 | |||
173 | config = cfg; | 176 | config = cfg; |
177 | GNUNET_CRYPTO_get_host_identity (cfg, &local_id); | ||
178 | printf ("my id (from CRYPTO): %s\n", GNUNET_h2s (&local_id.hashPubKey)); | ||
174 | GNUNET_TESTING_peer_get_identity (peer, &local_id); | 179 | GNUNET_TESTING_peer_get_identity (peer, &local_id); |
180 | printf ("my id (from TESTING): %s\n", GNUNET_h2s (&local_id.hashPubKey)); | ||
175 | set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); | 181 | set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); |
176 | set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); | 182 | set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); |
183 | GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id); | ||
177 | init_set1 (); | 184 | init_set1 (); |
178 | } | 185 | } |
179 | 186 | ||
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index b4a47b53d..34f1ea0fa 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c | |||
@@ -3785,7 +3785,29 @@ mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size | |||
3785 | struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; | 3785 | struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; |
3786 | struct GNUNET_MQ_Message *mqm; | 3786 | struct GNUNET_MQ_Message *mqm; |
3787 | 3787 | ||
3788 | GNUNET_assert (GNUNET_STREAM_OK == status); | 3788 | switch (status) |
3789 | { | ||
3790 | case GNUNET_STREAM_OK: | ||
3791 | break; | ||
3792 | case GNUNET_STREAM_SHUTDOWN: | ||
3793 | /* FIXME: call shutdown handler */ | ||
3794 | return; | ||
3795 | case GNUNET_STREAM_TIMEOUT: | ||
3796 | if (NULL == mq->error_handler) | ||
3797 | LOG (GNUNET_ERROR_TYPE_WARNING, "write timeout, but no error handler installed for message queue\n"); | ||
3798 | else | ||
3799 | mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT); | ||
3800 | return; | ||
3801 | case GNUNET_STREAM_SYSERR: | ||
3802 | if (NULL == mq->error_handler) | ||
3803 | LOG (GNUNET_ERROR_TYPE_WARNING, "write error, but no error handler installed for message queue\n"); | ||
3804 | else | ||
3805 | mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_WRITE); | ||
3806 | return; | ||
3807 | default: | ||
3808 | GNUNET_assert (0); | ||
3809 | return; | ||
3810 | } | ||
3789 | 3811 | ||
3790 | /* call cb for message we finished sending */ | 3812 | /* call cb for message we finished sending */ |
3791 | mqm = mq->current_msg; | 3813 | mqm = mq->current_msg; |
@@ -3863,21 +3885,53 @@ mq_stream_mst_callback (void *cls, void *client, | |||
3863 | */ | 3885 | */ |
3864 | static size_t | 3886 | static size_t |
3865 | mq_stream_data_processor (void *cls, | 3887 | mq_stream_data_processor (void *cls, |
3866 | enum GNUNET_STREAM_Status status, | 3888 | enum GNUNET_STREAM_Status status, |
3867 | const void *data, | 3889 | const void *data, |
3868 | size_t size) | 3890 | size_t size) |
3869 | { | 3891 | { |
3870 | struct GNUNET_MQ_MessageQueue *mq = cls; | 3892 | struct GNUNET_MQ_MessageQueue *mq = cls; |
3871 | struct MQStreamState *mss; | 3893 | struct MQStreamState *mss; |
3872 | int ret; | 3894 | int ret; |
3895 | |||
3896 | switch (status) | ||
3897 | { | ||
3898 | case GNUNET_STREAM_OK: | ||
3899 | break; | ||
3900 | case GNUNET_STREAM_SHUTDOWN: | ||
3901 | /* FIXME: call shutdown handler */ | ||
3902 | return 0; | ||
3903 | case GNUNET_STREAM_TIMEOUT: | ||
3904 | if (NULL == mq->error_handler) | ||
3905 | LOG (GNUNET_ERROR_TYPE_WARNING, "read timeout, but no error handler installed for message queue\n"); | ||
3906 | else | ||
3907 | mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT); | ||
3908 | return 0; | ||
3909 | case GNUNET_STREAM_SYSERR: | ||
3910 | if (NULL == mq->error_handler) | ||
3911 | LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler installed for message queue\n"); | ||
3912 | else | ||
3913 | mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); | ||
3914 | return 0; | ||
3915 | default: | ||
3916 | GNUNET_assert (0); | ||
3917 | return 0; | ||
3918 | } | ||
3873 | 3919 | ||
3874 | mss = (struct MQStreamState *) mq->impl_state; | 3920 | mss = (struct MQStreamState *) mq->impl_state; |
3875 | GNUNET_assert (GNUNET_STREAM_OK == status); | 3921 | GNUNET_assert (GNUNET_STREAM_OK == status); |
3876 | ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO); | 3922 | ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO); |
3877 | GNUNET_assert (GNUNET_OK == ret); | 3923 | if (GNUNET_OK != ret) |
3924 | { | ||
3925 | if (NULL == mq->error_handler) | ||
3926 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
3927 | "read error (message stream malformed), but no error handler installed for message queue\n"); | ||
3928 | else | ||
3929 | mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); | ||
3930 | return 0; | ||
3931 | } | ||
3878 | /* we always read all data */ | 3932 | /* we always read all data */ |
3879 | mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, | 3933 | mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, |
3880 | mq_stream_data_processor, mq); | 3934 | mq_stream_data_processor, mq); |
3881 | return size; | 3935 | return size; |
3882 | } | 3936 | } |
3883 | 3937 | ||
@@ -3935,6 +3989,7 @@ GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket, | |||
3935 | mq->destroy_impl = mq_stream_destroy_impl; | 3989 | mq->destroy_impl = mq_stream_destroy_impl; |
3936 | mq->handlers = msg_handlers; | 3990 | mq->handlers = msg_handlers; |
3937 | mq->handlers_cls = cls; | 3991 | mq->handlers_cls = cls; |
3992 | mq->error_handler = error_handler; | ||
3938 | if (NULL != msg_handlers) | 3993 | if (NULL != msg_handlers) |
3939 | { | 3994 | { |
3940 | mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq); | 3995 | mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq); |
diff --git a/src/util/crypto_hash.c b/src/util/crypto_hash.c index fca66aed4..2e436c454 100644 --- a/src/util/crypto_hash.c +++ b/src/util/crypto_hash.c | |||
@@ -339,7 +339,7 @@ GNUNET_CRYPTO_hash_distance_u32 (const struct GNUNET_HashCode * a, | |||
339 | */ | 339 | */ |
340 | void | 340 | void |
341 | GNUNET_CRYPTO_hash_create_random (enum GNUNET_CRYPTO_Quality mode, | 341 | GNUNET_CRYPTO_hash_create_random (enum GNUNET_CRYPTO_Quality mode, |
342 | struct GNUNET_HashCode * result) | 342 | struct GNUNET_HashCode *result) |
343 | { | 343 | { |
344 | int i; | 344 | int i; |
345 | 345 | ||
diff --git a/src/util/mq.c b/src/util/mq.c index 36cacd30b..dc87b9711 100644 --- a/src/util/mq.c +++ b/src/util/mq.c | |||
@@ -119,33 +119,31 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) | |||
119 | } | 119 | } |
120 | 120 | ||
121 | 121 | ||
122 | int | 122 | struct GNUNET_MQ_Message * |
123 | GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp, | 123 | GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type, |
124 | const void *data, uint16_t len) | 124 | const struct GNUNET_MessageHeader *nested_mh) |
125 | { | 125 | { |
126 | size_t new_size; | 126 | struct GNUNET_MQ_Message *mqm; |
127 | size_t old_size; | 127 | uint16_t size; |
128 | 128 | ||
129 | GNUNET_assert (NULL != mqmp); | 129 | if (NULL == nested_mh) |
130 | /* there's no data to append => do nothing */ | 130 | return GNUNET_MQ_msg_ (mhp, base_size, type); |
131 | if (NULL == data) | ||
132 | return GNUNET_OK; | ||
133 | old_size = ntohs ((*mqmp)->mh->size); | ||
134 | /* message too large to concatenate? */ | ||
135 | if (((uint16_t) (old_size + len)) < len) | ||
136 | return GNUNET_SYSERR; | ||
137 | new_size = old_size + len; | ||
138 | *mqmp = GNUNET_realloc (*mqmp, sizeof (struct GNUNET_MQ_Message) + new_size); | ||
139 | (*mqmp)->mh = (struct GNUNET_MessageHeader *) &(*mqmp)[1]; | ||
140 | memcpy (((void *) (*mqmp)->mh) + old_size, data, new_size - old_size); | ||
141 | (*mqmp)->mh->size = htons (new_size); | ||
142 | return GNUNET_OK; | ||
143 | } | ||
144 | 131 | ||
132 | size = base_size + ntohs (nested_mh->size); | ||
145 | 133 | ||
134 | /* check for uint16_t overflow */ | ||
135 | if (size < base_size) | ||
136 | return NULL; | ||
137 | |||
138 | mqm = GNUNET_MQ_msg_ (mhp, size, type); | ||
139 | memcpy ((char *) mqm->mh + base_size, nested_mh, ntohs (nested_mh->size)); | ||
140 | |||
141 | return mqm; | ||
142 | } | ||
146 | 143 | ||
147 | 144 | ||
148 | /*** Transmit a queued message to the session's client. | 145 | /** |
146 | * Transmit a queued message to the session's client. | ||
149 | * | 147 | * |
150 | * @param cls consensus session | 148 | * @param cls consensus session |
151 | * @param size number of bytes available in buf | 149 | * @param size number of bytes available in buf |
@@ -265,7 +263,8 @@ handle_client_message (void *cls, | |||
265 | { | 263 | { |
266 | if (NULL == mq->error_handler) | 264 | if (NULL == mq->error_handler) |
267 | LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler installed)\n"); | 265 | LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler installed)\n"); |
268 | mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); | 266 | else |
267 | mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); | ||
269 | return; | 268 | return; |
270 | } | 269 | } |
271 | 270 | ||
@@ -479,3 +478,39 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq) | |||
479 | GNUNET_free (mq); | 478 | GNUNET_free (mq); |
480 | } | 479 | } |
481 | 480 | ||
481 | |||
482 | |||
483 | |||
484 | struct GNUNET_MessageHeader * | ||
485 | GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size) | ||
486 | { | ||
487 | uint16_t whole_size; | ||
488 | uint16_t nested_size; | ||
489 | struct GNUNET_MessageHeader *nested_msg; | ||
490 | |||
491 | whole_size = ntohs (mh->size); | ||
492 | GNUNET_assert (whole_size >= base_size); | ||
493 | |||
494 | nested_size = whole_size - base_size; | ||
495 | |||
496 | if (0 == nested_size) | ||
497 | return NULL; | ||
498 | |||
499 | if (nested_size < sizeof (struct GNUNET_MessageHeader)) | ||
500 | { | ||
501 | GNUNET_break_op (0); | ||
502 | return NULL; | ||
503 | } | ||
504 | |||
505 | nested_msg = (struct GNUNET_MessageHeader *) ((char *) mh + base_size); | ||
506 | |||
507 | if (ntohs (nested_msg->size) != nested_size) | ||
508 | { | ||
509 | GNUNET_break_op (0); | ||
510 | nested_msg->size = htons (nested_size); | ||
511 | } | ||
512 | |||
513 | return nested_msg; | ||
514 | } | ||
515 | |||
516 | |||
diff --git a/src/util/test_mq.c b/src/util/test_mq.c index 161b40a20..55cd80ef1 100644 --- a/src/util/test_mq.c +++ b/src/util/test_mq.c | |||
@@ -58,35 +58,6 @@ void | |||
58 | test2 (void) | 58 | test2 (void) |
59 | { | 59 | { |
60 | struct GNUNET_MQ_Message *mqm; | 60 | struct GNUNET_MQ_Message *mqm; |
61 | struct MyMessage *mm; | ||
62 | int res; | ||
63 | char *s = "foo"; | ||
64 | |||
65 | mqm = GNUNET_MQ_msg (mm, 42); | ||
66 | res = GNUNET_MQ_nest (mqm, s, strlen(s)); | ||
67 | GNUNET_assert (GNUNET_OK == res); | ||
68 | res = GNUNET_MQ_nest (mqm, s, strlen(s)); | ||
69 | GNUNET_assert (GNUNET_OK == res); | ||
70 | res = GNUNET_MQ_nest (mqm, NULL, 0); | ||
71 | GNUNET_assert (GNUNET_OK == res); | ||
72 | |||
73 | GNUNET_assert (strlen (s) * 2 + sizeof (struct MyMessage) == ntohs (mm->header.size)); | ||
74 | |||
75 | res = GNUNET_MQ_nest_mh (mqm, &mm->header); | ||
76 | GNUNET_assert (GNUNET_OK == res); | ||
77 | GNUNET_assert (2 * (strlen (s) * 2 + sizeof (struct MyMessage)) == ntohs (mm->header.size)); | ||
78 | |||
79 | res = GNUNET_MQ_nest (mqm, (void *) 0xF00BA, 0xFFF0); | ||
80 | GNUNET_assert (GNUNET_OK != res); | ||
81 | |||
82 | GNUNET_MQ_discard (mqm); | ||
83 | } | ||
84 | |||
85 | |||
86 | void | ||
87 | test3 (void) | ||
88 | { | ||
89 | struct GNUNET_MQ_Message *mqm; | ||
90 | struct GNUNET_MessageHeader *mh; | 61 | struct GNUNET_MessageHeader *mh; |
91 | 62 | ||
92 | mqm = GNUNET_MQ_msg_header (42); | 63 | mqm = GNUNET_MQ_msg_header (42); |
@@ -107,7 +78,6 @@ main (int argc, char **argv) | |||
107 | GNUNET_log_setup ("test-mq", "INFO", NULL); | 78 | GNUNET_log_setup ("test-mq", "INFO", NULL); |
108 | test1 (); | 79 | test1 (); |
109 | test2 (); | 80 | test2 (); |
110 | test3 (); | ||
111 | 81 | ||
112 | return 0; | 82 | return 0; |
113 | } | 83 | } |