aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-06-03 10:53:49 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-06-03 10:53:49 +0000
commit68403fa780bf94ace2ebc13c2c09463cbbc0b57c (patch)
tree3442e4f25de90eab67c4f9813cb6e433c50b7482 /src
parentfae7f583f2e11cac15fefcbefef64287ab6915d3 (diff)
downloadgnunet-68403fa780bf94ace2ebc13c2c09463cbbc0b57c.tar.gz
gnunet-68403fa780bf94ace2ebc13c2c09463cbbc0b57c.zip
- conclude for SET
- consensus with SET
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am2
-rw-r--r--src/consensus/Makefile.am4
-rw-r--r--src/consensus/consensus_api.c6
-rw-r--r--src/consensus/consensus_protocol.h11
-rw-r--r--src/consensus/gnunet-consensus.c6
-rw-r--r--src/consensus/gnunet-service-consensus.c407
-rw-r--r--src/consensus/test_consensus.conf8
-rw-r--r--src/dv/gnunet-service-dv.c4
-rw-r--r--src/include/gnunet_consensus_service.h27
-rw-r--r--src/include/gnunet_mq_lib.h89
-rw-r--r--src/include/gnunet_protocols.h34
-rw-r--r--src/include/gnunet_set_service.h27
-rw-r--r--src/set/gnunet-service-set.c166
-rw-r--r--src/set/gnunet-service-set.h8
-rw-r--r--src/set/gnunet-service-set_union.c116
-rw-r--r--src/set/gnunet-set.c14
-rw-r--r--src/set/set.h62
-rw-r--r--src/set/set_api.c156
-rw-r--r--src/set/test_set.conf2
-rw-r--r--src/set/test_set_api.c23
-rw-r--r--src/stream/stream_api.c69
-rw-r--r--src/util/crypto_hash.c2
-rw-r--r--src/util/mq.c81
-rw-r--r--src/util/test_mq.c30
24 files changed, 773 insertions, 581 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 6819ec90b..2f4397dbe 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -3,7 +3,7 @@
3#endif 3#endif
4 4
5if HAVE_EXPERIMENTAL 5if HAVE_EXPERIMENTAL
6 EXP_DIR = gns consensus dv set experimentation 6 EXP_DIR = gns set dv consensus experimentation
7endif 7endif
8 8
9if LINUX 9if LINUX
diff --git a/src/consensus/Makefile.am b/src/consensus/Makefile.am
index a0edb1d65..914fbdef8 100644
--- a/src/consensus/Makefile.am
+++ b/src/consensus/Makefile.am
@@ -61,6 +61,8 @@ gnunet_service_consensus_LDADD = \
61 $(top_builddir)/src/mesh/libgnunetmesh.la \ 61 $(top_builddir)/src/mesh/libgnunetmesh.la \
62 $(top_builddir)/src/set/libgnunetset.la \ 62 $(top_builddir)/src/set/libgnunetset.la \
63 $(GN_LIBINTL) 63 $(GN_LIBINTL)
64gnunet_service_consensus_DEPENDENCIES = \
65 $(top_builddir)/src/set/libgnunetset.la
64 66
65gnunet_service_evil_consensus_SOURCES = \ 67gnunet_service_evil_consensus_SOURCES = \
66 gnunet-service-consensus.c 68 gnunet-service-consensus.c
@@ -71,6 +73,8 @@ gnunet_service_evil_consensus_LDADD = \
71 $(top_builddir)/src/mesh/libgnunetmesh.la \ 73 $(top_builddir)/src/mesh/libgnunetmesh.la \
72 $(top_builddir)/src/set/libgnunetset.la \ 74 $(top_builddir)/src/set/libgnunetset.la \
73 $(GN_LIBINTL) 75 $(GN_LIBINTL)
76gnunet_service_evil_consensus_DEPENDENCIES = \
77 $(top_builddir)/src/set/libgnunetset.la
74gnunet_service_evil_consensus_CFLAGS = -DEVIL 78gnunet_service_evil_consensus_CFLAGS = -DEVIL
75 79
76libgnunetconsensus_la_SOURCES = \ 80libgnunetconsensus_la_SOURCES = \
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c
index 635a610ca..e3ddb4913 100644
--- a/src/consensus/consensus_api.c
+++ b/src/consensus/consensus_api.c
@@ -236,9 +236,9 @@ send_next (struct GNUNET_CONSENSUS_Handle *consensus)
236 */ 236 */
237static void 237static void
238handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, 238handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
239 struct GNUNET_CONSENSUS_ElementMessage *msg) 239 struct GNUNET_CONSENSUS_ElementMessage *msg)
240{ 240{
241 struct GNUNET_CONSENSUS_Element element; 241 struct GNUNET_SET_Element element;
242 242
243 LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n"); 243 LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n");
244 244
@@ -424,7 +424,7 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
424 */ 424 */
425void 425void
426GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, 426GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
427 const struct GNUNET_CONSENSUS_Element *element, 427 const struct GNUNET_SET_Element *element,
428 GNUNET_CONSENSUS_InsertDoneCallback idc, 428 GNUNET_CONSENSUS_InsertDoneCallback idc,
429 void *idc_cls) 429 void *idc_cls)
430{ 430{
diff --git a/src/consensus/consensus_protocol.h b/src/consensus/consensus_protocol.h
index af4d74100..4e0673c7d 100644
--- a/src/consensus/consensus_protocol.h
+++ b/src/consensus/consensus_protocol.h
@@ -38,12 +38,15 @@ GNUNET_NETWORK_STRUCT_BEGIN
38/** 38/**
39 * Sent as context message for set reconciliation. 39 * Sent as context message for set reconciliation.
40 */ 40 */
41struct ConsensusRoundMessage 41struct GNUNET_CONSENSUS_RoundContextMessage
42{ 42{
43 /**
44 * Type: GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT
45 */
43 struct GNUNET_MessageHeader header; 46 struct GNUNET_MessageHeader header;
44 uint8_t round; 47 uint32_t round;
45 uint8_t exp_round; 48 uint32_t exp_round;
46 uint8_t exp_subround; 49 uint32_t exp_subround;
47}; 50};
48 51
49GNUNET_NETWORK_STRUCT_END 52GNUNET_NETWORK_STRUCT_END
diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c
index d8c1b14ee..60db8c61a 100644
--- a/src/consensus/gnunet-consensus.c
+++ b/src/consensus/gnunet-consensus.c
@@ -133,7 +133,7 @@ do_consensus ()
133 { 133 {
134 int j; 134 int j;
135 struct GNUNET_HashCode *val; 135 struct GNUNET_HashCode *val;
136 struct GNUNET_CONSENSUS_Element *element; 136 struct GNUNET_SET_Element *element;
137 generate_indices(unique_indices); 137 generate_indices(unique_indices);
138 138
139 val = GNUNET_malloc (sizeof *val); 139 val = GNUNET_malloc (sizeof *val);
@@ -151,6 +151,8 @@ do_consensus ()
151 } 151 }
152 } 152 }
153 153
154 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "all elements inserted, calling conclude\n");
155
154 for (i = 0; i < num_peers; i++) 156 for (i = 0; i < num_peers; i++)
155 GNUNET_CONSENSUS_conclude (consensus_handles[i], conclude_timeout, conclude_cb, consensus_handles[i]); 157 GNUNET_CONSENSUS_conclude (consensus_handles[i], conclude_timeout, conclude_cb, consensus_handles[i]);
156} 158}
@@ -194,7 +196,7 @@ connect_complete (void *cls,
194 196
195static void 197static void
196new_element_cb (void *cls, 198new_element_cb (void *cls,
197 const struct GNUNET_CONSENSUS_Element *element) 199 const struct GNUNET_SET_Element *element)
198{ 200{
199 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n"); 201 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n");
200} 202}
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
index 44edeb215..21123b24f 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet 2 This file is part of GNUnet
3 (C) 2012 Christian Grothoff (and other contributing authors) 3 (C) 2012, 2013 Christian Grothoff (and other contributing authors)
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -43,9 +43,9 @@
43 43
44 44
45/** 45/**
46 * Number of exponential rounds, used in the inventory and completion round. 46 * Number of exponential rounds, used in the exp and completion round.
47 */ 47 */
48#define NUM_EXP_ROUNDS (4) 48#define NUM_EXP_ROUNDS 4
49 49
50/* forward declarations */ 50/* forward declarations */
51 51
@@ -155,17 +155,17 @@ struct ConsensusSession
155 * Permutation of peers for the current round, 155 * Permutation of peers for the current round,
156 * maps logical index (for current round) to physical index (location in info array) 156 * maps logical index (for current round) to physical index (location in info array)
157 */ 157 */
158 int *shuffle; 158 uint32_t *shuffle;
159 159
160 /** 160 /**
161 * Current round of the exponential scheme. 161 * Current round of the exponential scheme.
162 */ 162 */
163 int exp_round; 163 uint32_t exp_round;
164 164
165 /** 165 /**
166 * Current sub-round of the exponential scheme. 166 * Current sub-round of the exponential scheme.
167 */ 167 */
168 int exp_subround; 168 uint32_t exp_subround;
169 169
170 /** 170 /**
171 * The partner for the current exp-round 171 * The partner for the current exp-round
@@ -201,17 +201,6 @@ struct ConsensusPeerInformation
201 struct GNUNET_PeerIdentity peer_id; 201 struct GNUNET_PeerIdentity peer_id;
202 202
203 /** 203 /**
204 * Do we connect to the peer, or does the peer connect to us?
205 * Only valid for all-to-all phases
206 */
207 int is_outgoing;
208
209 /**
210 * Did we receive/send a consensus hello?
211 */
212 int hello;
213
214 /**
215 * Back-reference to the consensus session, 204 * Back-reference to the consensus session,
216 * to that ConsensusPeerInformation can be used as a closure 205 * to that ConsensusPeerInformation can be used as a closure
217 */ 206 */
@@ -223,22 +212,14 @@ struct ConsensusPeerInformation
223 int exp_subround_finished; 212 int exp_subround_finished;
224 213
225 /** 214 /**
226 * GNUNET_YES if we synced inventory with this peer; 215 * Set operation we are currently executing with this peer.
227 * GNUNET_NO otherwise.
228 */
229 int inventory_synced;
230
231 /**
232 * Round this peer seems to be in, according to the last SE we got.
233 * Necessary to store this, as we sometimes need to respond to a request from an
234 * older round, while we are already in the next round.
235 */ 216 */
236 enum ConsensusRound apparent_round; 217 struct GNUNET_SET_OperationHandle *set_op;
237 218
238 /** 219 /**
239 * Set operation we are currently executing with this peer. 220 * Has conclude been called on the set_op?
240 */ 221 */
241 struct GNUNET_SET_OperationHandle *set_op; 222 int set_op_concluded;
242}; 223};
243 224
244 225
@@ -268,9 +249,8 @@ static struct GNUNET_SERVER_Handle *srv;
268static struct GNUNET_PeerIdentity my_peer; 249static struct GNUNET_PeerIdentity my_peer;
269 250
270 251
271/*
272static int 252static int
273exp_subround_finished (const struct ConsensusSession *session) 253have_exp_subround_finished (const struct ConsensusSession *session)
274{ 254{
275 int not_finished; 255 int not_finished;
276 not_finished = 0; 256 not_finished = 0;
@@ -284,25 +264,9 @@ exp_subround_finished (const struct ConsensusSession *session)
284 return GNUNET_YES; 264 return GNUNET_YES;
285 return GNUNET_NO; 265 return GNUNET_NO;
286} 266}
287*/
288 267
289 268
290 269
291/*
292static int
293inventory_round_finished (struct ConsensusSession *session)
294{
295 int i;
296 int finished;
297 finished = 0;
298 for (i = 0; i < session->num_peers; i++)
299 if (GNUNET_YES == session->info[i].inventory_synced)
300 finished++;
301 if (finished >= (session->num_peers / 2))
302 return GNUNET_YES;
303 return GNUNET_NO;
304}
305*/
306 270
307/** 271/**
308 * Destroy a session, free all resources associated with it. 272 * Destroy a session, free all resources associated with it.
@@ -341,39 +305,6 @@ destroy_session (struct ConsensusSession *session)
341 305
342 306
343/** 307/**
344 * Start the inventory round, contact all peers we are supposed to contact.
345 *
346 * @param session the current session
347 */
348static void
349start_inventory (struct ConsensusSession *session)
350{
351 int i;
352 int last;
353
354 for (i = 0; i < session->num_peers; i++)
355 session->info[i].is_outgoing = GNUNET_NO;
356
357 last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers;
358 i = (session->local_peer_idx + 1) % session->num_peers;
359 while (i != last)
360 {
361 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i);
362 session->info[i].is_outgoing = GNUNET_YES;
363 // embrace_peer (&session->info[i]);
364 i = (i + 1) % session->num_peers;
365 }
366 // tie-breaker for even number of peers
367 if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
368 {
369 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i);
370 session->info[last].is_outgoing = GNUNET_YES;
371 // embrace_peer (&session->info[last]);
372 }
373}
374
375
376/**
377 * Start the next round. 308 * Start the next round.
378 * This function can be invoked as a timeout task, or called manually (tc will be NULL then). 309 * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
379 * 310 *
@@ -407,27 +338,8 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
407 subround_over (session, NULL); 338 subround_over (session, NULL);
408 break; 339 break;
409 case CONSENSUS_ROUND_EXCHANGE: 340 case CONSENSUS_ROUND_EXCHANGE:
410 /* handle two peers specially */ 341 /* FIXME: send all elements to client */
411 if (session->num_peers <= 2)
412 {
413 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: 2-peer consensus done\n", session->local_peer_idx);
414 //GNUNET_CONTAINER_multihashmap_iterate (session->values, send_client_elements_iter, session);
415 //send_client_conclude_done (session);
416 session->current_round = CONSENSUS_ROUND_FINISH;
417 return;
418 }
419 session->current_round = CONSENSUS_ROUND_INVENTORY;
420 start_inventory (session);
421 break;
422 case CONSENSUS_ROUND_INVENTORY:
423 session->current_round = CONSENSUS_ROUND_COMPLETION;
424 session->exp_round = 0;
425 subround_over (session, NULL);
426 break;
427 case CONSENSUS_ROUND_COMPLETION:
428 session->current_round = CONSENSUS_ROUND_FINISH; 342 session->current_round = CONSENSUS_ROUND_FINISH;
429 //send_client_conclude_done (session);
430 break;
431 default: 343 default:
432 GNUNET_assert (0); 344 GNUNET_assert (0);
433 } 345 }
@@ -435,68 +347,91 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
435 347
436 348
437/** 349/**
438 * Adapt the shuffle of the session for the current round. 350 * Create a new permutation for the session's peers in session->shuffle.
351 * Uses a Fisher-Yates shuffle with pseudo-randomness coming from
352 * both the global session id and the current round index.
353 *
354 * @param session the session to create the new permutation for
439 */ 355 */
440static void 356static void
441shuffle (struct ConsensusSession *session) 357shuffle (struct ConsensusSession *session)
442{ 358{
443 /* adapted from random_permute in util/crypto_random.c */ 359 uint32_t i;
444 /* FIXME 360 uint32_t randomness[session->num_peers-1];
445 unsigned int *ret; 361
446 unsigned int i; 362 if (NULL == session->shuffle)
447 unsigned int tmp; 363 session->shuffle = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle));
448 uint32_t x; 364
365 GNUNET_CRYPTO_kdf (randomness, sizeof (randomness), &session->exp_round, sizeof (uint32_t),
366 &session->global_id, sizeof (struct GNUNET_HashCode));
367
368 for (i = 0; i < session->num_peers; i++)
369 session->shuffle[i] = i;
449 370
450 GNUNET_assert (n > 0); 371 for (i = session->num_peers - 1; i > 0; i--)
451 ret = GNUNET_malloc (n * sizeof (unsigned int));
452 for (i = 0; i < n; i++)
453 ret[i] = i;
454 for (i = n - 1; i > 0; i--)
455 { 372 {
456 x = GNUNET_CRYPTO_random_u32 (mode, i + 1); 373 uint32_t x;
457 tmp = ret[x]; 374 uint32_t tmp;
458 ret[x] = ret[i]; 375 x = randomness[i-1];
459 ret[i] = tmp; 376 tmp = session->shuffle[x];
377 session->shuffle[x] = session->shuffle[i];
378 session->shuffle[i] = tmp;
460 } 379 }
461 */
462} 380}
463 381
464 382
465/** 383/**
466 * Find and set the partner_incoming and partner_outgoing of our peer, 384 * Find and set the partner_incoming and partner_outgoing of our peer,
467 * one of them may not exist in most cases. 385 * one of them may not exist (and thus set to NULL) if the number of peers
386 * in the session is not a power of two.
468 * 387 *
469 * @param session the consensus session 388 * @param session the consensus session
470 */ 389 */
471static void 390static void
472find_partners (struct ConsensusSession *session) 391find_partners (struct ConsensusSession *session)
473{ 392{
474 int mark[session->num_peers]; 393 int arc;
475 int i; 394 int partner_idx;
476 395 int largest_arc;
477 memset (mark, 0, session->num_peers * sizeof (int)); 396 int num_ghosts;
478 session->partner_incoming = session->partner_outgoing = NULL; 397
479 for (i = 0; i < session->num_peers; i++) 398 /* distance to neighboring peer in current subround */
399 arc = 1 << session->exp_subround;
400 partner_idx = (session->local_peer_idx + arc) % session->num_peers;
401 largest_arc = 1;
402 while (largest_arc < session->num_peers)
403 largest_arc <<= 1;
404 num_ghosts = largest_arc - session->num_peers;
405
406 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "num ghosts: %d\n", num_ghosts);
407
408 if (0 == (session->local_peer_idx & arc))
480 { 409 {
481 int arc; 410 /* we are outgoing */
482 if (0 != mark[i]) 411 session->partner_outgoing = &session->info[session->shuffle[partner_idx]];
483 continue; 412 /* are we a 'ghost' of a peer that would exist if
484 arc = (i + (1 << session->exp_subround)) % session->num_peers; 413 * the number of peers was a power of two, and thus have to partner
485 mark[i] = mark[arc] = 1; 414 * with an additional peer?
486 GNUNET_assert (i != arc); 415 */
487 if (i == session->local_peer_idx) 416 if (session->local_peer_idx < num_ghosts)
488 { 417 {
489 GNUNET_assert (NULL == session->partner_outgoing); 418 int ghost_partner_idx;
490 session->partner_outgoing = &session->info[session->shuffle[arc]]; 419 ghost_partner_idx = (session->local_peer_idx - arc) % session->num_peers;
491 session->partner_outgoing->exp_subround_finished = GNUNET_NO; 420 /* platform dependent; modulo sometimes returns negative values */
421 if (ghost_partner_idx < 0)
422 ghost_partner_idx += arc;
423 session->partner_incoming = &session->info[session->shuffle[ghost_partner_idx]];
492 } 424 }
493 if (arc == session->local_peer_idx) 425 else
494 { 426 {
495 GNUNET_assert (NULL == session->partner_incoming); 427 session->partner_incoming = NULL;
496 session->partner_incoming = &session->info[session->shuffle[i]];
497 session->partner_incoming->exp_subround_finished = GNUNET_NO;
498 } 428 }
499 } 429 }
430 else
431 {
432 session->partner_outgoing = NULL;
433 session->partner_incoming = &session->info[session->shuffle[partner_idx]];
434 }
500} 435}
501 436
502 437
@@ -508,11 +443,42 @@ find_partners (struct ConsensusSession *session)
508 * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK 443 * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
509 * @param status see enum GNUNET_SET_Status 444 * @param status see enum GNUNET_SET_Status
510 */ 445 */
511static void set_result_cb (void *cls, 446static void
512 const struct GNUNET_SET_Element *element, 447set_result_cb (void *cls,
513 enum GNUNET_SET_Status status) 448 const struct GNUNET_SET_Element *element,
449 enum GNUNET_SET_Status status)
514{ 450{
515 /* FIXME */ 451 struct ConsensusPeerInformation *cpi = cls;
452
453 switch (status)
454 {
455 case GNUNET_SET_STATUS_OK:
456 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: element\n");
457 break;
458 case GNUNET_SET_STATUS_FAILURE:
459 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: failure\n");
460 break;
461 case GNUNET_SET_STATUS_HALF_DONE:
462 case GNUNET_SET_STATUS_DONE:
463 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: done\n");
464 cpi->exp_subround_finished = GNUNET_YES;
465 if (have_exp_subround_finished (cpi->session) == GNUNET_YES)
466 subround_over (cpi->session, NULL);
467 return;
468 default:
469 GNUNET_break (0);
470 return;
471 }
472
473 switch (cpi->session->current_round)
474 {
475 case CONSENSUS_ROUND_EXCHANGE:
476 GNUNET_SET_add_element (cpi->session->element_set, element, NULL, NULL);
477 break;
478 default:
479 GNUNET_break (0);
480 return;
481 }
516} 482}
517 483
518 484
@@ -540,14 +506,6 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
540 GNUNET_SCHEDULER_cancel (session->round_timeout_tid); 506 GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
541 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; 507 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
542 } 508 }
543 /* check if we are done with the log phase, 2-peer consensus only does one log round */
544 if ( (session->exp_round == NUM_EXP_ROUNDS) ||
545 ((session->num_peers == 2) && (session->exp_round == 1)))
546 {
547 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: exp-round over\n", session->local_peer_idx);
548 round_over (session, NULL);
549 return;
550 }
551 if (session->exp_round == 0) 509 if (session->exp_round == 0)
552 { 510 {
553 /* initialize everything for the log-rounds */ 511 /* initialize everything for the log-rounds */
@@ -575,18 +533,27 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
575 533
576 if (NULL != session->partner_outgoing) 534 if (NULL != session->partner_outgoing)
577 { 535 {
536 struct GNUNET_CONSENSUS_RoundContextMessage *msg;
537 msg = GNUNET_new (struct GNUNET_CONSENSUS_RoundContextMessage);
538 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
539 msg->header.size = htons (sizeof *msg);
540 msg->round = htonl (session->current_round);
541 msg->exp_round = htonl (session->exp_round);
542 msg->exp_subround = htonl (session->exp_subround);
543
578 if (NULL != session->partner_outgoing->set_op) 544 if (NULL != session->partner_outgoing->set_op)
545 {
579 GNUNET_SET_operation_cancel (session->partner_outgoing->set_op); 546 GNUNET_SET_operation_cancel (session->partner_outgoing->set_op);
547 }
580 session->partner_outgoing->set_op = 548 session->partner_outgoing->set_op =
581 GNUNET_SET_evaluate (session->element_set, 549 GNUNET_SET_evaluate (&session->partner_outgoing->peer_id,
582 &session->partner_outgoing->peer_id,
583 &session->global_id, 550 &session->global_id,
584 NULL, /* FIXME */ 551 (struct GNUNET_MessageHeader *) msg,
585 0, /* FIXME */ 552 0, /* FIXME */
586 GNUNET_SET_RESULT_ADDED, 553 GNUNET_SET_RESULT_ADDED,
587 set_result_cb, session); 554 set_result_cb, session->partner_outgoing);
588 555 GNUNET_SET_conclude (session->partner_outgoing->set_op, session->element_set);
589 556 session->partner_outgoing->set_op_concluded = GNUNET_YES;
590 } 557 }
591 558
592#ifdef GNUNET_EXTRA_LOGGING 559#ifdef GNUNET_EXTRA_LOGGING
@@ -642,6 +609,8 @@ compute_global_id (struct ConsensusSession *session, const struct GNUNET_HashCod
642 int i; 609 int i;
643 struct GNUNET_HashCode tmp; 610 struct GNUNET_HashCode tmp;
644 611
612 /* FIXME: use kdf? */
613
645 session->global_id = *session_id; 614 session->global_id = *session_id;
646 for (i = 0; i < session->num_peers; ++i) 615 for (i = 0; i < session->num_peers; ++i)
647 { 616 {
@@ -727,9 +696,6 @@ initialize_session_peer_list (struct ConsensusSession *session,
727} 696}
728 697
729 698
730
731
732
733/** 699/**
734 * Called when another peer wants to do a set operation with the 700 * Called when another peer wants to do a set operation with the
735 * local peer. 701 * local peer.
@@ -750,7 +716,67 @@ set_listen_cb (void *cls,
750 const struct GNUNET_MessageHeader *context_msg, 716 const struct GNUNET_MessageHeader *context_msg,
751 struct GNUNET_SET_Request *request) 717 struct GNUNET_SET_Request *request)
752{ 718{
753 /* FIXME */ 719 struct ConsensusSession *session = cls;
720 struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
721 struct ConsensusPeerInformation *cpi;
722 int index;
723
724 if (NULL == context_msg)
725 {
726 GNUNET_break_op (0);
727 return;
728 }
729
730 index = get_peer_idx (other_peer, session);
731
732 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "result from %s\n", GNUNET_h2s (&other_peer->hashPubKey));
733
734 if (index < 0)
735 {
736 GNUNET_break_op (0);
737 return;
738 }
739
740 cpi = &session->info[index];
741
742 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d got result from P%d\n", session->local_peer_idx, index);
743
744 switch (session->current_round)
745 {
746 case CONSENSUS_ROUND_EXCHANGE:
747 if (ntohl (msg->round) != CONSENSUS_ROUND_EXCHANGE)
748 {
749 GNUNET_break_op (0);
750 return;
751 }
752 if (ntohl (msg->exp_round) < session->exp_round)
753 {
754 GNUNET_break_op (0);
755 return;
756 }
757 if (ntohl (msg->exp_subround) < session->exp_subround)
758 {
759 GNUNET_break_op (0);
760 return;
761 }
762 if (NULL != cpi->set_op)
763 GNUNET_SET_operation_cancel (cpi->set_op);
764 cpi->set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
765 set_result_cb, &session->info[index]);
766 if (ntohl (msg->exp_subround) == session->exp_subround)
767 {
768 cpi->set_op_concluded = GNUNET_YES;
769 GNUNET_SET_conclude (cpi->set_op, session->element_set);
770 }
771 else
772 {
773 cpi->set_op_concluded = GNUNET_NO;
774 }
775 break;
776 default:
777 GNUNET_break_op (0);
778 return;
779 }
754} 780}
755 781
756 782
@@ -769,7 +795,9 @@ initialize_session (struct ConsensusSession *session,
769 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers); 795 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers);
770 compute_global_id (session, &join_msg->session_id); 796 compute_global_id (session, &join_msg->session_id);
771 797
772 /* check if some local client already owns the session. */ 798 /* check if some local client already owns the session.
799 * it is only legal to have a session with an existing global id
800 * if all other sessions with this global id are finished.*/
773 other_session = sessions_head; 801 other_session = sessions_head;
774 while (NULL != other_session) 802 while (NULL != other_session)
775 { 803 {
@@ -789,6 +817,8 @@ initialize_session (struct ConsensusSession *session,
789 817
790 session->local_peer_idx = get_peer_idx (&my_peer, session); 818 session->local_peer_idx = get_peer_idx (&my_peer, session);
791 GNUNET_assert (-1 != session->local_peer_idx); 819 GNUNET_assert (-1 != session->local_peer_idx);
820 session->element_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
821 GNUNET_assert (NULL != session->element_set);
792 session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION, 822 session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
793 &session->global_id, 823 &session->global_id,
794 set_listen_cb, session); 824 set_listen_cb, session);
@@ -827,6 +857,8 @@ client_join (void *cls,
827{ 857{
828 struct ConsensusSession *session; 858 struct ConsensusSession *session;
829 859
860 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join message sent by client\n");
861
830 session = get_session_by_client (client); 862 session = get_session_by_client (client);
831 if (NULL != session) 863 if (NULL != session)
832 { 864 {
@@ -835,9 +867,13 @@ client_join (void *cls,
835 return; 867 return;
836 } 868 }
837 session = GNUNET_new (struct ConsensusSession); 869 session = GNUNET_new (struct ConsensusSession);
870 session->client = client;
838 GNUNET_SERVER_client_keep (client); 871 GNUNET_SERVER_client_keep (client);
839 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); 872 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
840 initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m); 873 initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
874 GNUNET_SERVER_receive_done (client, GNUNET_OK);
875
876 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join done\n");
841} 877}
842 878
843 879
@@ -858,12 +894,7 @@ client_insert (void *cls,
858 struct GNUNET_SET_Element *element; 894 struct GNUNET_SET_Element *element;
859 ssize_t element_size; 895 ssize_t element_size;
860 896
861 session = sessions_head; 897 session = get_session_by_client (client);
862 while (NULL != session)
863 {
864 if (session->client == client)
865 break;
866 }
867 898
868 if (NULL == session) 899 if (NULL == session)
869 { 900 {
@@ -886,6 +917,7 @@ client_insert (void *cls,
886 GNUNET_break (0); 917 GNUNET_break (0);
887 return; 918 return;
888 } 919 }
920
889 element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size); 921 element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
890 element->type = msg->element_type; 922 element->type = msg->element_type;
891 element->size = element_size; 923 element->size = element_size;
@@ -893,6 +925,8 @@ client_insert (void *cls,
893 element->data = &element[1]; 925 element->data = &element[1];
894 GNUNET_SET_add_element (session->element_set, element, NULL, NULL); 926 GNUNET_SET_add_element (session->element_set, element, NULL, NULL);
895 GNUNET_SERVER_receive_done (client, GNUNET_OK); 927 GNUNET_SERVER_receive_done (client, GNUNET_OK);
928
929 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: element added\n", session->local_peer_idx);
896} 930}
897 931
898 932
@@ -911,10 +945,10 @@ client_conclude (void *cls,
911 struct ConsensusSession *session; 945 struct ConsensusSession *session;
912 struct GNUNET_CONSENSUS_ConcludeMessage *cmsg; 946 struct GNUNET_CONSENSUS_ConcludeMessage *cmsg;
913 947
914 cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message;
915 948
949 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n");
950 cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message;
916 session = get_session_by_client (client); 951 session = get_session_by_client (client);
917
918 if (NULL == session) 952 if (NULL == session)
919 { 953 {
920 /* client not found */ 954 /* client not found */
@@ -922,14 +956,12 @@ client_conclude (void *cls,
922 GNUNET_SERVER_client_disconnect (client); 956 GNUNET_SERVER_client_disconnect (client);
923 return; 957 return;
924 } 958 }
925
926 if (CONSENSUS_ROUND_BEGIN != session->current_round) 959 if (CONSENSUS_ROUND_BEGIN != session->current_round)
927 { 960 {
928 /* client requested conclude twice */ 961 /* client requested conclude twice */
929 GNUNET_break (0); 962 GNUNET_break (0);
930 return; 963 return;
931 } 964 }
932
933 if (session->num_peers <= 1) 965 if (session->num_peers <= 1)
934 { 966 {
935 //send_client_conclude_done (session); 967 //send_client_conclude_done (session);
@@ -937,7 +969,7 @@ client_conclude (void *cls,
937 else 969 else
938 { 970 {
939 session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout); 971 session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout);
940 /* the 'begin' round is over, start with the next, real round */ 972 /* the 'begin' round is over, start with the next, actual round */
941 round_over (session, NULL); 973 round_over (session, NULL);
942 } 974 }
943 975
@@ -964,6 +996,29 @@ shutdown_task (void *cls,
964 996
965 997
966/** 998/**
999 * Clean up after a client after it is
1000 * disconnected (either by us or by itself)
1001 *
1002 * @param cls closure, unused
1003 * @param client the client to clean up after
1004 */
1005void
1006handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
1007{
1008 struct ConsensusSession *session;
1009
1010 session = get_session_by_client (client);
1011 if (NULL == session)
1012 return;
1013 if ((CONSENSUS_ROUND_BEGIN == session->current_round) ||
1014 (CONSENSUS_ROUND_FINISH == session->current_round))
1015 destroy_session (session);
1016 else
1017 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, but waiting for consensus to finish\n");
1018}
1019
1020
1021/**
967 * Start processing consensus requests. 1022 * Start processing consensus requests.
968 * 1023 *
969 * @param cls closure 1024 * @param cls closure
@@ -971,13 +1026,14 @@ shutdown_task (void *cls,
971 * @param c configuration to use 1026 * @param c configuration to use
972 */ 1027 */
973static void 1028static void
974run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) 1029run (void *cls, struct GNUNET_SERVER_Handle *server,
1030 const struct GNUNET_CONFIGURATION_Handle *c)
975{ 1031{
976 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { 1032 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
977 {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
978 {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
979 {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, 1033 {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
980 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)}, 1034 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
1035 {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
1036 {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
981 {NULL, NULL, 0, 0} 1037 {NULL, NULL, 0, 0}
982 }; 1038 };
983 1039
@@ -992,6 +1048,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU
992 } 1048 }
993 GNUNET_SERVER_add_handlers (server, server_handlers); 1049 GNUNET_SERVER_add_handlers (server, server_handlers);
994 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); 1050 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
1051 GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
995 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n"); 1052 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
996} 1053}
997 1054
diff --git a/src/consensus/test_consensus.conf b/src/consensus/test_consensus.conf
index 437636c99..37facf84e 100644
--- a/src/consensus/test_consensus.conf
+++ b/src/consensus/test_consensus.conf
@@ -5,7 +5,7 @@ HOSTNAME = localhost
5HOME = $SERVICEHOME 5HOME = $SERVICEHOME
6BINARY = gnunet-service-consensus 6BINARY = gnunet-service-consensus
7#PREFIX = gdbserver :12345 7#PREFIX = gdbserver :12345
8PREFIX = valgrind --leak-check=full 8PREFIX = valgrind
9ACCEPT_FROM = 127.0.0.1; 9ACCEPT_FROM = 127.0.0.1;
10ACCEPT_FROM6 = ::1; 10ACCEPT_FROM6 = ::1;
11UNIXPATH = /tmp/gnunet-service-consensus.sock 11UNIXPATH = /tmp/gnunet-service-consensus.sock
@@ -19,7 +19,11 @@ OPTIONS = -LERROR
19 19
20 20
21[arm] 21[arm]
22DEFAULTSERVICES = core consensus 22DEFAULTSERVICES = core consensus set
23
24[set]
25OPTIONS = -L INFO
26PREFIX = valgrind
23 27
24 28
25[testbed] 29[testbed]
diff --git a/src/dv/gnunet-service-dv.c b/src/dv/gnunet-service-dv.c
index 3b0f0783a..c6ba8eec3 100644
--- a/src/dv/gnunet-service-dv.c
+++ b/src/dv/gnunet-service-dv.c
@@ -1404,7 +1404,6 @@ listen_set_union (void *cls,
1404 neighbor->my_set = GNUNET_SET_create (cfg, 1404 neighbor->my_set = GNUNET_SET_create (cfg,
1405 GNUNET_SET_OPERATION_UNION); 1405 GNUNET_SET_OPERATION_UNION);
1406 neighbor->set_op = GNUNET_SET_accept (request, 1406 neighbor->set_op = GNUNET_SET_accept (request,
1407 neighbor->my_set /* FIXME: pass later! */,
1408 GNUNET_SET_RESULT_ADDED, 1407 GNUNET_SET_RESULT_ADDED,
1409 &handle_set_union_result, 1408 &handle_set_union_result,
1410 neighbor); 1409 neighbor);
@@ -1428,8 +1427,7 @@ initiate_set_union (void *cls,
1428 neighbor->initiate_task = GNUNET_SCHEDULER_NO_TASK; 1427 neighbor->initiate_task = GNUNET_SCHEDULER_NO_TASK;
1429 neighbor->my_set = GNUNET_SET_create (cfg, 1428 neighbor->my_set = GNUNET_SET_create (cfg,
1430 GNUNET_SET_OPERATION_UNION); 1429 GNUNET_SET_OPERATION_UNION);
1431 neighbor->set_op = GNUNET_SET_evaluate (neighbor->my_set /* FIXME: pass later! */, 1430 neighbor->set_op = GNUNET_SET_evaluate (&neighbor->peer,
1432 &neighbor->peer,
1433 &neighbor->real_session_id, 1431 &neighbor->real_session_id,
1434 NULL, 1432 NULL,
1435 0 /* FIXME: salt */, 1433 0 /* FIXME: salt */,
diff --git a/src/include/gnunet_consensus_service.h b/src/include/gnunet_consensus_service.h
index 66d48e0e2..db7509976 100644
--- a/src/include/gnunet_consensus_service.h
+++ b/src/include/gnunet_consensus_service.h
@@ -39,28 +39,7 @@ extern "C"
39#include "gnunet_common.h" 39#include "gnunet_common.h"
40#include "gnunet_time_lib.h" 40#include "gnunet_time_lib.h"
41#include "gnunet_configuration_lib.h" 41#include "gnunet_configuration_lib.h"
42 42#include "gnunet_set_service.h"
43
44/**
45 * An element of the consensus set.
46 */
47struct GNUNET_CONSENSUS_Element
48{
49 /**
50 * The actual data of the element.
51 */
52 const void *data;
53
54 /**
55 * Size of the element's data.
56 */
57 uint16_t size;
58
59 /**
60 * Application specific element type
61 */
62 uint16_t type;
63};
64 43
65 44
66/** 45/**
@@ -73,7 +52,7 @@ struct GNUNET_CONSENSUS_Element
73 * @param element new element, NULL on error 52 * @param element new element, NULL on error
74 */ 53 */
75typedef void (*GNUNET_CONSENSUS_ElementCallback) (void *cls, 54typedef void (*GNUNET_CONSENSUS_ElementCallback) (void *cls,
76 const struct GNUNET_CONSENSUS_Element *element); 55 const struct GNUNET_SET_Element *element);
77 56
78 57
79 58
@@ -138,7 +117,7 @@ typedef void (*GNUNET_CONSENSUS_InsertDoneCallback) (void *cls,
138 */ 117 */
139void 118void
140GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, 119GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
141 const struct GNUNET_CONSENSUS_Element *element, 120 const struct GNUNET_SET_Element *element,
142 GNUNET_CONSENSUS_InsertDoneCallback idc, 121 GNUNET_CONSENSUS_InsertDoneCallback idc,
143 void *idc_cls); 122 void *idc_cls);
144 123
diff --git a/src/include/gnunet_mq_lib.h b/src/include/gnunet_mq_lib.h
index 59b692cf0..54ea806a5 100644
--- a/src/include/gnunet_mq_lib.h
+++ b/src/include/gnunet_mq_lib.h
@@ -53,36 +53,6 @@
53 */ 53 */
54#define GNUNET_MQ_msg(mvar, type) GNUNET_MQ_msg_extra(mvar, 0, type) 54#define GNUNET_MQ_msg(mvar, type) GNUNET_MQ_msg_extra(mvar, 0, type)
55 55
56/**
57 * Append data to the end of an existing MQ message.
58 * If the operation is successful, mqm is changed to point to the new MQ message,
59 * and GNUNET_OK is returned.
60 * On failure, GNUNET_SYSERR is returned, and the pointer mqm is not changed,
61 * the user of this API must take care of disposing the already allocated message
62 * (either by sending it, or by using GNUNET_MQ_discard)
63 *
64 * @param mqm MQ message to augment with additional data
65 * @param src source buffer for the additional data
66 * @param len length of the additional data
67 * @return GNUNET_SYSERR if nesting the message failed,
68 * GNUNET_OK on success
69 */
70#define GNUNET_MQ_nest(mqm, src, len) GNUNET_MQ_nest_ (&mqm, src, len)
71
72
73/**
74 * Append a message to the end of an existing MQ message.
75 * If the operation is successful, mqm is changed to point to the new MQ message,
76 * and GNUNET_OK is returned.
77 * On failure, GNUNET_SYSERR is returned, and the pointer mqm is not changed,
78 * the user of this API must take care of disposing the already allocated message
79 * (either by sending it, or by using GNUNET_MQ_discard)
80 *
81 * @param mqm MQ message to augment with additional data
82 * @param mh the message to append, must be of type 'struct GNUNET_MessageHeader *'
83 */
84#define GNUNET_MQ_nest_mh(mqm, mh) ((NULL == mh) ? (GNUNET_OK) : GNUNET_MQ_nest((mqm), (mh), ntohs ((mh)->size)))
85
86 56
87/** 57/**
88 * Allocate a GNUNET_MQ_Message, where the message only consists of a header. 58 * Allocate a GNUNET_MQ_Message, where the message only consists of a header.
@@ -105,6 +75,40 @@
105 75
106 76
107/** 77/**
78 * Allocate a GNUNET_MQ_Message, and append a payload message after the given
79 * message struct.
80 *
81 * @param mvar pointer to a message struct, will be changed to point at the newly allocated message,
82 * whose size is 'sizeof(*mvar) + ntohs (mh->size)'
83 * @param type message type of the allocated message, has no effect on the nested message
84 * @param mh message to nest
85 * @return a newly allocated 'struct GNUNET_MQ_Message *'
86 */
87#define GNUNET_MQ_msg_nested_mh(mvar, type, mh) GNUNET_MQ_msg_nested_mh_((((void)(mvar)->header), (struct GNUNET_MessageHeader**) &(mvar)), sizeof (*(mvar)), (type), mh)
88
89
90/**
91 * Return a pointer to the message at the end of the given message.
92 *
93 * @param var pointer to a message struct, the type of the expression determines the base size,
94 * the space after the base size is the nested message
95 * @return a 'struct GNUNET_MessageHeader *' that points at the nested message of the given message,
96 * or NULL if the given message in 'var' does not have any space after the message struct
97 */
98#define GNUNET_MQ_extract_nested_mh(var) GNUNET_MQ_extract_nested_mh_ ((struct GNUNET_MessageHeader *) (var), sizeof (*(var)))
99
100
101struct GNUNET_MessageHeader *
102GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size);
103
104
105struct GNUNET_MQ_Message *
106GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type,
107 const struct GNUNET_MessageHeader *nested_mh);
108
109
110
111/**
108 * End-marker for the handlers array 112 * End-marker for the handlers array
109 */ 113 */
110#define GNUNET_MQ_HANDLERS_END {NULL, 0, 0} 114#define GNUNET_MQ_HANDLERS_END {NULL, 0, 0}
@@ -128,7 +132,8 @@ enum GNUNET_MQ_Error
128 * @param cls closure 132 * @param cls closure
129 * @param msg the received message 133 * @param msg the received message
130 */ 134 */
131typedef void (*GNUNET_MQ_MessageCallback) (void *cls, const struct GNUNET_MessageHeader *msg); 135typedef void
136(*GNUNET_MQ_MessageCallback) (void *cls, const struct GNUNET_MessageHeader *msg);
132 137
133 138
134/** 139/**
@@ -151,10 +156,12 @@ typedef void
151 * 156 *
152 * @param cls closure 157 * @param cls closure
153 */ 158 */
154typedef void (*GNUNET_MQ_NotifyCallback) (void *cls); 159typedef void
160(*GNUNET_MQ_NotifyCallback) (void *cls);
155 161
156 162
157typedef void (*GNUNET_MQ_ErrorHandler) (void *cls, enum GNUNET_MQ_Error error); 163typedef void
164(*GNUNET_MQ_ErrorHandler) (void *cls, enum GNUNET_MQ_Error error);
158 165
159 166
160struct GNUNET_MQ_Message 167struct GNUNET_MQ_Message
@@ -287,6 +294,7 @@ struct GNUNET_MQ_Handler
287}; 294};
288 295
289 296
297
290/** 298/**
291 * Create a new message for MQ. 299 * Create a new message for MQ.
292 * 300 *
@@ -300,21 +308,6 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
300 308
301 309
302/** 310/**
303 * Resize the the mq message pointed to by mqmp,
304 * and append the given data to it.
305 *
306 * @param mqmp pointer to a mq message pointer
307 * @param src source of the data to append
308 * @param len length of the data to append
309 * @return GNUNET_OK on success,
310 * GNUNET_SYSERR on error (e.g. if len is too large)
311 */
312int
313GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp,
314 const void *src, uint16_t len);
315
316
317/**
318 * Discard the message queue message, free all 311 * Discard the message queue message, free all
319 * allocated resources. Must be called in the event 312 * allocated resources. Must be called in the event
320 * that a message is created but should not actually be sent. 313 * that a message is created but should not actually be sent.
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 15bbae2e8..85c643f7d 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -1754,11 +1754,18 @@ extern "C"
1754 */ 1754 */
1755#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ABORT 548 1755#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ABORT 548
1756 1756
1757/**
1758 * Abort a round, don't send requested elements anymore
1759 */
1760#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT 547
1761
1757 1762
1758/******************************************************************************* 1763/*******************************************************************************
1759 * SET message types 1764 * SET message types
1760 ******************************************************************************/ 1765 ******************************************************************************/
1761 1766
1767#define GNUNET_MESSAGE_TYPE_SET_REJECT 569
1768
1762/** 1769/**
1763 * Cancel a set operation 1770 * Cancel a set operation
1764 */ 1771 */
@@ -1800,44 +1807,49 @@ extern "C"
1800#define GNUNET_MESSAGE_TYPE_SET_EVALUATE 577 1807#define GNUNET_MESSAGE_TYPE_SET_EVALUATE 577
1801 1808
1802/** 1809/**
1803 * Evaluate a set operation 1810 * Start a set operation with the given set
1811 */
1812#define GNUNET_MESSAGE_TYPE_SET_CONCLUDE 578
1813
1814/**
1815 * Notify the client of a request from a remote peer
1804 */ 1816 */
1805#define GNUNET_MESSAGE_TYPE_SET_REQUEST 578 1817#define GNUNET_MESSAGE_TYPE_SET_REQUEST 579
1806 1818
1807/** 1819/**
1808 * Evaluate a set operation. 1820 * Create a new local set
1809 */ 1821 */
1810#define GNUNET_MESSAGE_TYPE_SET_CREATE 579 1822#define GNUNET_MESSAGE_TYPE_SET_CREATE 580
1811 1823
1812/** 1824/**
1813 * Evaluate a set operation. 1825 * Request a set operation from a remote peer.
1814 */ 1826 */
1815#define GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST 580 1827#define GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST 581
1816 1828
1817/** 1829/**
1818 * Strata estimator. 1830 * Strata estimator.
1819 */ 1831 */
1820#define GNUNET_MESSAGE_TYPE_SET_P2P_SE 581 1832#define GNUNET_MESSAGE_TYPE_SET_P2P_SE 582
1821 1833
1822/** 1834/**
1823 * Invertible bloom filter. 1835 * Invertible bloom filter.
1824 */ 1836 */
1825#define GNUNET_MESSAGE_TYPE_SET_P2P_IBF 582 1837#define GNUNET_MESSAGE_TYPE_SET_P2P_IBF 583
1826 1838
1827/** 1839/**
1828 * Actual set elements. 1840 * Actual set elements.
1829 */ 1841 */
1830#define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS 583 1842#define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS 584
1831 1843
1832/** 1844/**
1833 * Requests for the elements with the given hashes. 1845 * Requests for the elements with the given hashes.
1834 */ 1846 */
1835#define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS 584 1847#define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS 585
1836 1848
1837/** 1849/**
1838 * Operation is done. 1850 * Operation is done.
1839 */ 1851 */
1840#define GNUNET_MESSAGE_TYPE_SET_P2P_DONE 585 1852#define GNUNET_MESSAGE_TYPE_SET_P2P_DONE 586
1841 1853
1842 1854
1843 1855
diff --git a/src/include/gnunet_set_service.h b/src/include/gnunet_set_service.h
index ce413c0de..2684df00a 100644
--- a/src/include/gnunet_set_service.h
+++ b/src/include/gnunet_set_service.h
@@ -257,11 +257,10 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set);
257 257
258 258
259/** 259/**
260 * Evaluate a set operation with our set and the set of another peer. 260 * Create a set operation for evaluation with another peer.
261 * The evaluation will not start until the client provides
262 * a local set with GNUNET_SET_conclude.
261 * 263 *
262 * @param set set to use -- FIXME: remove
263 * this argument, use GNUNET_SET_conclude instead!
264 * @param salt salt for HKDF (explain more here)
265 * @param other_peer peer with the other set 264 * @param other_peer peer with the other set
266 * @param app_id hash for the application using the set 265 * @param app_id hash for the application using the set
267 * @param context_msg additional information for the request 266 * @param context_msg additional information for the request
@@ -275,8 +274,7 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set);
275 * @return a handle to cancel the operation 274 * @return a handle to cancel the operation
276 */ 275 */
277struct GNUNET_SET_OperationHandle * 276struct GNUNET_SET_OperationHandle *
278GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set, 277GNUNET_SET_evaluate (const struct GNUNET_PeerIdentity *other_peer,
279 const struct GNUNET_PeerIdentity *other_peer,
280 const struct GNUNET_HashCode *app_id, 278 const struct GNUNET_HashCode *app_id,
281 const struct GNUNET_MessageHeader *context_msg, 279 const struct GNUNET_MessageHeader *context_msg,
282 uint16_t salt, 280 uint16_t salt,
@@ -315,13 +313,13 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh);
315 313
316 314
317/** 315/**
318 * Accept a request we got via GNUNET_SET_listen. Must be called 316 * Accept a request we got via GNUNET_SET_listen. Must be called during
319 * during GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' 317 * GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid
320 * becomes invalid afterwards. 318 * afterwards.
319 * Call GNUNET_SET_conclude to provide the local set to use for the operation,
320 * and to begin the exchange with the remote peer.
321 * 321 *
322 * @param request request to accept 322 * @param request request to accept
323 * @param set set used for the requested operation -- FIXME: remove
324 * this argument, use GNUNET_SET_conclude instead!
325 * @param result_mode specified how results will be returned, 323 * @param result_mode specified how results will be returned,
326 * see 'GNUNET_SET_ResultMode'. 324 * see 'GNUNET_SET_ResultMode'.
327 * @param result_cb callback for the results 325 * @param result_cb callback for the results
@@ -330,7 +328,6 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh);
330 */ 328 */
331struct GNUNET_SET_OperationHandle * 329struct GNUNET_SET_OperationHandle *
332GNUNET_SET_accept (struct GNUNET_SET_Request *request, 330GNUNET_SET_accept (struct GNUNET_SET_Request *request,
333 struct GNUNET_SET_Handle *set,
334 enum GNUNET_SET_ResultMode result_mode, 331 enum GNUNET_SET_ResultMode result_mode,
335 GNUNET_SET_ResultIterator result_cb, 332 GNUNET_SET_ResultIterator result_cb,
336 void *cls); 333 void *cls);
@@ -353,9 +350,9 @@ GNUNET_SET_conclude (struct GNUNET_SET_OperationHandle *oh,
353 350
354 351
355/** 352/**
356 * Cancel the given set operation. FIXME: do clients have 353 * Cancel the given set operation.
357 * to cancel the operatino if the GNUNET_SET_ResultIterator 354 * May not be called after the operation's GNUNET_SET_ResultIterator has been
358 * has been called with timeout/error/done? 355 * called with a status that indicates error, timeout or done.
359 * 356 *
360 * @param oh set operation to cancel 357 * @param oh set operation to cancel
361 */ 358 */
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index 2aea50365..4da718879 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -226,6 +226,23 @@ destroy_incoming (struct Incoming *incoming)
226 GNUNET_free (incoming); 226 GNUNET_free (incoming);
227} 227}
228 228
229static struct Listener *
230get_listener_by_target (enum GNUNET_SET_OperationType op,
231 const struct GNUNET_HashCode *app_id)
232{
233 struct Listener *l;
234
235 for (l = listeners_head; NULL != l; l = l->next)
236 {
237 if (l->operation != op)
238 continue;
239 if (0 != GNUNET_CRYPTO_hash_cmp (app_id, &l->app_id))
240 continue;
241 return l;
242 }
243 return NULL;
244}
245
229 246
230/** 247/**
231 * Handle a request for a set operation from 248 * Handle a request for a set operation from
@@ -240,62 +257,33 @@ handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh)
240 struct Incoming *incoming = cls; 257 struct Incoming *incoming = cls;
241 const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh; 258 const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh;
242 struct GNUNET_MQ_Message *mqm; 259 struct GNUNET_MQ_Message *mqm;
243 struct RequestMessage *cmsg; 260 struct GNUNET_SET_RequestMessage *cmsg;
244 struct Listener *listener; 261 struct Listener *listener;
245 const struct GNUNET_MessageHeader *context_msg; 262 const struct GNUNET_MessageHeader *context_msg;
246 263
247 if (ntohs (mh->size) < sizeof *msg) 264 context_msg = GNUNET_MQ_extract_nested_mh (msg);
265 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, app %s)\n",
266 ntohs (msg->operation), GNUNET_h2s (&msg->app_id));
267 listener = get_listener_by_target (ntohs (msg->operation), &msg->app_id);
268 if (NULL == listener)
248 { 269 {
249 /* message is to small for its type */ 270 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
250 GNUNET_break (0); 271 "set operation request from peer failed: "
251 destroy_incoming (incoming); 272 "no set with matching application ID and operation type\n");
252 return; 273 return;
253 } 274 }
254 else if (ntohs (mh->size) == sizeof *msg) 275 mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, context_msg);
255 { 276 if (NULL == mqm)
256 /* there is no context message */
257 context_msg = NULL;
258 }
259 else
260 { 277 {
261 context_msg = &msg[1].header; 278 /* FIXME: disconnect the peer */
262 if ((ntohs (context_msg->size) + sizeof *msg) != ntohs (msg->header.size)) 279 GNUNET_break_op (0);
263 {
264 /* size of context message is invalid */
265 GNUNET_break (0);
266 destroy_incoming (incoming);
267 return;
268 }
269 }
270
271 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, app %s)\n",
272 ntohs (msg->operation), GNUNET_h2s (&msg->app_id));
273
274 /* find the appropriate listener */
275 for (listener = listeners_head;
276 listener != NULL;
277 listener = listener->next)
278 {
279 if ( (0 != GNUNET_CRYPTO_hash_cmp (&msg->app_id, &listener->app_id)) ||
280 (ntohs (msg->operation) != listener->operation) )
281 continue;
282 mqm = GNUNET_MQ_msg (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST);
283 if (GNUNET_OK != GNUNET_MQ_nest_mh (mqm, context_msg))
284 {
285 /* FIXME: disconnect the peer */
286 GNUNET_MQ_discard (mqm);
287 GNUNET_break (0);
288 return;
289 }
290 incoming->accept_id = accept_id++;
291 cmsg->accept_id = htonl (incoming->accept_id);
292 GNUNET_MQ_send (listener->client_mq, mqm);
293 return; 280 return;
294 } 281 }
295 282 incoming->accept_id = accept_id++;
296 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 283 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending request with accept id %u\n", incoming->accept_id);
297 "set operation request from peer failed: " 284 cmsg->accept_id = htonl (incoming->accept_id);
298 "no set with matching application ID and operation type\n"); 285 cmsg->peer_id = incoming->peer;
286 GNUNET_MQ_send (listener->client_mq, mqm);
299} 287}
300 288
301 289
@@ -311,7 +299,7 @@ handle_client_create (void *cls,
311 struct GNUNET_SERVER_Client *client, 299 struct GNUNET_SERVER_Client *client,
312 const struct GNUNET_MessageHeader *m) 300 const struct GNUNET_MessageHeader *m)
313{ 301{
314 struct SetCreateMessage *msg = (struct SetCreateMessage *) m; 302 struct GNUNET_SET_CreateMessage *msg = (struct GNUNET_SET_CreateMessage *) m;
315 struct Set *set; 303 struct Set *set;
316 304
317 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client created new set (operation %u)\n", 305 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client created new set (operation %u)\n",
@@ -363,7 +351,7 @@ handle_client_listen (void *cls,
363 struct GNUNET_SERVER_Client *client, 351 struct GNUNET_SERVER_Client *client,
364 const struct GNUNET_MessageHeader *m) 352 const struct GNUNET_MessageHeader *m)
365{ 353{
366 struct ListenMessage *msg = (struct ListenMessage *) m; 354 struct GNUNET_SET_ListenMessage *msg = (struct GNUNET_SET_ListenMessage *) m;
367 struct Listener *listener; 355 struct Listener *listener;
368 356
369 if (NULL != get_listener (client)) 357 if (NULL != get_listener (client))
@@ -410,7 +398,7 @@ handle_client_remove (void *cls,
410 switch (set->operation) 398 switch (set->operation)
411 { 399 {
412 case GNUNET_SET_OPERATION_UNION: 400 case GNUNET_SET_OPERATION_UNION:
413 _GSS_union_remove ((struct ElementMessage *) m, set); 401 _GSS_union_remove ((struct GNUNET_SET_ElementMessage *) m, set);
414 case GNUNET_SET_OPERATION_INTERSECTION: 402 case GNUNET_SET_OPERATION_INTERSECTION:
415 /* FIXME: cfuchs */ 403 /* FIXME: cfuchs */
416 break; 404 break;
@@ -423,6 +411,38 @@ handle_client_remove (void *cls,
423} 411}
424 412
425 413
414
415/**
416 * Called when the client wants to reject an operation
417 * request from another peer.
418 *
419 * @param cls unused
420 * @param client client that sent the message
421 * @param m message sent by the client
422 */
423static void
424handle_client_reject (void *cls,
425 struct GNUNET_SERVER_Client *client,
426 const struct GNUNET_MessageHeader *m)
427{
428 struct Incoming *incoming;
429 struct GNUNET_SET_AcceptRejectMessage *msg = (struct GNUNET_SET_AcceptRejectMessage *) m;
430
431 GNUNET_break (0 == ntohl (msg->request_id));
432
433 incoming = get_incoming (ntohl (msg->accept_reject_id));
434 if (NULL == incoming)
435 {
436 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
437 return;
438 }
439 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n");
440 destroy_incoming (incoming);
441 GNUNET_SERVER_receive_done (client, GNUNET_OK);
442}
443
444
445
426/** 446/**
427 * Called when a client wants to add an element to a 447 * Called when a client wants to add an element to a
428 * set it inhabits. 448 * set it inhabits.
@@ -448,7 +468,7 @@ handle_client_add (void *cls,
448 switch (set->operation) 468 switch (set->operation)
449 { 469 {
450 case GNUNET_SET_OPERATION_UNION: 470 case GNUNET_SET_OPERATION_UNION:
451 _GSS_union_add ((struct ElementMessage *) m, set); 471 _GSS_union_add ((struct GNUNET_SET_ElementMessage *) m, set);
452 case GNUNET_SET_OPERATION_INTERSECTION: 472 case GNUNET_SET_OPERATION_INTERSECTION:
453 /* FIXME: cfuchs */ 473 /* FIXME: cfuchs */
454 break; 474 break;
@@ -490,7 +510,7 @@ handle_client_evaluate (void *cls,
490 /* FIXME: cfuchs */ 510 /* FIXME: cfuchs */
491 break; 511 break;
492 case GNUNET_SET_OPERATION_UNION: 512 case GNUNET_SET_OPERATION_UNION:
493 _GSS_union_evaluate ((struct EvaluateMessage *) m, set); 513 _GSS_union_evaluate ((struct GNUNET_SET_EvaluateMessage *) m, set);
494 break; 514 break;
495 default: 515 default:
496 GNUNET_assert (0); 516 GNUNET_assert (0);
@@ -502,23 +522,6 @@ handle_client_evaluate (void *cls,
502 522
503 523
504/** 524/**
505 * Handle a cancel request from a client.
506 *
507 * @param cls unused
508 * @param client the client
509 * @param m the cancel message
510 */
511static void
512handle_client_cancel (void *cls,
513 struct GNUNET_SERVER_Client *client,
514 const struct GNUNET_MessageHeader *m)
515{
516 /* FIXME: implement */
517 GNUNET_SERVER_receive_done (client, GNUNET_OK);
518}
519
520
521/**
522 * Handle an ack from a client. 525 * Handle an ack from a client.
523 * 526 *
524 * @param cls unused 527 * @param cls unused
@@ -550,25 +553,20 @@ handle_client_accept (void *cls,
550{ 553{
551 struct Set *set; 554 struct Set *set;
552 struct Incoming *incoming; 555 struct Incoming *incoming;
553 struct AcceptMessage *msg = (struct AcceptMessage *) mh; 556 struct GNUNET_SET_AcceptRejectMessage *msg = (struct GNUNET_SET_AcceptRejectMessage *) mh;
554 557
558 incoming = get_incoming (ntohl (msg->accept_reject_id));
555 559
556 incoming = get_incoming (ntohl (msg->accept_id)); 560 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client accepting %u\n", ntohl (msg->accept_reject_id));
557 561
558 if (NULL == incoming) 562 if (NULL == incoming)
559 { 563 {
564
560 GNUNET_break (0); 565 GNUNET_break (0);
561 GNUNET_SERVER_client_disconnect (client); 566 GNUNET_SERVER_client_disconnect (client);
562 return; 567 return;
563 } 568 }
564 569
565 if (0 == ntohl (msg->request_id))
566 {
567 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n");
568 destroy_incoming (incoming);
569 GNUNET_SERVER_receive_done (client, GNUNET_OK);
570 return;
571 }
572 570
573 set = get_set (client); 571 set = get_set (client);
574 572
@@ -687,14 +685,14 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
687 const struct GNUNET_CONFIGURATION_Handle *cfg) 685 const struct GNUNET_CONFIGURATION_Handle *cfg)
688{ 686{
689 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { 687 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
688 {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT, 0},
689 {handle_client_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ACK, 0},
690 {handle_client_add, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0},
690 {handle_client_create, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE, 0}, 691 {handle_client_create, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE, 0},
692 {handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0},
691 {handle_client_listen, NULL, GNUNET_MESSAGE_TYPE_SET_LISTEN, 0}, 693 {handle_client_listen, NULL, GNUNET_MESSAGE_TYPE_SET_LISTEN, 0},
692 {handle_client_add, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0}, 694 {handle_client_reject, NULL, GNUNET_MESSAGE_TYPE_SET_REJECT, 0},
693 {handle_client_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0}, 695 {handle_client_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0},
694 {handle_client_cancel, NULL, GNUNET_MESSAGE_TYPE_SET_CANCEL, 0},
695 {handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0},
696 {handle_client_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ACK, 0},
697 {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT, 0},
698 {NULL, NULL, 0, 0} 696 {NULL, NULL, 0, 0}
699 }; 697 };
700 698
@@ -705,6 +703,8 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
705 stream_listen_socket = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_SET, 703 stream_listen_socket = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_SET,
706 &stream_listen_cb, NULL, 704 &stream_listen_cb, NULL,
707 GNUNET_STREAM_OPTION_END); 705 GNUNET_STREAM_OPTION_END);
706
707 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set service running\n");
708} 708}
709 709
710 710
diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h
index bea77416e..15199eba4 100644
--- a/src/set/gnunet-service-set.h
+++ b/src/set/gnunet-service-set.h
@@ -217,7 +217,7 @@ _GSS_union_set_create (void);
217 * @parem set the set to evaluate the operation with 217 * @parem set the set to evaluate the operation with
218 */ 218 */
219void 219void
220_GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set); 220_GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set);
221 221
222 222
223/** 223/**
@@ -227,7 +227,7 @@ _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set);
227 * @param set set to add the element to 227 * @param set set to add the element to
228 */ 228 */
229void 229void
230_GSS_union_add (struct ElementMessage *m, struct Set *set); 230_GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set);
231 231
232 232
233/** 233/**
@@ -238,7 +238,7 @@ _GSS_union_add (struct ElementMessage *m, struct Set *set);
238 * @param set set to remove the element from 238 * @param set set to remove the element from
239 */ 239 */
240void 240void
241_GSS_union_remove (struct ElementMessage *m, struct Set *set); 241_GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set);
242 242
243 243
244/** 244/**
@@ -258,7 +258,7 @@ _GSS_union_set_destroy (struct Set *set);
258 * @param incoming information about the requesting remote peer 258 * @param incoming information about the requesting remote peer
259 */ 259 */
260void 260void
261_GSS_union_accept (struct AcceptMessage *m, struct Set *set, 261_GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set,
262 struct Incoming *incoming); 262 struct Incoming *incoming);
263 263
264 264
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index c651a0381..6d9658ee5 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -245,8 +245,7 @@ struct ElementEntry
245 245
246 246
247/** 247/**
248 * Information about the element used for 248 * Entries in the key-to-element map of the union set.
249 * a specific union operation.
250 */ 249 */
251struct KeyEntry 250struct KeyEntry
252{ 251{
@@ -401,11 +400,14 @@ destroy_key_to_element_iter (void *cls,
401static void 400static void
402destroy_union_operation (struct UnionEvaluateOperation *eo) 401destroy_union_operation (struct UnionEvaluateOperation *eo)
403{ 402{
403 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n");
404
404 if (NULL != eo->mq) 405 if (NULL != eo->mq)
405 { 406 {
406 GNUNET_MQ_destroy (eo->mq); 407 GNUNET_MQ_destroy (eo->mq);
407 eo->mq = NULL; 408 eo->mq = NULL;
408 } 409 }
410
409 if (NULL != eo->socket) 411 if (NULL != eo->socket)
410 { 412 {
411 GNUNET_STREAM_close (eo->socket); 413 GNUNET_STREAM_close (eo->socket);
@@ -433,12 +435,16 @@ destroy_union_operation (struct UnionEvaluateOperation *eo)
433 eo->key_to_element = NULL; 435 eo->key_to_element = NULL;
434 } 436 }
435 437
436
437 GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head, 438 GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head,
438 eo->set->state.u->ops_tail, 439 eo->set->state.u->ops_tail,
439 eo); 440 eo);
440 GNUNET_free (eo); 441 GNUNET_free (eo);
441 /* FIXME: free and destroy everything else */ 442
443
444 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n");
445
446
447 /* FIXME: do a garbage collection of the set generations */
442} 448}
443 449
444 450
@@ -452,7 +458,7 @@ static void
452fail_union_operation (struct UnionEvaluateOperation *eo) 458fail_union_operation (struct UnionEvaluateOperation *eo)
453{ 459{
454 struct GNUNET_MQ_Message *mqm; 460 struct GNUNET_MQ_Message *mqm;
455 struct ResultMessage *msg; 461 struct GNUNET_SET_ResultMessage *msg;
456 462
457 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); 463 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
458 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); 464 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
@@ -495,20 +501,25 @@ send_operation_request (struct UnionEvaluateOperation *eo)
495 struct GNUNET_MQ_Message *mqm; 501 struct GNUNET_MQ_Message *mqm;
496 struct OperationRequestMessage *msg; 502 struct OperationRequestMessage *msg;
497 503
498 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST); 504 mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, eo->context_msg);
499 if (NULL != eo->context_msg) 505
500 if (GNUNET_OK != GNUNET_MQ_nest (mqm, eo->context_msg, ntohs (eo->context_msg->size))) 506 if (NULL == mqm)
501 { 507 {
502 /* the context message is too large */ 508 /* the context message is too large */
503 GNUNET_break (0); 509 GNUNET_break (0);
504 GNUNET_SERVER_client_disconnect (eo->set->client); 510 GNUNET_SERVER_client_disconnect (eo->set->client);
505 GNUNET_MQ_discard (mqm); 511 return;
506 return; 512 }
507 }
508 msg->operation = htons (GNUNET_SET_OPERATION_UNION); 513 msg->operation = htons (GNUNET_SET_OPERATION_UNION);
509 msg->app_id = eo->app_id; 514 msg->app_id = eo->app_id;
510 GNUNET_MQ_send (eo->mq, mqm); 515 GNUNET_MQ_send (eo->mq, mqm);
511 516
517 if (NULL != eo->context_msg)
518 {
519 GNUNET_free (eo->context_msg);
520 eo->context_msg = NULL;
521 }
522
512 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n"); 523 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n");
513} 524}
514 525
@@ -537,7 +548,7 @@ insert_element_iterator (void *cls,
537 { 548 {
538 if (old_k->ibf_key.key_val == new_k->ibf_key.key_val) 549 if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
539 { 550 {
540 new_k->next_colliding = old_k; 551 new_k->next_colliding = old_k->next_colliding;
541 old_k->next_colliding = new_k; 552 old_k->next_colliding = new_k;
542 return GNUNET_NO; 553 return GNUNET_NO;
543 } 554 }
@@ -568,12 +579,11 @@ insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee)
568 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, 579 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
569 (uint32_t) ibf_key.key_val, 580 (uint32_t) ibf_key.key_val,
570 insert_element_iterator, k); 581 insert_element_iterator, k);
582
571 /* was the element inserted into a colliding bucket? */ 583 /* was the element inserted into a colliding bucket? */
572 if (GNUNET_SYSERR == ret) 584 if (GNUNET_SYSERR == ret)
573 {
574 GNUNET_assert (NULL != k->next_colliding);
575 return; 585 return;
576 } 586
577 GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k, 587 GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
578 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 588 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
579} 589}
@@ -781,8 +791,8 @@ handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
781 */ 791 */
782static int 792static int
783send_element_iterator (void *cls, 793send_element_iterator (void *cls,
784 uint32_t key, 794 uint32_t key,
785 void *value) 795 void *value)
786{ 796{
787 struct SendElementClosure *sec = cls; 797 struct SendElementClosure *sec = cls;
788 struct IBF_Key ibf_key = sec->ibf_key; 798 struct IBF_Key ibf_key = sec->ibf_key;
@@ -795,15 +805,18 @@ send_element_iterator (void *cls,
795 { 805 {
796 const struct GNUNET_SET_Element *const element = &ke->element->element; 806 const struct GNUNET_SET_Element *const element = &ke->element->element;
797 struct GNUNET_MQ_Message *mqm; 807 struct GNUNET_MQ_Message *mqm;
808 struct GNUNET_MessageHeader *mh;
798 809
799 GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); 810 GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
800 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); 811 mqm = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
801 if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size)) 812 if (NULL == mqm)
802 { 813 {
814 /* element too large */
803 GNUNET_break (0); 815 GNUNET_break (0);
804 GNUNET_MQ_discard (mqm);
805 continue; 816 continue;
806 } 817 }
818 memcpy (&mh[1], element->data, element->size);
819 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n");
807 GNUNET_MQ_send (eo->mq, mqm); 820 GNUNET_MQ_send (eo->mq, mqm);
808 ke = ke->next_colliding; 821 ke = ke->next_colliding;
809 } 822 }
@@ -975,34 +988,42 @@ send_client_element (struct UnionEvaluateOperation *eo,
975 struct GNUNET_SET_Element *element) 988 struct GNUNET_SET_Element *element)
976{ 989{
977 struct GNUNET_MQ_Message *mqm; 990 struct GNUNET_MQ_Message *mqm;
978 struct ResultMessage *rm; 991 struct GNUNET_SET_ResultMessage *rm;
979 992
980 GNUNET_assert (0 != eo->request_id); 993 GNUNET_assert (0 != eo->request_id);
981 mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); 994 mqm = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
982 rm->result_status = htons (GNUNET_SET_STATUS_OK); 995 if (NULL == mqm)
983 rm->request_id = htonl (eo->request_id);
984 if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size))
985 { 996 {
986 GNUNET_MQ_discard (mqm); 997 GNUNET_MQ_discard (mqm);
987 GNUNET_break (0); 998 GNUNET_break (0);
988 return; 999 return;
989 } 1000 }
990 1001 rm->result_status = htons (GNUNET_SET_STATUS_OK);
1002 rm->request_id = htonl (eo->request_id);
1003 memcpy (&rm[1], element->data, element->size);
991 GNUNET_MQ_send (eo->set->client_mq, mqm); 1004 GNUNET_MQ_send (eo->set->client_mq, mqm);
992} 1005}
993 1006
994 1007
995/** 1008/**
996 * Callback used for notifications 1009 * Completion callback for shutdown
997 * 1010 *
998 * @param cls closure 1011 * @param cls the closure from GNUNET_STREAM_shutdown call
1012 * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
1013 * SHUT_RDWR)
999 */ 1014 */
1000static void 1015/*
1001client_done_sent_cb (void *cls) 1016static void
1017stream_shutdown_cb (void *cls,
1018 int operation)
1002{ 1019{
1003 //struct UnionEvaluateOperation *eo = cls; 1020 //struct UnionEvaluateOperation *eo = cls;
1004 /* FIXME: destroy eo */ 1021
1022 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "stream shutdown\n");
1023
1024 // destroy_union_operation (eo);
1005} 1025}
1026*/
1006 1027
1007 1028
1008/** 1029/**
@@ -1018,16 +1039,15 @@ static void
1018send_client_done_and_destroy (struct UnionEvaluateOperation *eo) 1039send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
1019{ 1040{
1020 struct GNUNET_MQ_Message *mqm; 1041 struct GNUNET_MQ_Message *mqm;
1021 struct ResultMessage *rm; 1042 struct GNUNET_SET_ResultMessage *rm;
1022 1043
1023 GNUNET_assert (0 != eo->request_id); 1044 GNUNET_assert (0 != eo->request_id);
1024 mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); 1045 mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1025 rm->request_id = htonl (eo->request_id); 1046 rm->request_id = htonl (eo->request_id);
1026 rm->result_status = htons (GNUNET_SET_STATUS_DONE); 1047 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1027 GNUNET_MQ_notify_sent (mqm, client_done_sent_cb, eo);
1028 GNUNET_MQ_send (eo->set->client_mq, mqm); 1048 GNUNET_MQ_send (eo->set->client_mq, mqm);
1029 1049
1030 /* FIXME: destroy the eo */ 1050 // GNUNET_STREAM_shutdown (eo->socket, SHUT_RDWR, stream_shutdown_cb, eo);
1031} 1051}
1032 1052
1033 1053
@@ -1199,18 +1219,25 @@ stream_open_cb (void *cls,
1199 * @parem set the set to evaluate the operation with 1219 * @parem set the set to evaluate the operation with
1200 */ 1220 */
1201void 1221void
1202_GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set) 1222_GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set)
1203{ 1223{
1204 struct UnionEvaluateOperation *eo; 1224 struct UnionEvaluateOperation *eo;
1225 struct GNUNET_MessageHeader *context_msg;
1205 1226
1206 eo = GNUNET_new (struct UnionEvaluateOperation); 1227 eo = GNUNET_new (struct UnionEvaluateOperation);
1207 eo->peer = m->peer; 1228 eo->peer = m->target_peer;
1208 eo->set = set; 1229 eo->set = set;
1209 eo->request_id = htonl (m->request_id); 1230 eo->request_id = htonl (m->request_id);
1210 GNUNET_assert (0 != eo->request_id); 1231 GNUNET_assert (0 != eo->request_id);
1211 eo->se = strata_estimator_dup (set->state.u->se); 1232 eo->se = strata_estimator_dup (set->state.u->se);
1212 eo->salt = ntohs (m->salt); 1233 eo->salt = ntohs (m->salt);
1213 eo->app_id = m->app_id; 1234 eo->app_id = m->app_id;
1235
1236 context_msg = GNUNET_MQ_extract_nested_mh (m);
1237 if (NULL != context_msg)
1238 {
1239 eo->context_msg = GNUNET_copy_message (context_msg);
1240 }
1214 1241
1215 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1242 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1216 "evaluating union operation, (app %s)\n", 1243 "evaluating union operation, (app %s)\n",
@@ -1235,7 +1262,7 @@ _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set)
1235 * @param incoming information about the requesting remote peer 1262 * @param incoming information about the requesting remote peer
1236 */ 1263 */
1237void 1264void
1238_GSS_union_accept (struct AcceptMessage *m, struct Set *set, 1265_GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set,
1239 struct Incoming *incoming) 1266 struct Incoming *incoming)
1240{ 1267{
1241 struct UnionEvaluateOperation *eo; 1268 struct UnionEvaluateOperation *eo;
@@ -1250,7 +1277,6 @@ _GSS_union_accept (struct AcceptMessage *m, struct Set *set,
1250 GNUNET_assert (0 != ntohl (m->request_id)); 1277 GNUNET_assert (0 != ntohl (m->request_id));
1251 eo->request_id = ntohl (m->request_id); 1278 eo->request_id = ntohl (m->request_id);
1252 eo->se = strata_estimator_dup (set->state.u->se); 1279 eo->se = strata_estimator_dup (set->state.u->se);
1253 eo->set = set; // FIXME: redundant!?
1254 eo->mq = incoming->mq; 1280 eo->mq = incoming->mq;
1255 /* transfer ownership of mq and socket from incoming to eo */ 1281 /* transfer ownership of mq and socket from incoming to eo */
1256 incoming->mq = NULL; 1282 incoming->mq = NULL;
@@ -1299,7 +1325,7 @@ _GSS_union_set_create (void)
1299 * @param set set to add the element to 1325 * @param set set to add the element to
1300 */ 1326 */
1301void 1327void
1302_GSS_union_add (struct ElementMessage *m, struct Set *set) 1328_GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set)
1303{ 1329{
1304 struct ElementEntry *ee; 1330 struct ElementEntry *ee;
1305 struct ElementEntry *ee_dup; 1331 struct ElementEntry *ee_dup;
@@ -1357,7 +1383,9 @@ _GSS_union_set_destroy (struct Set *set)
1357 destroy_elements (set->state.u); 1383 destroy_elements (set->state.u);
1358 1384
1359 while (NULL != set->state.u->ops_head) 1385 while (NULL != set->state.u->ops_head)
1386 {
1360 destroy_union_operation (set->state.u->ops_head); 1387 destroy_union_operation (set->state.u->ops_head);
1388 }
1361} 1389}
1362 1390
1363/** 1391/**
@@ -1368,7 +1396,7 @@ _GSS_union_set_destroy (struct Set *set)
1368 * @param set set to remove the element from 1396 * @param set set to remove the element from
1369 */ 1397 */
1370void 1398void
1371_GSS_union_remove (struct ElementMessage *m, struct Set *set) 1399_GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set)
1372{ 1400{
1373 struct GNUNET_HashCode hash; 1401 struct GNUNET_HashCode hash;
1374 struct ElementEntry *ee; 1402 struct ElementEntry *ee;
diff --git a/src/set/gnunet-set.c b/src/set/gnunet-set.c
index 5f2d1c976..ae84610fc 100644
--- a/src/set/gnunet-set.c
+++ b/src/set/gnunet-set.c
@@ -91,11 +91,12 @@ listen_cb (void *cls,
91 const struct GNUNET_MessageHeader *context_msg, 91 const struct GNUNET_MessageHeader *context_msg,
92 struct GNUNET_SET_Request *request) 92 struct GNUNET_SET_Request *request)
93{ 93{
94 struct GNUNET_SET_OperationHandle *oh;
94 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n"); 95 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n");
95 GNUNET_SET_listen_cancel (listen_handle); 96 GNUNET_SET_listen_cancel (listen_handle);
96 97
97 GNUNET_SET_accept (request, set2, 98 oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL);
98 GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL); 99 GNUNET_SET_conclude (oh, set2);
99} 100}
100 101
101 102
@@ -107,11 +108,14 @@ listen_cb (void *cls,
107static void 108static void
108start (void *cls) 109start (void *cls)
109{ 110{
111 struct GNUNET_SET_OperationHandle *oh;
112
110 listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, 113 listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
111 &app_id, listen_cb, NULL); 114 &app_id, listen_cb, NULL);
112 GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42, 115 oh = GNUNET_SET_evaluate (&local_id, &app_id, NULL, 42,
113 GNUNET_SET_RESULT_ADDED, 116 GNUNET_SET_RESULT_ADDED,
114 result_cb_set1, NULL); 117 result_cb_set1, NULL);
118 GNUNET_SET_conclude (oh, set1);
115} 119}
116 120
117 121
diff --git a/src/set/set.h b/src/set/set.h
index ad2200de9..7ec3e6cb2 100644
--- a/src/set/set.h
+++ b/src/set/set.h
@@ -29,17 +29,12 @@
29#include "platform.h" 29#include "platform.h"
30#include "gnunet_common.h" 30#include "gnunet_common.h"
31 31
32 32#define GNUNET_SET_ACK_WINDOW 10
33/**
34 * The service sends up to GNUNET_SET_ACK_WINDOW messages per client handle,
35 * the client should send an ack every GNUNET_SET_ACK_WINDOW/2 messages.
36 */
37#define GNUNET_SET_ACK_WINDOW 8
38 33
39 34
40GNUNET_NETWORK_STRUCT_BEGIN 35GNUNET_NETWORK_STRUCT_BEGIN
41 36
42struct SetCreateMessage 37struct GNUNET_SET_CreateMessage
43{ 38{
44 /** 39 /**
45 * Type: GNUNET_MESSAGE_TYPE_SET_CREATE 40 * Type: GNUNET_MESSAGE_TYPE_SET_CREATE
@@ -54,7 +49,7 @@ struct SetCreateMessage
54}; 49};
55 50
56 51
57struct ListenMessage 52struct GNUNET_SET_ListenMessage
58{ 53{
59 /** 54 /**
60 * Type: GNUNET_MESSAGE_TYPE_SET_LISTEN 55 * Type: GNUNET_MESSAGE_TYPE_SET_LISTEN
@@ -74,32 +69,31 @@ struct ListenMessage
74}; 69};
75 70
76 71
77struct AcceptMessage 72struct GNUNET_SET_AcceptRejectMessage
78{ 73{
79 /** 74 /**
80 * Type: GNUNET_MESSAGE_TYPE_SET_ACCEPT 75 * Type: GNUNET_MESSAGE_TYPE_SET_ACCEPT or
76 * GNUNET_MESSAGE_TYPE_SET_REJECT
81 */ 77 */
82 struct GNUNET_MessageHeader header; 78 struct GNUNET_MessageHeader header;
83 79
84 /** 80 /**
85 * Request id that will be sent along with 81 * ID of the incoming request we want to accept / reject.
86 * results for the accepted operation.
87 * Chosen by the client.
88 * Must be 0 if the request has been rejected.
89 */ 82 */
90 uint32_t request_id GNUNET_PACKED; 83 uint32_t accept_reject_id GNUNET_PACKED;
91 84
92 /** 85 /**
93 * ID of the incoming request we want to accept / reject. 86 * Request ID to identify responses,
87 * must be 0 if we don't accept the request.
94 */ 88 */
95 uint32_t accept_id GNUNET_PACKED; 89 uint32_t request_id GNUNET_PACKED;
96}; 90};
97 91
98 92
99/** 93/**
100 * A request for an operation with another client. 94 * A request for an operation with another client.
101 */ 95 */
102struct RequestMessage 96struct GNUNET_SET_RequestMessage
103{ 97{
104 /** 98 /**
105 * Type: GNUNET_MESSAGE_TYPE_SET_Request. 99 * Type: GNUNET_MESSAGE_TYPE_SET_Request.
@@ -107,21 +101,21 @@ struct RequestMessage
107 struct GNUNET_MessageHeader header; 101 struct GNUNET_MessageHeader header;
108 102
109 /** 103 /**
110 * ID of the request we want to accept, 104 * Identity of the requesting peer.
111 * chosen by the service.
112 */ 105 */
113 uint32_t accept_id GNUNET_PACKED; 106 struct GNUNET_PeerIdentity peer_id;
114 107
115 /** 108 /**
116 * Identity of the requesting peer. 109 * ID of the to identify the request when accepting or
110 * rejecting it.
117 */ 111 */
118 struct GNUNET_PeerIdentity peer_id; 112 uint32_t accept_id GNUNET_PACKED;
119 113
120 /* rest: nested context message */ 114 /* rest: nested context message */
121}; 115};
122 116
123 117
124struct EvaluateMessage 118struct GNUNET_SET_EvaluateMessage
125{ 119{
126 /** 120 /**
127 * Type: GNUNET_MESSAGE_TYPE_SET_EVALUATE 121 * Type: GNUNET_MESSAGE_TYPE_SET_EVALUATE
@@ -136,7 +130,7 @@ struct EvaluateMessage
136 /** 130 /**
137 * Peer to evaluate the operation with 131 * Peer to evaluate the operation with
138 */ 132 */
139 struct GNUNET_PeerIdentity peer; 133 struct GNUNET_PeerIdentity target_peer;
140 134
141 /** 135 /**
142 * Application id 136 * Application id
@@ -157,7 +151,7 @@ struct EvaluateMessage
157}; 151};
158 152
159 153
160struct ResultMessage 154struct GNUNET_SET_ResultMessage
161{ 155{
162 /** 156 /**
163 * Type: GNUNET_MESSAGE_TYPE_SET_RESULT 157 * Type: GNUNET_MESSAGE_TYPE_SET_RESULT
@@ -184,7 +178,7 @@ struct ResultMessage
184}; 178};
185 179
186 180
187struct ElementMessage 181struct GNUNET_SET_ElementMessage
188{ 182{
189 /** 183 /**
190 * Type: GNUNET_MESSAGE_TYPE_SET_ADD or 184 * Type: GNUNET_MESSAGE_TYPE_SET_ADD or
@@ -200,20 +194,6 @@ struct ElementMessage
200}; 194};
201 195
202 196
203struct CancelMessage
204{
205 /**
206 * Type: GNUNET_MESSAGE_TYPE_SET_CANCEL
207 */
208 struct GNUNET_MessageHeader header;
209
210 /**
211 * id we want to cancel result belongs to
212 */
213 uint32_t request_id GNUNET_PACKED;
214};
215
216
217GNUNET_NETWORK_STRUCT_END 197GNUNET_NETWORK_STRUCT_END
218 198
219#endif 199#endif
diff --git a/src/set/set_api.c b/src/set/set_api.c
index 5838680b9..c74933aa0 100644
--- a/src/set/set_api.c
+++ b/src/set/set_api.c
@@ -33,6 +33,7 @@
33 33
34#define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__) 34#define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__)
35 35
36
36/** 37/**
37 * Opaque handle to a set. 38 * Opaque handle to a set.
38 */ 39 */
@@ -52,13 +53,33 @@ struct GNUNET_SET_Request
52 int accepted; 53 int accepted;
53}; 54};
54 55
55
56struct GNUNET_SET_OperationHandle 56struct GNUNET_SET_OperationHandle
57{ 57{
58 GNUNET_SET_ResultIterator result_cb; 58 GNUNET_SET_ResultIterator result_cb;
59 void *result_cls; 59 void *result_cls;
60
61 /**
62 * Local set used for the operation,
63 * NULL if no set has been provided by conclude yet.
64 */
60 struct GNUNET_SET_Handle *set; 65 struct GNUNET_SET_Handle *set;
66
67 /**
68 * Request ID to identify the operation within the set.
69 */
61 uint32_t request_id; 70 uint32_t request_id;
71
72 /**
73 * Message sent to the server on calling conclude,
74 * NULL if conclude has been called.
75 */
76 struct GNUNET_MQ_Message *conclude_mqm;
77
78 /**
79 * Address of the request if in the conclude message,
80 * used to patch the request id into the message when the set is known.
81 */
82 uint32_t *request_id_addr;
62}; 83};
63 84
64 85
@@ -83,18 +104,21 @@ struct GNUNET_SET_ListenHandle
83static void 104static void
84handle_result (void *cls, const struct GNUNET_MessageHeader *mh) 105handle_result (void *cls, const struct GNUNET_MessageHeader *mh)
85{ 106{
86 struct ResultMessage *msg = (struct ResultMessage *) mh; 107 struct GNUNET_SET_ResultMessage *msg = (struct GNUNET_SET_ResultMessage *) mh;
87 struct GNUNET_SET_Handle *set = cls; 108 struct GNUNET_SET_Handle *set = cls;
88 struct GNUNET_SET_OperationHandle *oh; 109 struct GNUNET_SET_OperationHandle *oh;
89 struct GNUNET_SET_Element e; 110 struct GNUNET_SET_Element e;
90 111
112
113 GNUNET_assert (NULL != set);
114 GNUNET_assert (NULL != set->mq);
115
91 if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2) 116 if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2)
92 { 117 {
93 struct GNUNET_MQ_Message *mqm; 118 struct GNUNET_MQ_Message *mqm;
94 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ACK); 119 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ACK);
95 GNUNET_MQ_send (set->mq, mqm); 120 GNUNET_MQ_send (set->mq, mqm);
96 } 121 }
97
98 oh = GNUNET_MQ_assoc_get (set->mq, ntohl (msg->request_id)); 122 oh = GNUNET_MQ_assoc_get (set->mq, ntohl (msg->request_id));
99 GNUNET_assert (NULL != oh); 123 GNUNET_assert (NULL != oh);
100 /* status is not STATUS_OK => there's no attached element, 124 /* status is not STATUS_OK => there's no attached element,
@@ -109,7 +133,7 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh)
109 } 133 }
110 134
111 e.data = &msg[1]; 135 e.data = &msg[1];
112 e.size = ntohs (mh->size) - sizeof (struct ResultMessage); 136 e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage);
113 e.type = msg->element_type; 137 e.type = msg->element_type;
114 if (NULL != oh->result_cb) 138 if (NULL != oh->result_cb)
115 oh->result_cb (oh->result_cls, &e, htons (msg->result_status)); 139 oh->result_cb (oh->result_cls, &e, htons (msg->result_status));
@@ -124,28 +148,34 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh)
124static void 148static void
125handle_request (void *cls, const struct GNUNET_MessageHeader *mh) 149handle_request (void *cls, const struct GNUNET_MessageHeader *mh)
126{ 150{
127 struct RequestMessage *msg = (struct RequestMessage *) mh; 151 struct GNUNET_SET_RequestMessage *msg = (struct GNUNET_SET_RequestMessage *) mh;
128 struct GNUNET_SET_ListenHandle *lh = cls; 152 struct GNUNET_SET_ListenHandle *lh = cls;
129 struct GNUNET_SET_Request *req; 153 struct GNUNET_SET_Request *req;
154 struct GNUNET_MessageHeader *context_msg;
130 155
156 LOG (GNUNET_ERROR_TYPE_INFO, "processing request\n");
131 req = GNUNET_new (struct GNUNET_SET_Request); 157 req = GNUNET_new (struct GNUNET_SET_Request);
132 req->accept_id = ntohl (msg->accept_id); 158 req->accept_id = ntohl (msg->accept_id);
159 context_msg = GNUNET_MQ_extract_nested_mh (msg);
133 /* calling GNUNET_SET_accept in the listen cb will set req->accepted */ 160 /* calling GNUNET_SET_accept in the listen cb will set req->accepted */
134 lh->listen_cb (lh->listen_cls, &msg->peer_id, &mh[1], req); 161 lh->listen_cb (lh->listen_cls, &msg->peer_id, context_msg, req);
135 162
136 if (GNUNET_NO == req->accepted) 163 if (GNUNET_NO == req->accepted)
137 { 164 {
138 struct GNUNET_MQ_Message *mqm; 165 struct GNUNET_MQ_Message *mqm;
139 struct AcceptMessage *amsg; 166 struct GNUNET_SET_AcceptRejectMessage *amsg;
140 167
141 mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_ACCEPT); 168 mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_REJECT);
142 /* no request id, as we refused */ 169 /* no request id, as we refused */
143 amsg->request_id = htonl (0); 170 amsg->request_id = htonl (0);
144 amsg->accept_id = msg->accept_id; 171 amsg->accept_reject_id = msg->accept_id;
145 GNUNET_MQ_send (lh->mq, mqm); 172 GNUNET_MQ_send (lh->mq, mqm);
146 GNUNET_free (req); 173 GNUNET_free (req);
174 LOG (GNUNET_ERROR_TYPE_INFO, "rejecting request\n");
147 } 175 }
148 176
177 LOG (GNUNET_ERROR_TYPE_INFO, "processed op request from service\n");
178
149 /* the accept-case is handled in GNUNET_SET_accept, 179 /* the accept-case is handled in GNUNET_SET_accept,
150 * as we have the accept message available there */ 180 * as we have the accept message available there */
151} 181}
@@ -168,7 +198,7 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
168{ 198{
169 struct GNUNET_SET_Handle *set; 199 struct GNUNET_SET_Handle *set;
170 struct GNUNET_MQ_Message *mqm; 200 struct GNUNET_MQ_Message *mqm;
171 struct SetCreateMessage *msg; 201 struct GNUNET_SET_CreateMessage *msg;
172 static const struct GNUNET_MQ_Handler mq_handlers[] = { 202 static const struct GNUNET_MQ_Handler mq_handlers[] = {
173 {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT}, 203 {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT},
174 GNUNET_MQ_HANDLERS_END 204 GNUNET_MQ_HANDLERS_END
@@ -179,6 +209,7 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
179 LOG (GNUNET_ERROR_TYPE_INFO, "set client created\n"); 209 LOG (GNUNET_ERROR_TYPE_INFO, "set client created\n");
180 GNUNET_assert (NULL != set->client); 210 GNUNET_assert (NULL != set->client);
181 set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers, set); 211 set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers, set);
212 GNUNET_assert (NULL != set->mq);
182 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE); 213 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE);
183 msg->operation = htons (op); 214 msg->operation = htons (op);
184 GNUNET_MQ_send (set->mq, mqm); 215 GNUNET_MQ_send (set->mq, mqm);
@@ -204,7 +235,7 @@ GNUNET_SET_add_element (struct GNUNET_SET_Handle *set,
204 void *cont_cls) 235 void *cont_cls)
205{ 236{
206 struct GNUNET_MQ_Message *mqm; 237 struct GNUNET_MQ_Message *mqm;
207 struct ElementMessage *msg; 238 struct GNUNET_SET_ElementMessage *msg;
208 239
209 mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_ADD); 240 mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_ADD);
210 msg->element_type = element->type; 241 msg->element_type = element->type;
@@ -232,7 +263,7 @@ GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set,
232 void *cont_cls) 263 void *cont_cls)
233{ 264{
234 struct GNUNET_MQ_Message *mqm; 265 struct GNUNET_MQ_Message *mqm;
235 struct ElementMessage *msg; 266 struct GNUNET_SET_ElementMessage *msg;
236 267
237 mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_REMOVE); 268 mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_REMOVE);
238 msg->element_type = element->type; 269 msg->element_type = element->type;
@@ -256,10 +287,10 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
256 287
257 288
258/** 289/**
259 * Evaluate a set operation with our set and the set of another peer. 290 * Create a set operation for evaluation with another peer.
291 * The evaluation will not start until the client provides
292 * a local set with GNUNET_SET_conclude.
260 * 293 *
261 * @param set set to use
262 * @param salt salt for HKDF (explain more here)
263 * @param other_peer peer with the other set 294 * @param other_peer peer with the other set
264 * @param app_id hash for the application using the set 295 * @param app_id hash for the application using the set
265 * @param context_msg additional information for the request 296 * @param context_msg additional information for the request
@@ -273,8 +304,7 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
273 * @return a handle to cancel the operation 304 * @return a handle to cancel the operation
274 */ 305 */
275struct GNUNET_SET_OperationHandle * 306struct GNUNET_SET_OperationHandle *
276GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set, 307GNUNET_SET_evaluate (const struct GNUNET_PeerIdentity *other_peer,
277 const struct GNUNET_PeerIdentity *other_peer,
278 const struct GNUNET_HashCode *app_id, 308 const struct GNUNET_HashCode *app_id,
279 const struct GNUNET_MessageHeader *context_msg, 309 const struct GNUNET_MessageHeader *context_msg,
280 uint16_t salt, 310 uint16_t salt,
@@ -283,24 +313,24 @@ GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set,
283 void *result_cls) 313 void *result_cls)
284{ 314{
285 struct GNUNET_MQ_Message *mqm; 315 struct GNUNET_MQ_Message *mqm;
286 struct EvaluateMessage *msg;
287 struct GNUNET_SET_OperationHandle *oh; 316 struct GNUNET_SET_OperationHandle *oh;
317 struct GNUNET_SET_EvaluateMessage *msg;
288 318
289 oh = GNUNET_new (struct GNUNET_SET_OperationHandle); 319 oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
290 oh->result_cb = result_cb; 320 oh->result_cb = result_cb;
291 oh->result_cls = result_cls; 321 oh->result_cls = result_cls;
292 oh->set = set;
293 322
294 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_EVALUATE); 323 mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_EVALUATE, context_msg);
295 msg->request_id = htonl (GNUNET_MQ_assoc_add (set->mq, mqm, oh)); 324
296 msg->peer = *other_peer;
297 msg->app_id = *app_id;
298
299 if (NULL != context_msg) 325 if (NULL != context_msg)
300 if (GNUNET_OK != GNUNET_MQ_nest (mqm, context_msg, ntohs (context_msg->size))) 326 LOG (GNUNET_ERROR_TYPE_INFO, "passed context msg\n");
301 GNUNET_assert (0); 327
302 328 msg->app_id = *app_id;
303 GNUNET_MQ_send (set->mq, mqm); 329 msg->target_peer = *other_peer;
330 msg->salt = salt;
331 msg->reserved = 0;
332 oh->conclude_mqm = mqm;
333 oh->request_id_addr = &msg->request_id;
304 334
305 return oh; 335 return oh;
306} 336}
@@ -327,7 +357,7 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
327{ 357{
328 struct GNUNET_SET_ListenHandle *lh; 358 struct GNUNET_SET_ListenHandle *lh;
329 struct GNUNET_MQ_Message *mqm; 359 struct GNUNET_MQ_Message *mqm;
330 struct ListenMessage *msg; 360 struct GNUNET_SET_ListenMessage *msg;
331 static const struct GNUNET_MQ_Handler mq_handlers[] = { 361 static const struct GNUNET_MQ_Handler mq_handlers[] = {
332 {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST}, 362 {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST},
333 GNUNET_MQ_HANDLERS_END 363 GNUNET_MQ_HANDLERS_END
@@ -363,10 +393,13 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
363 393
364 394
365/** 395/**
366 * Accept a request we got via GNUNET_SET_listen. 396 * Accept a request we got via GNUNET_SET_listen. Must be called during
397 * GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid
398 * afterwards.
399 * Call GNUNET_SET_conclude to provide the local set to use for the operation,
400 * and to begin the exchange with the remote peer.
367 * 401 *
368 * @param request request to accept 402 * @param request request to accept
369 * @param set set used for the requested operation
370 * @param result_mode specified how results will be returned, 403 * @param result_mode specified how results will be returned,
371 * see 'GNUNET_SET_ResultMode'. 404 * see 'GNUNET_SET_ResultMode'.
372 * @param result_cb callback for the results 405 * @param result_cb callback for the results
@@ -375,28 +408,26 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
375 */ 408 */
376struct GNUNET_SET_OperationHandle * 409struct GNUNET_SET_OperationHandle *
377GNUNET_SET_accept (struct GNUNET_SET_Request *request, 410GNUNET_SET_accept (struct GNUNET_SET_Request *request,
378 struct GNUNET_SET_Handle *set,
379 enum GNUNET_SET_ResultMode result_mode, 411 enum GNUNET_SET_ResultMode result_mode,
380 GNUNET_SET_ResultIterator result_cb, 412 GNUNET_SET_ResultIterator result_cb,
381 void *result_cls) 413 void *cls)
382{ 414{
383 struct GNUNET_MQ_Message *mqm; 415 struct GNUNET_MQ_Message *mqm;
384 struct AcceptMessage *msg;
385 struct GNUNET_SET_OperationHandle *oh; 416 struct GNUNET_SET_OperationHandle *oh;
417 struct GNUNET_SET_AcceptRejectMessage *msg;
386 418
387 /* don't accept a request twice! */
388 GNUNET_assert (GNUNET_NO == request->accepted); 419 GNUNET_assert (GNUNET_NO == request->accepted);
389 request->accepted = GNUNET_YES; 420 request->accepted = GNUNET_YES;
390 421
391 oh = GNUNET_new (struct GNUNET_SET_OperationHandle); 422 oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
392 oh->result_cb = result_cb; 423 oh->result_cb = result_cb;
393 oh->result_cls = result_cls; 424 oh->result_cls = cls;
394 oh->set = set;
395 425
396 mqm = GNUNET_MQ_msg (msg , GNUNET_MESSAGE_TYPE_SET_ACCEPT); 426 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
397 msg->request_id = htonl (GNUNET_MQ_assoc_add (set->mq, NULL, oh)); 427 msg->accept_reject_id = htonl (request->accept_id);
398 msg->accept_id = htonl (request->accept_id); 428
399 GNUNET_MQ_send (set->mq, mqm); 429 oh->conclude_mqm = mqm;
430 oh->request_id_addr = &msg->request_id;
400 431
401 return oh; 432 return oh;
402} 433}
@@ -413,10 +444,43 @@ GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
413 struct GNUNET_MQ_Message *mqm; 444 struct GNUNET_MQ_Message *mqm;
414 struct GNUNET_SET_OperationHandle *h_assoc; 445 struct GNUNET_SET_OperationHandle *h_assoc;
415 446
416 h_assoc = GNUNET_MQ_assoc_remove (oh->set->mq, oh->request_id); 447 if (NULL != oh->set)
417 GNUNET_assert (h_assoc == oh); 448 {
418 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_CANCEL); 449 h_assoc = GNUNET_MQ_assoc_remove (oh->set->mq, oh->request_id);
419 GNUNET_MQ_send (oh->set->mq, mqm); 450 GNUNET_assert (h_assoc == oh);
451 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_CANCEL);
452 GNUNET_MQ_send (oh->set->mq, mqm);
453 }
454
455 if (NULL != oh->conclude_mqm)
456 GNUNET_MQ_discard (oh->conclude_mqm);
457
420 GNUNET_free (oh); 458 GNUNET_free (oh);
421} 459}
422 460
461
462/**
463 * Conclude the given set operation using the given set.
464 * This function is called once we have fully constructed
465 * the set that we want to use for the operation. At this
466 * time, the P2P protocol can then begin to exchange the
467 * set information and call the result callback with the
468 * result information.
469 *
470 * @param oh handle to the set operation
471 * @param set the set to use for the operation
472 */
473void
474GNUNET_SET_conclude (struct GNUNET_SET_OperationHandle *oh,
475 struct GNUNET_SET_Handle *set)
476{
477 GNUNET_assert (NULL == oh->set);
478 GNUNET_assert (NULL != oh->conclude_mqm);
479 oh->set = set;
480 oh->request_id = GNUNET_MQ_assoc_add (oh->set->mq, NULL, oh);
481 *oh->request_id_addr = htonl (oh->request_id);
482 GNUNET_MQ_send (oh->set->mq, oh->conclude_mqm);
483 oh->conclude_mqm = NULL;
484 oh->request_id_addr = NULL;
485}
486
diff --git a/src/set/test_set.conf b/src/set/test_set.conf
index 34b7a8d2f..7bc26ed7e 100644
--- a/src/set/test_set.conf
+++ b/src/set/test_set.conf
@@ -8,8 +8,8 @@ PORT = 2106
8HOSTNAME = localhost 8HOSTNAME = localhost
9HOME = $SERVICEHOME 9HOME = $SERVICEHOME
10BINARY = gnunet-service-set 10BINARY = gnunet-service-set
11#PREFIX = gdbserver :12345
12#PREFIX = valgrind --leak-check=full 11#PREFIX = valgrind --leak-check=full
12#PREFIX = gdbserver :1234
13ACCEPT_FROM = 127.0.0.1; 13ACCEPT_FROM = 127.0.0.1;
14ACCEPT_FROM6 = ::1; 14ACCEPT_FROM6 = ::1;
15UNIXPATH = /tmp/gnunet-service-set.sock 15UNIXPATH = /tmp/gnunet-service-set.sock
diff --git a/src/set/test_set_api.c b/src/set/test_set_api.c
index bf0d65697..f773cebdf 100644
--- a/src/set/test_set_api.c
+++ b/src/set/test_set_api.c
@@ -20,7 +20,7 @@
20 20
21/** 21/**
22 * @file set/test_set_api.c 22 * @file set/test_set_api.c
23 * @brief testcase for consensus_api.c 23 * @brief testcase for set_api.c
24 */ 24 */
25#include "platform.h" 25#include "platform.h"
26#include "gnunet_util_lib.h" 26#include "gnunet_util_lib.h"
@@ -89,11 +89,13 @@ listen_cb (void *cls,
89 const struct GNUNET_MessageHeader *context_msg, 89 const struct GNUNET_MessageHeader *context_msg,
90 struct GNUNET_SET_Request *request) 90 struct GNUNET_SET_Request *request)
91{ 91{
92 struct GNUNET_SET_OperationHandle *oh;
93
92 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n"); 94 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n");
93 GNUNET_SET_listen_cancel (listen_handle); 95 GNUNET_SET_listen_cancel (listen_handle);
94 96
95 GNUNET_SET_accept (request, set2, 97 oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL);
96 GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL); 98 GNUNET_SET_conclude (oh, set2);
97} 99}
98 100
99 101
@@ -105,11 +107,14 @@ listen_cb (void *cls,
105static void 107static void
106start (void *cls) 108start (void *cls)
107{ 109{
110 struct GNUNET_SET_OperationHandle *oh;
111
108 listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, 112 listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
109 &app_id, listen_cb, NULL); 113 &app_id, listen_cb, NULL);
110 GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42, 114 oh = GNUNET_SET_evaluate (&local_id, &app_id, NULL, 42,
111 GNUNET_SET_RESULT_ADDED, 115 GNUNET_SET_RESULT_ADDED,
112 result_cb_set1, NULL); 116 result_cb_set1, NULL);
117 GNUNET_SET_conclude (oh, set1);
113} 118}
114 119
115 120
@@ -168,12 +173,14 @@ run (void *cls,
168 struct GNUNET_TESTING_Peer *peer) 173 struct GNUNET_TESTING_Peer *peer)
169{ 174{
170 175
171 static const char* app_str = "gnunet-set";
172
173 config = cfg; 176 config = cfg;
177 GNUNET_CRYPTO_get_host_identity (cfg, &local_id);
178 printf ("my id (from CRYPTO): %s\n", GNUNET_h2s (&local_id.hashPubKey));
174 GNUNET_TESTING_peer_get_identity (peer, &local_id); 179 GNUNET_TESTING_peer_get_identity (peer, &local_id);
180 printf ("my id (from TESTING): %s\n", GNUNET_h2s (&local_id.hashPubKey));
175 set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); 181 set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
176 set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); 182 set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
183 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id);
177 init_set1 (); 184 init_set1 ();
178} 185}
179 186
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
index b4a47b53d..34f1ea0fa 100644
--- a/src/stream/stream_api.c
+++ b/src/stream/stream_api.c
@@ -3785,7 +3785,29 @@ mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size
3785 struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; 3785 struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state;
3786 struct GNUNET_MQ_Message *mqm; 3786 struct GNUNET_MQ_Message *mqm;
3787 3787
3788 GNUNET_assert (GNUNET_STREAM_OK == status); 3788 switch (status)
3789 {
3790 case GNUNET_STREAM_OK:
3791 break;
3792 case GNUNET_STREAM_SHUTDOWN:
3793 /* FIXME: call shutdown handler */
3794 return;
3795 case GNUNET_STREAM_TIMEOUT:
3796 if (NULL == mq->error_handler)
3797 LOG (GNUNET_ERROR_TYPE_WARNING, "write timeout, but no error handler installed for message queue\n");
3798 else
3799 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT);
3800 return;
3801 case GNUNET_STREAM_SYSERR:
3802 if (NULL == mq->error_handler)
3803 LOG (GNUNET_ERROR_TYPE_WARNING, "write error, but no error handler installed for message queue\n");
3804 else
3805 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_WRITE);
3806 return;
3807 default:
3808 GNUNET_assert (0);
3809 return;
3810 }
3789 3811
3790 /* call cb for message we finished sending */ 3812 /* call cb for message we finished sending */
3791 mqm = mq->current_msg; 3813 mqm = mq->current_msg;
@@ -3863,21 +3885,53 @@ mq_stream_mst_callback (void *cls, void *client,
3863 */ 3885 */
3864static size_t 3886static size_t
3865mq_stream_data_processor (void *cls, 3887mq_stream_data_processor (void *cls,
3866 enum GNUNET_STREAM_Status status, 3888 enum GNUNET_STREAM_Status status,
3867 const void *data, 3889 const void *data,
3868 size_t size) 3890 size_t size)
3869{ 3891{
3870 struct GNUNET_MQ_MessageQueue *mq = cls; 3892 struct GNUNET_MQ_MessageQueue *mq = cls;
3871 struct MQStreamState *mss; 3893 struct MQStreamState *mss;
3872 int ret; 3894 int ret;
3895
3896 switch (status)
3897 {
3898 case GNUNET_STREAM_OK:
3899 break;
3900 case GNUNET_STREAM_SHUTDOWN:
3901 /* FIXME: call shutdown handler */
3902 return 0;
3903 case GNUNET_STREAM_TIMEOUT:
3904 if (NULL == mq->error_handler)
3905 LOG (GNUNET_ERROR_TYPE_WARNING, "read timeout, but no error handler installed for message queue\n");
3906 else
3907 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT);
3908 return 0;
3909 case GNUNET_STREAM_SYSERR:
3910 if (NULL == mq->error_handler)
3911 LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler installed for message queue\n");
3912 else
3913 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
3914 return 0;
3915 default:
3916 GNUNET_assert (0);
3917 return 0;
3918 }
3873 3919
3874 mss = (struct MQStreamState *) mq->impl_state; 3920 mss = (struct MQStreamState *) mq->impl_state;
3875 GNUNET_assert (GNUNET_STREAM_OK == status); 3921 GNUNET_assert (GNUNET_STREAM_OK == status);
3876 ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO); 3922 ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO);
3877 GNUNET_assert (GNUNET_OK == ret); 3923 if (GNUNET_OK != ret)
3924 {
3925 if (NULL == mq->error_handler)
3926 LOG (GNUNET_ERROR_TYPE_WARNING,
3927 "read error (message stream malformed), but no error handler installed for message queue\n");
3928 else
3929 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
3930 return 0;
3931 }
3878 /* we always read all data */ 3932 /* we always read all data */
3879 mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, 3933 mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL,
3880 mq_stream_data_processor, mq); 3934 mq_stream_data_processor, mq);
3881 return size; 3935 return size;
3882} 3936}
3883 3937
@@ -3935,6 +3989,7 @@ GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket,
3935 mq->destroy_impl = mq_stream_destroy_impl; 3989 mq->destroy_impl = mq_stream_destroy_impl;
3936 mq->handlers = msg_handlers; 3990 mq->handlers = msg_handlers;
3937 mq->handlers_cls = cls; 3991 mq->handlers_cls = cls;
3992 mq->error_handler = error_handler;
3938 if (NULL != msg_handlers) 3993 if (NULL != msg_handlers)
3939 { 3994 {
3940 mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq); 3995 mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq);
diff --git a/src/util/crypto_hash.c b/src/util/crypto_hash.c
index fca66aed4..2e436c454 100644
--- a/src/util/crypto_hash.c
+++ b/src/util/crypto_hash.c
@@ -339,7 +339,7 @@ GNUNET_CRYPTO_hash_distance_u32 (const struct GNUNET_HashCode * a,
339 */ 339 */
340void 340void
341GNUNET_CRYPTO_hash_create_random (enum GNUNET_CRYPTO_Quality mode, 341GNUNET_CRYPTO_hash_create_random (enum GNUNET_CRYPTO_Quality mode,
342 struct GNUNET_HashCode * result) 342 struct GNUNET_HashCode *result)
343{ 343{
344 int i; 344 int i;
345 345
diff --git a/src/util/mq.c b/src/util/mq.c
index 36cacd30b..dc87b9711 100644
--- a/src/util/mq.c
+++ b/src/util/mq.c
@@ -119,33 +119,31 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
119} 119}
120 120
121 121
122int 122struct GNUNET_MQ_Message *
123GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp, 123GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type,
124 const void *data, uint16_t len) 124 const struct GNUNET_MessageHeader *nested_mh)
125{ 125{
126 size_t new_size; 126 struct GNUNET_MQ_Message *mqm;
127 size_t old_size; 127 uint16_t size;
128 128
129 GNUNET_assert (NULL != mqmp); 129 if (NULL == nested_mh)
130 /* there's no data to append => do nothing */ 130 return GNUNET_MQ_msg_ (mhp, base_size, type);
131 if (NULL == data)
132 return GNUNET_OK;
133 old_size = ntohs ((*mqmp)->mh->size);
134 /* message too large to concatenate? */
135 if (((uint16_t) (old_size + len)) < len)
136 return GNUNET_SYSERR;
137 new_size = old_size + len;
138 *mqmp = GNUNET_realloc (*mqmp, sizeof (struct GNUNET_MQ_Message) + new_size);
139 (*mqmp)->mh = (struct GNUNET_MessageHeader *) &(*mqmp)[1];
140 memcpy (((void *) (*mqmp)->mh) + old_size, data, new_size - old_size);
141 (*mqmp)->mh->size = htons (new_size);
142 return GNUNET_OK;
143}
144 131
132 size = base_size + ntohs (nested_mh->size);
145 133
134 /* check for uint16_t overflow */
135 if (size < base_size)
136 return NULL;
137
138 mqm = GNUNET_MQ_msg_ (mhp, size, type);
139 memcpy ((char *) mqm->mh + base_size, nested_mh, ntohs (nested_mh->size));
140
141 return mqm;
142}
146 143
147 144
148/*** Transmit a queued message to the session's client. 145/**
146 * Transmit a queued message to the session's client.
149 * 147 *
150 * @param cls consensus session 148 * @param cls consensus session
151 * @param size number of bytes available in buf 149 * @param size number of bytes available in buf
@@ -265,7 +263,8 @@ handle_client_message (void *cls,
265 { 263 {
266 if (NULL == mq->error_handler) 264 if (NULL == mq->error_handler)
267 LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler installed)\n"); 265 LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler installed)\n");
268 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); 266 else
267 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
269 return; 268 return;
270 } 269 }
271 270
@@ -479,3 +478,39 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq)
479 GNUNET_free (mq); 478 GNUNET_free (mq);
480} 479}
481 480
481
482
483
484struct GNUNET_MessageHeader *
485GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size)
486{
487 uint16_t whole_size;
488 uint16_t nested_size;
489 struct GNUNET_MessageHeader *nested_msg;
490
491 whole_size = ntohs (mh->size);
492 GNUNET_assert (whole_size >= base_size);
493
494 nested_size = whole_size - base_size;
495
496 if (0 == nested_size)
497 return NULL;
498
499 if (nested_size < sizeof (struct GNUNET_MessageHeader))
500 {
501 GNUNET_break_op (0);
502 return NULL;
503 }
504
505 nested_msg = (struct GNUNET_MessageHeader *) ((char *) mh + base_size);
506
507 if (ntohs (nested_msg->size) != nested_size)
508 {
509 GNUNET_break_op (0);
510 nested_msg->size = htons (nested_size);
511 }
512
513 return nested_msg;
514}
515
516
diff --git a/src/util/test_mq.c b/src/util/test_mq.c
index 161b40a20..55cd80ef1 100644
--- a/src/util/test_mq.c
+++ b/src/util/test_mq.c
@@ -58,35 +58,6 @@ void
58test2 (void) 58test2 (void)
59{ 59{
60 struct GNUNET_MQ_Message *mqm; 60 struct GNUNET_MQ_Message *mqm;
61 struct MyMessage *mm;
62 int res;
63 char *s = "foo";
64
65 mqm = GNUNET_MQ_msg (mm, 42);
66 res = GNUNET_MQ_nest (mqm, s, strlen(s));
67 GNUNET_assert (GNUNET_OK == res);
68 res = GNUNET_MQ_nest (mqm, s, strlen(s));
69 GNUNET_assert (GNUNET_OK == res);
70 res = GNUNET_MQ_nest (mqm, NULL, 0);
71 GNUNET_assert (GNUNET_OK == res);
72
73 GNUNET_assert (strlen (s) * 2 + sizeof (struct MyMessage) == ntohs (mm->header.size));
74
75 res = GNUNET_MQ_nest_mh (mqm, &mm->header);
76 GNUNET_assert (GNUNET_OK == res);
77 GNUNET_assert (2 * (strlen (s) * 2 + sizeof (struct MyMessage)) == ntohs (mm->header.size));
78
79 res = GNUNET_MQ_nest (mqm, (void *) 0xF00BA, 0xFFF0);
80 GNUNET_assert (GNUNET_OK != res);
81
82 GNUNET_MQ_discard (mqm);
83}
84
85
86void
87test3 (void)
88{
89 struct GNUNET_MQ_Message *mqm;
90 struct GNUNET_MessageHeader *mh; 61 struct GNUNET_MessageHeader *mh;
91 62
92 mqm = GNUNET_MQ_msg_header (42); 63 mqm = GNUNET_MQ_msg_header (42);
@@ -107,7 +78,6 @@ main (int argc, char **argv)
107 GNUNET_log_setup ("test-mq", "INFO", NULL); 78 GNUNET_log_setup ("test-mq", "INFO", NULL);
108 test1 (); 79 test1 ();
109 test2 (); 80 test2 ();
110 test3 ();
111 81
112 return 0; 82 return 0;
113} 83}