aboutsummaryrefslogtreecommitdiff
path: root/src/set
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-06-03 10:53:49 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-06-03 10:53:49 +0000
commit68403fa780bf94ace2ebc13c2c09463cbbc0b57c (patch)
tree3442e4f25de90eab67c4f9813cb6e433c50b7482 /src/set
parentfae7f583f2e11cac15fefcbefef64287ab6915d3 (diff)
downloadgnunet-68403fa780bf94ace2ebc13c2c09463cbbc0b57c.tar.gz
gnunet-68403fa780bf94ace2ebc13c2c09463cbbc0b57c.zip
- conclude for SET
- consensus with SET
Diffstat (limited to 'src/set')
-rw-r--r--src/set/gnunet-service-set.c166
-rw-r--r--src/set/gnunet-service-set.h8
-rw-r--r--src/set/gnunet-service-set_union.c116
-rw-r--r--src/set/gnunet-set.c14
-rw-r--r--src/set/set.h62
-rw-r--r--src/set/set_api.c156
-rw-r--r--src/set/test_set.conf2
-rw-r--r--src/set/test_set_api.c23
8 files changed, 315 insertions, 232 deletions
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index 2aea50365..4da718879 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -226,6 +226,23 @@ destroy_incoming (struct Incoming *incoming)
226 GNUNET_free (incoming); 226 GNUNET_free (incoming);
227} 227}
228 228
229static struct Listener *
230get_listener_by_target (enum GNUNET_SET_OperationType op,
231 const struct GNUNET_HashCode *app_id)
232{
233 struct Listener *l;
234
235 for (l = listeners_head; NULL != l; l = l->next)
236 {
237 if (l->operation != op)
238 continue;
239 if (0 != GNUNET_CRYPTO_hash_cmp (app_id, &l->app_id))
240 continue;
241 return l;
242 }
243 return NULL;
244}
245
229 246
230/** 247/**
231 * Handle a request for a set operation from 248 * Handle a request for a set operation from
@@ -240,62 +257,33 @@ handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh)
240 struct Incoming *incoming = cls; 257 struct Incoming *incoming = cls;
241 const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh; 258 const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh;
242 struct GNUNET_MQ_Message *mqm; 259 struct GNUNET_MQ_Message *mqm;
243 struct RequestMessage *cmsg; 260 struct GNUNET_SET_RequestMessage *cmsg;
244 struct Listener *listener; 261 struct Listener *listener;
245 const struct GNUNET_MessageHeader *context_msg; 262 const struct GNUNET_MessageHeader *context_msg;
246 263
247 if (ntohs (mh->size) < sizeof *msg) 264 context_msg = GNUNET_MQ_extract_nested_mh (msg);
265 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, app %s)\n",
266 ntohs (msg->operation), GNUNET_h2s (&msg->app_id));
267 listener = get_listener_by_target (ntohs (msg->operation), &msg->app_id);
268 if (NULL == listener)
248 { 269 {
249 /* message is to small for its type */ 270 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
250 GNUNET_break (0); 271 "set operation request from peer failed: "
251 destroy_incoming (incoming); 272 "no set with matching application ID and operation type\n");
252 return; 273 return;
253 } 274 }
254 else if (ntohs (mh->size) == sizeof *msg) 275 mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, context_msg);
255 { 276 if (NULL == mqm)
256 /* there is no context message */
257 context_msg = NULL;
258 }
259 else
260 { 277 {
261 context_msg = &msg[1].header; 278 /* FIXME: disconnect the peer */
262 if ((ntohs (context_msg->size) + sizeof *msg) != ntohs (msg->header.size)) 279 GNUNET_break_op (0);
263 {
264 /* size of context message is invalid */
265 GNUNET_break (0);
266 destroy_incoming (incoming);
267 return;
268 }
269 }
270
271 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, app %s)\n",
272 ntohs (msg->operation), GNUNET_h2s (&msg->app_id));
273
274 /* find the appropriate listener */
275 for (listener = listeners_head;
276 listener != NULL;
277 listener = listener->next)
278 {
279 if ( (0 != GNUNET_CRYPTO_hash_cmp (&msg->app_id, &listener->app_id)) ||
280 (ntohs (msg->operation) != listener->operation) )
281 continue;
282 mqm = GNUNET_MQ_msg (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST);
283 if (GNUNET_OK != GNUNET_MQ_nest_mh (mqm, context_msg))
284 {
285 /* FIXME: disconnect the peer */
286 GNUNET_MQ_discard (mqm);
287 GNUNET_break (0);
288 return;
289 }
290 incoming->accept_id = accept_id++;
291 cmsg->accept_id = htonl (incoming->accept_id);
292 GNUNET_MQ_send (listener->client_mq, mqm);
293 return; 280 return;
294 } 281 }
295 282 incoming->accept_id = accept_id++;
296 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 283 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending request with accept id %u\n", incoming->accept_id);
297 "set operation request from peer failed: " 284 cmsg->accept_id = htonl (incoming->accept_id);
298 "no set with matching application ID and operation type\n"); 285 cmsg->peer_id = incoming->peer;
286 GNUNET_MQ_send (listener->client_mq, mqm);
299} 287}
300 288
301 289
@@ -311,7 +299,7 @@ handle_client_create (void *cls,
311 struct GNUNET_SERVER_Client *client, 299 struct GNUNET_SERVER_Client *client,
312 const struct GNUNET_MessageHeader *m) 300 const struct GNUNET_MessageHeader *m)
313{ 301{
314 struct SetCreateMessage *msg = (struct SetCreateMessage *) m; 302 struct GNUNET_SET_CreateMessage *msg = (struct GNUNET_SET_CreateMessage *) m;
315 struct Set *set; 303 struct Set *set;
316 304
317 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client created new set (operation %u)\n", 305 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client created new set (operation %u)\n",
@@ -363,7 +351,7 @@ handle_client_listen (void *cls,
363 struct GNUNET_SERVER_Client *client, 351 struct GNUNET_SERVER_Client *client,
364 const struct GNUNET_MessageHeader *m) 352 const struct GNUNET_MessageHeader *m)
365{ 353{
366 struct ListenMessage *msg = (struct ListenMessage *) m; 354 struct GNUNET_SET_ListenMessage *msg = (struct GNUNET_SET_ListenMessage *) m;
367 struct Listener *listener; 355 struct Listener *listener;
368 356
369 if (NULL != get_listener (client)) 357 if (NULL != get_listener (client))
@@ -410,7 +398,7 @@ handle_client_remove (void *cls,
410 switch (set->operation) 398 switch (set->operation)
411 { 399 {
412 case GNUNET_SET_OPERATION_UNION: 400 case GNUNET_SET_OPERATION_UNION:
413 _GSS_union_remove ((struct ElementMessage *) m, set); 401 _GSS_union_remove ((struct GNUNET_SET_ElementMessage *) m, set);
414 case GNUNET_SET_OPERATION_INTERSECTION: 402 case GNUNET_SET_OPERATION_INTERSECTION:
415 /* FIXME: cfuchs */ 403 /* FIXME: cfuchs */
416 break; 404 break;
@@ -423,6 +411,38 @@ handle_client_remove (void *cls,
423} 411}
424 412
425 413
414
415/**
416 * Called when the client wants to reject an operation
417 * request from another peer.
418 *
419 * @param cls unused
420 * @param client client that sent the message
421 * @param m message sent by the client
422 */
423static void
424handle_client_reject (void *cls,
425 struct GNUNET_SERVER_Client *client,
426 const struct GNUNET_MessageHeader *m)
427{
428 struct Incoming *incoming;
429 struct GNUNET_SET_AcceptRejectMessage *msg = (struct GNUNET_SET_AcceptRejectMessage *) m;
430
431 GNUNET_break (0 == ntohl (msg->request_id));
432
433 incoming = get_incoming (ntohl (msg->accept_reject_id));
434 if (NULL == incoming)
435 {
436 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
437 return;
438 }
439 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n");
440 destroy_incoming (incoming);
441 GNUNET_SERVER_receive_done (client, GNUNET_OK);
442}
443
444
445
426/** 446/**
427 * Called when a client wants to add an element to a 447 * Called when a client wants to add an element to a
428 * set it inhabits. 448 * set it inhabits.
@@ -448,7 +468,7 @@ handle_client_add (void *cls,
448 switch (set->operation) 468 switch (set->operation)
449 { 469 {
450 case GNUNET_SET_OPERATION_UNION: 470 case GNUNET_SET_OPERATION_UNION:
451 _GSS_union_add ((struct ElementMessage *) m, set); 471 _GSS_union_add ((struct GNUNET_SET_ElementMessage *) m, set);
452 case GNUNET_SET_OPERATION_INTERSECTION: 472 case GNUNET_SET_OPERATION_INTERSECTION:
453 /* FIXME: cfuchs */ 473 /* FIXME: cfuchs */
454 break; 474 break;
@@ -490,7 +510,7 @@ handle_client_evaluate (void *cls,
490 /* FIXME: cfuchs */ 510 /* FIXME: cfuchs */
491 break; 511 break;
492 case GNUNET_SET_OPERATION_UNION: 512 case GNUNET_SET_OPERATION_UNION:
493 _GSS_union_evaluate ((struct EvaluateMessage *) m, set); 513 _GSS_union_evaluate ((struct GNUNET_SET_EvaluateMessage *) m, set);
494 break; 514 break;
495 default: 515 default:
496 GNUNET_assert (0); 516 GNUNET_assert (0);
@@ -502,23 +522,6 @@ handle_client_evaluate (void *cls,
502 522
503 523
504/** 524/**
505 * Handle a cancel request from a client.
506 *
507 * @param cls unused
508 * @param client the client
509 * @param m the cancel message
510 */
511static void
512handle_client_cancel (void *cls,
513 struct GNUNET_SERVER_Client *client,
514 const struct GNUNET_MessageHeader *m)
515{
516 /* FIXME: implement */
517 GNUNET_SERVER_receive_done (client, GNUNET_OK);
518}
519
520
521/**
522 * Handle an ack from a client. 525 * Handle an ack from a client.
523 * 526 *
524 * @param cls unused 527 * @param cls unused
@@ -550,25 +553,20 @@ handle_client_accept (void *cls,
550{ 553{
551 struct Set *set; 554 struct Set *set;
552 struct Incoming *incoming; 555 struct Incoming *incoming;
553 struct AcceptMessage *msg = (struct AcceptMessage *) mh; 556 struct GNUNET_SET_AcceptRejectMessage *msg = (struct GNUNET_SET_AcceptRejectMessage *) mh;
554 557
558 incoming = get_incoming (ntohl (msg->accept_reject_id));
555 559
556 incoming = get_incoming (ntohl (msg->accept_id)); 560 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client accepting %u\n", ntohl (msg->accept_reject_id));
557 561
558 if (NULL == incoming) 562 if (NULL == incoming)
559 { 563 {
564
560 GNUNET_break (0); 565 GNUNET_break (0);
561 GNUNET_SERVER_client_disconnect (client); 566 GNUNET_SERVER_client_disconnect (client);
562 return; 567 return;
563 } 568 }
564 569
565 if (0 == ntohl (msg->request_id))
566 {
567 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n");
568 destroy_incoming (incoming);
569 GNUNET_SERVER_receive_done (client, GNUNET_OK);
570 return;
571 }
572 570
573 set = get_set (client); 571 set = get_set (client);
574 572
@@ -687,14 +685,14 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
687 const struct GNUNET_CONFIGURATION_Handle *cfg) 685 const struct GNUNET_CONFIGURATION_Handle *cfg)
688{ 686{
689 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { 687 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
688 {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT, 0},
689 {handle_client_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ACK, 0},
690 {handle_client_add, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0},
690 {handle_client_create, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE, 0}, 691 {handle_client_create, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE, 0},
692 {handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0},
691 {handle_client_listen, NULL, GNUNET_MESSAGE_TYPE_SET_LISTEN, 0}, 693 {handle_client_listen, NULL, GNUNET_MESSAGE_TYPE_SET_LISTEN, 0},
692 {handle_client_add, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0}, 694 {handle_client_reject, NULL, GNUNET_MESSAGE_TYPE_SET_REJECT, 0},
693 {handle_client_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0}, 695 {handle_client_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0},
694 {handle_client_cancel, NULL, GNUNET_MESSAGE_TYPE_SET_CANCEL, 0},
695 {handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0},
696 {handle_client_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ACK, 0},
697 {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT, 0},
698 {NULL, NULL, 0, 0} 696 {NULL, NULL, 0, 0}
699 }; 697 };
700 698
@@ -705,6 +703,8 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
705 stream_listen_socket = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_SET, 703 stream_listen_socket = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_SET,
706 &stream_listen_cb, NULL, 704 &stream_listen_cb, NULL,
707 GNUNET_STREAM_OPTION_END); 705 GNUNET_STREAM_OPTION_END);
706
707 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set service running\n");
708} 708}
709 709
710 710
diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h
index bea77416e..15199eba4 100644
--- a/src/set/gnunet-service-set.h
+++ b/src/set/gnunet-service-set.h
@@ -217,7 +217,7 @@ _GSS_union_set_create (void);
217 * @parem set the set to evaluate the operation with 217 * @parem set the set to evaluate the operation with
218 */ 218 */
219void 219void
220_GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set); 220_GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set);
221 221
222 222
223/** 223/**
@@ -227,7 +227,7 @@ _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set);
227 * @param set set to add the element to 227 * @param set set to add the element to
228 */ 228 */
229void 229void
230_GSS_union_add (struct ElementMessage *m, struct Set *set); 230_GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set);
231 231
232 232
233/** 233/**
@@ -238,7 +238,7 @@ _GSS_union_add (struct ElementMessage *m, struct Set *set);
238 * @param set set to remove the element from 238 * @param set set to remove the element from
239 */ 239 */
240void 240void
241_GSS_union_remove (struct ElementMessage *m, struct Set *set); 241_GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set);
242 242
243 243
244/** 244/**
@@ -258,7 +258,7 @@ _GSS_union_set_destroy (struct Set *set);
258 * @param incoming information about the requesting remote peer 258 * @param incoming information about the requesting remote peer
259 */ 259 */
260void 260void
261_GSS_union_accept (struct AcceptMessage *m, struct Set *set, 261_GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set,
262 struct Incoming *incoming); 262 struct Incoming *incoming);
263 263
264 264
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index c651a0381..6d9658ee5 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -245,8 +245,7 @@ struct ElementEntry
245 245
246 246
247/** 247/**
248 * Information about the element used for 248 * Entries in the key-to-element map of the union set.
249 * a specific union operation.
250 */ 249 */
251struct KeyEntry 250struct KeyEntry
252{ 251{
@@ -401,11 +400,14 @@ destroy_key_to_element_iter (void *cls,
401static void 400static void
402destroy_union_operation (struct UnionEvaluateOperation *eo) 401destroy_union_operation (struct UnionEvaluateOperation *eo)
403{ 402{
403 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n");
404
404 if (NULL != eo->mq) 405 if (NULL != eo->mq)
405 { 406 {
406 GNUNET_MQ_destroy (eo->mq); 407 GNUNET_MQ_destroy (eo->mq);
407 eo->mq = NULL; 408 eo->mq = NULL;
408 } 409 }
410
409 if (NULL != eo->socket) 411 if (NULL != eo->socket)
410 { 412 {
411 GNUNET_STREAM_close (eo->socket); 413 GNUNET_STREAM_close (eo->socket);
@@ -433,12 +435,16 @@ destroy_union_operation (struct UnionEvaluateOperation *eo)
433 eo->key_to_element = NULL; 435 eo->key_to_element = NULL;
434 } 436 }
435 437
436
437 GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head, 438 GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head,
438 eo->set->state.u->ops_tail, 439 eo->set->state.u->ops_tail,
439 eo); 440 eo);
440 GNUNET_free (eo); 441 GNUNET_free (eo);
441 /* FIXME: free and destroy everything else */ 442
443
444 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n");
445
446
447 /* FIXME: do a garbage collection of the set generations */
442} 448}
443 449
444 450
@@ -452,7 +458,7 @@ static void
452fail_union_operation (struct UnionEvaluateOperation *eo) 458fail_union_operation (struct UnionEvaluateOperation *eo)
453{ 459{
454 struct GNUNET_MQ_Message *mqm; 460 struct GNUNET_MQ_Message *mqm;
455 struct ResultMessage *msg; 461 struct GNUNET_SET_ResultMessage *msg;
456 462
457 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); 463 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
458 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); 464 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
@@ -495,20 +501,25 @@ send_operation_request (struct UnionEvaluateOperation *eo)
495 struct GNUNET_MQ_Message *mqm; 501 struct GNUNET_MQ_Message *mqm;
496 struct OperationRequestMessage *msg; 502 struct OperationRequestMessage *msg;
497 503
498 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST); 504 mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, eo->context_msg);
499 if (NULL != eo->context_msg) 505
500 if (GNUNET_OK != GNUNET_MQ_nest (mqm, eo->context_msg, ntohs (eo->context_msg->size))) 506 if (NULL == mqm)
501 { 507 {
502 /* the context message is too large */ 508 /* the context message is too large */
503 GNUNET_break (0); 509 GNUNET_break (0);
504 GNUNET_SERVER_client_disconnect (eo->set->client); 510 GNUNET_SERVER_client_disconnect (eo->set->client);
505 GNUNET_MQ_discard (mqm); 511 return;
506 return; 512 }
507 }
508 msg->operation = htons (GNUNET_SET_OPERATION_UNION); 513 msg->operation = htons (GNUNET_SET_OPERATION_UNION);
509 msg->app_id = eo->app_id; 514 msg->app_id = eo->app_id;
510 GNUNET_MQ_send (eo->mq, mqm); 515 GNUNET_MQ_send (eo->mq, mqm);
511 516
517 if (NULL != eo->context_msg)
518 {
519 GNUNET_free (eo->context_msg);
520 eo->context_msg = NULL;
521 }
522
512 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n"); 523 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n");
513} 524}
514 525
@@ -537,7 +548,7 @@ insert_element_iterator (void *cls,
537 { 548 {
538 if (old_k->ibf_key.key_val == new_k->ibf_key.key_val) 549 if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
539 { 550 {
540 new_k->next_colliding = old_k; 551 new_k->next_colliding = old_k->next_colliding;
541 old_k->next_colliding = new_k; 552 old_k->next_colliding = new_k;
542 return GNUNET_NO; 553 return GNUNET_NO;
543 } 554 }
@@ -568,12 +579,11 @@ insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee)
568 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, 579 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
569 (uint32_t) ibf_key.key_val, 580 (uint32_t) ibf_key.key_val,
570 insert_element_iterator, k); 581 insert_element_iterator, k);
582
571 /* was the element inserted into a colliding bucket? */ 583 /* was the element inserted into a colliding bucket? */
572 if (GNUNET_SYSERR == ret) 584 if (GNUNET_SYSERR == ret)
573 {
574 GNUNET_assert (NULL != k->next_colliding);
575 return; 585 return;
576 } 586
577 GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k, 587 GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
578 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 588 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
579} 589}
@@ -781,8 +791,8 @@ handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
781 */ 791 */
782static int 792static int
783send_element_iterator (void *cls, 793send_element_iterator (void *cls,
784 uint32_t key, 794 uint32_t key,
785 void *value) 795 void *value)
786{ 796{
787 struct SendElementClosure *sec = cls; 797 struct SendElementClosure *sec = cls;
788 struct IBF_Key ibf_key = sec->ibf_key; 798 struct IBF_Key ibf_key = sec->ibf_key;
@@ -795,15 +805,18 @@ send_element_iterator (void *cls,
795 { 805 {
796 const struct GNUNET_SET_Element *const element = &ke->element->element; 806 const struct GNUNET_SET_Element *const element = &ke->element->element;
797 struct GNUNET_MQ_Message *mqm; 807 struct GNUNET_MQ_Message *mqm;
808 struct GNUNET_MessageHeader *mh;
798 809
799 GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); 810 GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
800 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); 811 mqm = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
801 if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size)) 812 if (NULL == mqm)
802 { 813 {
814 /* element too large */
803 GNUNET_break (0); 815 GNUNET_break (0);
804 GNUNET_MQ_discard (mqm);
805 continue; 816 continue;
806 } 817 }
818 memcpy (&mh[1], element->data, element->size);
819 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n");
807 GNUNET_MQ_send (eo->mq, mqm); 820 GNUNET_MQ_send (eo->mq, mqm);
808 ke = ke->next_colliding; 821 ke = ke->next_colliding;
809 } 822 }
@@ -975,34 +988,42 @@ send_client_element (struct UnionEvaluateOperation *eo,
975 struct GNUNET_SET_Element *element) 988 struct GNUNET_SET_Element *element)
976{ 989{
977 struct GNUNET_MQ_Message *mqm; 990 struct GNUNET_MQ_Message *mqm;
978 struct ResultMessage *rm; 991 struct GNUNET_SET_ResultMessage *rm;
979 992
980 GNUNET_assert (0 != eo->request_id); 993 GNUNET_assert (0 != eo->request_id);
981 mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); 994 mqm = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
982 rm->result_status = htons (GNUNET_SET_STATUS_OK); 995 if (NULL == mqm)
983 rm->request_id = htonl (eo->request_id);
984 if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size))
985 { 996 {
986 GNUNET_MQ_discard (mqm); 997 GNUNET_MQ_discard (mqm);
987 GNUNET_break (0); 998 GNUNET_break (0);
988 return; 999 return;
989 } 1000 }
990 1001 rm->result_status = htons (GNUNET_SET_STATUS_OK);
1002 rm->request_id = htonl (eo->request_id);
1003 memcpy (&rm[1], element->data, element->size);
991 GNUNET_MQ_send (eo->set->client_mq, mqm); 1004 GNUNET_MQ_send (eo->set->client_mq, mqm);
992} 1005}
993 1006
994 1007
995/** 1008/**
996 * Callback used for notifications 1009 * Completion callback for shutdown
997 * 1010 *
998 * @param cls closure 1011 * @param cls the closure from GNUNET_STREAM_shutdown call
1012 * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
1013 * SHUT_RDWR)
999 */ 1014 */
1000static void 1015/*
1001client_done_sent_cb (void *cls) 1016static void
1017stream_shutdown_cb (void *cls,
1018 int operation)
1002{ 1019{
1003 //struct UnionEvaluateOperation *eo = cls; 1020 //struct UnionEvaluateOperation *eo = cls;
1004 /* FIXME: destroy eo */ 1021
1022 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "stream shutdown\n");
1023
1024 // destroy_union_operation (eo);
1005} 1025}
1026*/
1006 1027
1007 1028
1008/** 1029/**
@@ -1018,16 +1039,15 @@ static void
1018send_client_done_and_destroy (struct UnionEvaluateOperation *eo) 1039send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
1019{ 1040{
1020 struct GNUNET_MQ_Message *mqm; 1041 struct GNUNET_MQ_Message *mqm;
1021 struct ResultMessage *rm; 1042 struct GNUNET_SET_ResultMessage *rm;
1022 1043
1023 GNUNET_assert (0 != eo->request_id); 1044 GNUNET_assert (0 != eo->request_id);
1024 mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); 1045 mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1025 rm->request_id = htonl (eo->request_id); 1046 rm->request_id = htonl (eo->request_id);
1026 rm->result_status = htons (GNUNET_SET_STATUS_DONE); 1047 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1027 GNUNET_MQ_notify_sent (mqm, client_done_sent_cb, eo);
1028 GNUNET_MQ_send (eo->set->client_mq, mqm); 1048 GNUNET_MQ_send (eo->set->client_mq, mqm);
1029 1049
1030 /* FIXME: destroy the eo */ 1050 // GNUNET_STREAM_shutdown (eo->socket, SHUT_RDWR, stream_shutdown_cb, eo);
1031} 1051}
1032 1052
1033 1053
@@ -1199,18 +1219,25 @@ stream_open_cb (void *cls,
1199 * @parem set the set to evaluate the operation with 1219 * @parem set the set to evaluate the operation with
1200 */ 1220 */
1201void 1221void
1202_GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set) 1222_GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set)
1203{ 1223{
1204 struct UnionEvaluateOperation *eo; 1224 struct UnionEvaluateOperation *eo;
1225 struct GNUNET_MessageHeader *context_msg;
1205 1226
1206 eo = GNUNET_new (struct UnionEvaluateOperation); 1227 eo = GNUNET_new (struct UnionEvaluateOperation);
1207 eo->peer = m->peer; 1228 eo->peer = m->target_peer;
1208 eo->set = set; 1229 eo->set = set;
1209 eo->request_id = htonl (m->request_id); 1230 eo->request_id = htonl (m->request_id);
1210 GNUNET_assert (0 != eo->request_id); 1231 GNUNET_assert (0 != eo->request_id);
1211 eo->se = strata_estimator_dup (set->state.u->se); 1232 eo->se = strata_estimator_dup (set->state.u->se);
1212 eo->salt = ntohs (m->salt); 1233 eo->salt = ntohs (m->salt);
1213 eo->app_id = m->app_id; 1234 eo->app_id = m->app_id;
1235
1236 context_msg = GNUNET_MQ_extract_nested_mh (m);
1237 if (NULL != context_msg)
1238 {
1239 eo->context_msg = GNUNET_copy_message (context_msg);
1240 }
1214 1241
1215 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1242 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1216 "evaluating union operation, (app %s)\n", 1243 "evaluating union operation, (app %s)\n",
@@ -1235,7 +1262,7 @@ _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set)
1235 * @param incoming information about the requesting remote peer 1262 * @param incoming information about the requesting remote peer
1236 */ 1263 */
1237void 1264void
1238_GSS_union_accept (struct AcceptMessage *m, struct Set *set, 1265_GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set,
1239 struct Incoming *incoming) 1266 struct Incoming *incoming)
1240{ 1267{
1241 struct UnionEvaluateOperation *eo; 1268 struct UnionEvaluateOperation *eo;
@@ -1250,7 +1277,6 @@ _GSS_union_accept (struct AcceptMessage *m, struct Set *set,
1250 GNUNET_assert (0 != ntohl (m->request_id)); 1277 GNUNET_assert (0 != ntohl (m->request_id));
1251 eo->request_id = ntohl (m->request_id); 1278 eo->request_id = ntohl (m->request_id);
1252 eo->se = strata_estimator_dup (set->state.u->se); 1279 eo->se = strata_estimator_dup (set->state.u->se);
1253 eo->set = set; // FIXME: redundant!?
1254 eo->mq = incoming->mq; 1280 eo->mq = incoming->mq;
1255 /* transfer ownership of mq and socket from incoming to eo */ 1281 /* transfer ownership of mq and socket from incoming to eo */
1256 incoming->mq = NULL; 1282 incoming->mq = NULL;
@@ -1299,7 +1325,7 @@ _GSS_union_set_create (void)
1299 * @param set set to add the element to 1325 * @param set set to add the element to
1300 */ 1326 */
1301void 1327void
1302_GSS_union_add (struct ElementMessage *m, struct Set *set) 1328_GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set)
1303{ 1329{
1304 struct ElementEntry *ee; 1330 struct ElementEntry *ee;
1305 struct ElementEntry *ee_dup; 1331 struct ElementEntry *ee_dup;
@@ -1357,7 +1383,9 @@ _GSS_union_set_destroy (struct Set *set)
1357 destroy_elements (set->state.u); 1383 destroy_elements (set->state.u);
1358 1384
1359 while (NULL != set->state.u->ops_head) 1385 while (NULL != set->state.u->ops_head)
1386 {
1360 destroy_union_operation (set->state.u->ops_head); 1387 destroy_union_operation (set->state.u->ops_head);
1388 }
1361} 1389}
1362 1390
1363/** 1391/**
@@ -1368,7 +1396,7 @@ _GSS_union_set_destroy (struct Set *set)
1368 * @param set set to remove the element from 1396 * @param set set to remove the element from
1369 */ 1397 */
1370void 1398void
1371_GSS_union_remove (struct ElementMessage *m, struct Set *set) 1399_GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set)
1372{ 1400{
1373 struct GNUNET_HashCode hash; 1401 struct GNUNET_HashCode hash;
1374 struct ElementEntry *ee; 1402 struct ElementEntry *ee;
diff --git a/src/set/gnunet-set.c b/src/set/gnunet-set.c
index 5f2d1c976..ae84610fc 100644
--- a/src/set/gnunet-set.c
+++ b/src/set/gnunet-set.c
@@ -91,11 +91,12 @@ listen_cb (void *cls,
91 const struct GNUNET_MessageHeader *context_msg, 91 const struct GNUNET_MessageHeader *context_msg,
92 struct GNUNET_SET_Request *request) 92 struct GNUNET_SET_Request *request)
93{ 93{
94 struct GNUNET_SET_OperationHandle *oh;
94 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n"); 95 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n");
95 GNUNET_SET_listen_cancel (listen_handle); 96 GNUNET_SET_listen_cancel (listen_handle);
96 97
97 GNUNET_SET_accept (request, set2, 98 oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL);
98 GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL); 99 GNUNET_SET_conclude (oh, set2);
99} 100}
100 101
101 102
@@ -107,11 +108,14 @@ listen_cb (void *cls,
107static void 108static void
108start (void *cls) 109start (void *cls)
109{ 110{
111 struct GNUNET_SET_OperationHandle *oh;
112
110 listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, 113 listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
111 &app_id, listen_cb, NULL); 114 &app_id, listen_cb, NULL);
112 GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42, 115 oh = GNUNET_SET_evaluate (&local_id, &app_id, NULL, 42,
113 GNUNET_SET_RESULT_ADDED, 116 GNUNET_SET_RESULT_ADDED,
114 result_cb_set1, NULL); 117 result_cb_set1, NULL);
118 GNUNET_SET_conclude (oh, set1);
115} 119}
116 120
117 121
diff --git a/src/set/set.h b/src/set/set.h
index ad2200de9..7ec3e6cb2 100644
--- a/src/set/set.h
+++ b/src/set/set.h
@@ -29,17 +29,12 @@
29#include "platform.h" 29#include "platform.h"
30#include "gnunet_common.h" 30#include "gnunet_common.h"
31 31
32 32#define GNUNET_SET_ACK_WINDOW 10
33/**
34 * The service sends up to GNUNET_SET_ACK_WINDOW messages per client handle,
35 * the client should send an ack every GNUNET_SET_ACK_WINDOW/2 messages.
36 */
37#define GNUNET_SET_ACK_WINDOW 8
38 33
39 34
40GNUNET_NETWORK_STRUCT_BEGIN 35GNUNET_NETWORK_STRUCT_BEGIN
41 36
42struct SetCreateMessage 37struct GNUNET_SET_CreateMessage
43{ 38{
44 /** 39 /**
45 * Type: GNUNET_MESSAGE_TYPE_SET_CREATE 40 * Type: GNUNET_MESSAGE_TYPE_SET_CREATE
@@ -54,7 +49,7 @@ struct SetCreateMessage
54}; 49};
55 50
56 51
57struct ListenMessage 52struct GNUNET_SET_ListenMessage
58{ 53{
59 /** 54 /**
60 * Type: GNUNET_MESSAGE_TYPE_SET_LISTEN 55 * Type: GNUNET_MESSAGE_TYPE_SET_LISTEN
@@ -74,32 +69,31 @@ struct ListenMessage
74}; 69};
75 70
76 71
77struct AcceptMessage 72struct GNUNET_SET_AcceptRejectMessage
78{ 73{
79 /** 74 /**
80 * Type: GNUNET_MESSAGE_TYPE_SET_ACCEPT 75 * Type: GNUNET_MESSAGE_TYPE_SET_ACCEPT or
76 * GNUNET_MESSAGE_TYPE_SET_REJECT
81 */ 77 */
82 struct GNUNET_MessageHeader header; 78 struct GNUNET_MessageHeader header;
83 79
84 /** 80 /**
85 * Request id that will be sent along with 81 * ID of the incoming request we want to accept / reject.
86 * results for the accepted operation.
87 * Chosen by the client.
88 * Must be 0 if the request has been rejected.
89 */ 82 */
90 uint32_t request_id GNUNET_PACKED; 83 uint32_t accept_reject_id GNUNET_PACKED;
91 84
92 /** 85 /**
93 * ID of the incoming request we want to accept / reject. 86 * Request ID to identify responses,
87 * must be 0 if we don't accept the request.
94 */ 88 */
95 uint32_t accept_id GNUNET_PACKED; 89 uint32_t request_id GNUNET_PACKED;
96}; 90};
97 91
98 92
99/** 93/**
100 * A request for an operation with another client. 94 * A request for an operation with another client.
101 */ 95 */
102struct RequestMessage 96struct GNUNET_SET_RequestMessage
103{ 97{
104 /** 98 /**
105 * Type: GNUNET_MESSAGE_TYPE_SET_Request. 99 * Type: GNUNET_MESSAGE_TYPE_SET_Request.
@@ -107,21 +101,21 @@ struct RequestMessage
107 struct GNUNET_MessageHeader header; 101 struct GNUNET_MessageHeader header;
108 102
109 /** 103 /**
110 * ID of the request we want to accept, 104 * Identity of the requesting peer.
111 * chosen by the service.
112 */ 105 */
113 uint32_t accept_id GNUNET_PACKED; 106 struct GNUNET_PeerIdentity peer_id;
114 107
115 /** 108 /**
116 * Identity of the requesting peer. 109 * ID of the to identify the request when accepting or
110 * rejecting it.
117 */ 111 */
118 struct GNUNET_PeerIdentity peer_id; 112 uint32_t accept_id GNUNET_PACKED;
119 113
120 /* rest: nested context message */ 114 /* rest: nested context message */
121}; 115};
122 116
123 117
124struct EvaluateMessage 118struct GNUNET_SET_EvaluateMessage
125{ 119{
126 /** 120 /**
127 * Type: GNUNET_MESSAGE_TYPE_SET_EVALUATE 121 * Type: GNUNET_MESSAGE_TYPE_SET_EVALUATE
@@ -136,7 +130,7 @@ struct EvaluateMessage
136 /** 130 /**
137 * Peer to evaluate the operation with 131 * Peer to evaluate the operation with
138 */ 132 */
139 struct GNUNET_PeerIdentity peer; 133 struct GNUNET_PeerIdentity target_peer;
140 134
141 /** 135 /**
142 * Application id 136 * Application id
@@ -157,7 +151,7 @@ struct EvaluateMessage
157}; 151};
158 152
159 153
160struct ResultMessage 154struct GNUNET_SET_ResultMessage
161{ 155{
162 /** 156 /**
163 * Type: GNUNET_MESSAGE_TYPE_SET_RESULT 157 * Type: GNUNET_MESSAGE_TYPE_SET_RESULT
@@ -184,7 +178,7 @@ struct ResultMessage
184}; 178};
185 179
186 180
187struct ElementMessage 181struct GNUNET_SET_ElementMessage
188{ 182{
189 /** 183 /**
190 * Type: GNUNET_MESSAGE_TYPE_SET_ADD or 184 * Type: GNUNET_MESSAGE_TYPE_SET_ADD or
@@ -200,20 +194,6 @@ struct ElementMessage
200}; 194};
201 195
202 196
203struct CancelMessage
204{
205 /**
206 * Type: GNUNET_MESSAGE_TYPE_SET_CANCEL
207 */
208 struct GNUNET_MessageHeader header;
209
210 /**
211 * id we want to cancel result belongs to
212 */
213 uint32_t request_id GNUNET_PACKED;
214};
215
216
217GNUNET_NETWORK_STRUCT_END 197GNUNET_NETWORK_STRUCT_END
218 198
219#endif 199#endif
diff --git a/src/set/set_api.c b/src/set/set_api.c
index 5838680b9..c74933aa0 100644
--- a/src/set/set_api.c
+++ b/src/set/set_api.c
@@ -33,6 +33,7 @@
33 33
34#define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__) 34#define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__)
35 35
36
36/** 37/**
37 * Opaque handle to a set. 38 * Opaque handle to a set.
38 */ 39 */
@@ -52,13 +53,33 @@ struct GNUNET_SET_Request
52 int accepted; 53 int accepted;
53}; 54};
54 55
55
56struct GNUNET_SET_OperationHandle 56struct GNUNET_SET_OperationHandle
57{ 57{
58 GNUNET_SET_ResultIterator result_cb; 58 GNUNET_SET_ResultIterator result_cb;
59 void *result_cls; 59 void *result_cls;
60
61 /**
62 * Local set used for the operation,
63 * NULL if no set has been provided by conclude yet.
64 */
60 struct GNUNET_SET_Handle *set; 65 struct GNUNET_SET_Handle *set;
66
67 /**
68 * Request ID to identify the operation within the set.
69 */
61 uint32_t request_id; 70 uint32_t request_id;
71
72 /**
73 * Message sent to the server on calling conclude,
74 * NULL if conclude has been called.
75 */
76 struct GNUNET_MQ_Message *conclude_mqm;
77
78 /**
79 * Address of the request if in the conclude message,
80 * used to patch the request id into the message when the set is known.
81 */
82 uint32_t *request_id_addr;
62}; 83};
63 84
64 85
@@ -83,18 +104,21 @@ struct GNUNET_SET_ListenHandle
83static void 104static void
84handle_result (void *cls, const struct GNUNET_MessageHeader *mh) 105handle_result (void *cls, const struct GNUNET_MessageHeader *mh)
85{ 106{
86 struct ResultMessage *msg = (struct ResultMessage *) mh; 107 struct GNUNET_SET_ResultMessage *msg = (struct GNUNET_SET_ResultMessage *) mh;
87 struct GNUNET_SET_Handle *set = cls; 108 struct GNUNET_SET_Handle *set = cls;
88 struct GNUNET_SET_OperationHandle *oh; 109 struct GNUNET_SET_OperationHandle *oh;
89 struct GNUNET_SET_Element e; 110 struct GNUNET_SET_Element e;
90 111
112
113 GNUNET_assert (NULL != set);
114 GNUNET_assert (NULL != set->mq);
115
91 if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2) 116 if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2)
92 { 117 {
93 struct GNUNET_MQ_Message *mqm; 118 struct GNUNET_MQ_Message *mqm;
94 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ACK); 119 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ACK);
95 GNUNET_MQ_send (set->mq, mqm); 120 GNUNET_MQ_send (set->mq, mqm);
96 } 121 }
97
98 oh = GNUNET_MQ_assoc_get (set->mq, ntohl (msg->request_id)); 122 oh = GNUNET_MQ_assoc_get (set->mq, ntohl (msg->request_id));
99 GNUNET_assert (NULL != oh); 123 GNUNET_assert (NULL != oh);
100 /* status is not STATUS_OK => there's no attached element, 124 /* status is not STATUS_OK => there's no attached element,
@@ -109,7 +133,7 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh)
109 } 133 }
110 134
111 e.data = &msg[1]; 135 e.data = &msg[1];
112 e.size = ntohs (mh->size) - sizeof (struct ResultMessage); 136 e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage);
113 e.type = msg->element_type; 137 e.type = msg->element_type;
114 if (NULL != oh->result_cb) 138 if (NULL != oh->result_cb)
115 oh->result_cb (oh->result_cls, &e, htons (msg->result_status)); 139 oh->result_cb (oh->result_cls, &e, htons (msg->result_status));
@@ -124,28 +148,34 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh)
124static void 148static void
125handle_request (void *cls, const struct GNUNET_MessageHeader *mh) 149handle_request (void *cls, const struct GNUNET_MessageHeader *mh)
126{ 150{
127 struct RequestMessage *msg = (struct RequestMessage *) mh; 151 struct GNUNET_SET_RequestMessage *msg = (struct GNUNET_SET_RequestMessage *) mh;
128 struct GNUNET_SET_ListenHandle *lh = cls; 152 struct GNUNET_SET_ListenHandle *lh = cls;
129 struct GNUNET_SET_Request *req; 153 struct GNUNET_SET_Request *req;
154 struct GNUNET_MessageHeader *context_msg;
130 155
156 LOG (GNUNET_ERROR_TYPE_INFO, "processing request\n");
131 req = GNUNET_new (struct GNUNET_SET_Request); 157 req = GNUNET_new (struct GNUNET_SET_Request);
132 req->accept_id = ntohl (msg->accept_id); 158 req->accept_id = ntohl (msg->accept_id);
159 context_msg = GNUNET_MQ_extract_nested_mh (msg);
133 /* calling GNUNET_SET_accept in the listen cb will set req->accepted */ 160 /* calling GNUNET_SET_accept in the listen cb will set req->accepted */
134 lh->listen_cb (lh->listen_cls, &msg->peer_id, &mh[1], req); 161 lh->listen_cb (lh->listen_cls, &msg->peer_id, context_msg, req);
135 162
136 if (GNUNET_NO == req->accepted) 163 if (GNUNET_NO == req->accepted)
137 { 164 {
138 struct GNUNET_MQ_Message *mqm; 165 struct GNUNET_MQ_Message *mqm;
139 struct AcceptMessage *amsg; 166 struct GNUNET_SET_AcceptRejectMessage *amsg;
140 167
141 mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_ACCEPT); 168 mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_REJECT);
142 /* no request id, as we refused */ 169 /* no request id, as we refused */
143 amsg->request_id = htonl (0); 170 amsg->request_id = htonl (0);
144 amsg->accept_id = msg->accept_id; 171 amsg->accept_reject_id = msg->accept_id;
145 GNUNET_MQ_send (lh->mq, mqm); 172 GNUNET_MQ_send (lh->mq, mqm);
146 GNUNET_free (req); 173 GNUNET_free (req);
174 LOG (GNUNET_ERROR_TYPE_INFO, "rejecting request\n");
147 } 175 }
148 176
177 LOG (GNUNET_ERROR_TYPE_INFO, "processed op request from service\n");
178
149 /* the accept-case is handled in GNUNET_SET_accept, 179 /* the accept-case is handled in GNUNET_SET_accept,
150 * as we have the accept message available there */ 180 * as we have the accept message available there */
151} 181}
@@ -168,7 +198,7 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
168{ 198{
169 struct GNUNET_SET_Handle *set; 199 struct GNUNET_SET_Handle *set;
170 struct GNUNET_MQ_Message *mqm; 200 struct GNUNET_MQ_Message *mqm;
171 struct SetCreateMessage *msg; 201 struct GNUNET_SET_CreateMessage *msg;
172 static const struct GNUNET_MQ_Handler mq_handlers[] = { 202 static const struct GNUNET_MQ_Handler mq_handlers[] = {
173 {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT}, 203 {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT},
174 GNUNET_MQ_HANDLERS_END 204 GNUNET_MQ_HANDLERS_END
@@ -179,6 +209,7 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
179 LOG (GNUNET_ERROR_TYPE_INFO, "set client created\n"); 209 LOG (GNUNET_ERROR_TYPE_INFO, "set client created\n");
180 GNUNET_assert (NULL != set->client); 210 GNUNET_assert (NULL != set->client);
181 set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers, set); 211 set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers, set);
212 GNUNET_assert (NULL != set->mq);
182 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE); 213 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE);
183 msg->operation = htons (op); 214 msg->operation = htons (op);
184 GNUNET_MQ_send (set->mq, mqm); 215 GNUNET_MQ_send (set->mq, mqm);
@@ -204,7 +235,7 @@ GNUNET_SET_add_element (struct GNUNET_SET_Handle *set,
204 void *cont_cls) 235 void *cont_cls)
205{ 236{
206 struct GNUNET_MQ_Message *mqm; 237 struct GNUNET_MQ_Message *mqm;
207 struct ElementMessage *msg; 238 struct GNUNET_SET_ElementMessage *msg;
208 239
209 mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_ADD); 240 mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_ADD);
210 msg->element_type = element->type; 241 msg->element_type = element->type;
@@ -232,7 +263,7 @@ GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set,
232 void *cont_cls) 263 void *cont_cls)
233{ 264{
234 struct GNUNET_MQ_Message *mqm; 265 struct GNUNET_MQ_Message *mqm;
235 struct ElementMessage *msg; 266 struct GNUNET_SET_ElementMessage *msg;
236 267
237 mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_REMOVE); 268 mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_REMOVE);
238 msg->element_type = element->type; 269 msg->element_type = element->type;
@@ -256,10 +287,10 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
256 287
257 288
258/** 289/**
259 * Evaluate a set operation with our set and the set of another peer. 290 * Create a set operation for evaluation with another peer.
291 * The evaluation will not start until the client provides
292 * a local set with GNUNET_SET_conclude.
260 * 293 *
261 * @param set set to use
262 * @param salt salt for HKDF (explain more here)
263 * @param other_peer peer with the other set 294 * @param other_peer peer with the other set
264 * @param app_id hash for the application using the set 295 * @param app_id hash for the application using the set
265 * @param context_msg additional information for the request 296 * @param context_msg additional information for the request
@@ -273,8 +304,7 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
273 * @return a handle to cancel the operation 304 * @return a handle to cancel the operation
274 */ 305 */
275struct GNUNET_SET_OperationHandle * 306struct GNUNET_SET_OperationHandle *
276GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set, 307GNUNET_SET_evaluate (const struct GNUNET_PeerIdentity *other_peer,
277 const struct GNUNET_PeerIdentity *other_peer,
278 const struct GNUNET_HashCode *app_id, 308 const struct GNUNET_HashCode *app_id,
279 const struct GNUNET_MessageHeader *context_msg, 309 const struct GNUNET_MessageHeader *context_msg,
280 uint16_t salt, 310 uint16_t salt,
@@ -283,24 +313,24 @@ GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set,
283 void *result_cls) 313 void *result_cls)
284{ 314{
285 struct GNUNET_MQ_Message *mqm; 315 struct GNUNET_MQ_Message *mqm;
286 struct EvaluateMessage *msg;
287 struct GNUNET_SET_OperationHandle *oh; 316 struct GNUNET_SET_OperationHandle *oh;
317 struct GNUNET_SET_EvaluateMessage *msg;
288 318
289 oh = GNUNET_new (struct GNUNET_SET_OperationHandle); 319 oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
290 oh->result_cb = result_cb; 320 oh->result_cb = result_cb;
291 oh->result_cls = result_cls; 321 oh->result_cls = result_cls;
292 oh->set = set;
293 322
294 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_EVALUATE); 323 mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_EVALUATE, context_msg);
295 msg->request_id = htonl (GNUNET_MQ_assoc_add (set->mq, mqm, oh)); 324
296 msg->peer = *other_peer;
297 msg->app_id = *app_id;
298
299 if (NULL != context_msg) 325 if (NULL != context_msg)
300 if (GNUNET_OK != GNUNET_MQ_nest (mqm, context_msg, ntohs (context_msg->size))) 326 LOG (GNUNET_ERROR_TYPE_INFO, "passed context msg\n");
301 GNUNET_assert (0); 327
302 328 msg->app_id = *app_id;
303 GNUNET_MQ_send (set->mq, mqm); 329 msg->target_peer = *other_peer;
330 msg->salt = salt;
331 msg->reserved = 0;
332 oh->conclude_mqm = mqm;
333 oh->request_id_addr = &msg->request_id;
304 334
305 return oh; 335 return oh;
306} 336}
@@ -327,7 +357,7 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
327{ 357{
328 struct GNUNET_SET_ListenHandle *lh; 358 struct GNUNET_SET_ListenHandle *lh;
329 struct GNUNET_MQ_Message *mqm; 359 struct GNUNET_MQ_Message *mqm;
330 struct ListenMessage *msg; 360 struct GNUNET_SET_ListenMessage *msg;
331 static const struct GNUNET_MQ_Handler mq_handlers[] = { 361 static const struct GNUNET_MQ_Handler mq_handlers[] = {
332 {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST}, 362 {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST},
333 GNUNET_MQ_HANDLERS_END 363 GNUNET_MQ_HANDLERS_END
@@ -363,10 +393,13 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
363 393
364 394
365/** 395/**
366 * Accept a request we got via GNUNET_SET_listen. 396 * Accept a request we got via GNUNET_SET_listen. Must be called during
397 * GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid
398 * afterwards.
399 * Call GNUNET_SET_conclude to provide the local set to use for the operation,
400 * and to begin the exchange with the remote peer.
367 * 401 *
368 * @param request request to accept 402 * @param request request to accept
369 * @param set set used for the requested operation
370 * @param result_mode specified how results will be returned, 403 * @param result_mode specified how results will be returned,
371 * see 'GNUNET_SET_ResultMode'. 404 * see 'GNUNET_SET_ResultMode'.
372 * @param result_cb callback for the results 405 * @param result_cb callback for the results
@@ -375,28 +408,26 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
375 */ 408 */
376struct GNUNET_SET_OperationHandle * 409struct GNUNET_SET_OperationHandle *
377GNUNET_SET_accept (struct GNUNET_SET_Request *request, 410GNUNET_SET_accept (struct GNUNET_SET_Request *request,
378 struct GNUNET_SET_Handle *set,
379 enum GNUNET_SET_ResultMode result_mode, 411 enum GNUNET_SET_ResultMode result_mode,
380 GNUNET_SET_ResultIterator result_cb, 412 GNUNET_SET_ResultIterator result_cb,
381 void *result_cls) 413 void *cls)
382{ 414{
383 struct GNUNET_MQ_Message *mqm; 415 struct GNUNET_MQ_Message *mqm;
384 struct AcceptMessage *msg;
385 struct GNUNET_SET_OperationHandle *oh; 416 struct GNUNET_SET_OperationHandle *oh;
417 struct GNUNET_SET_AcceptRejectMessage *msg;
386 418
387 /* don't accept a request twice! */
388 GNUNET_assert (GNUNET_NO == request->accepted); 419 GNUNET_assert (GNUNET_NO == request->accepted);
389 request->accepted = GNUNET_YES; 420 request->accepted = GNUNET_YES;
390 421
391 oh = GNUNET_new (struct GNUNET_SET_OperationHandle); 422 oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
392 oh->result_cb = result_cb; 423 oh->result_cb = result_cb;
393 oh->result_cls = result_cls; 424 oh->result_cls = cls;
394 oh->set = set;
395 425
396 mqm = GNUNET_MQ_msg (msg , GNUNET_MESSAGE_TYPE_SET_ACCEPT); 426 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
397 msg->request_id = htonl (GNUNET_MQ_assoc_add (set->mq, NULL, oh)); 427 msg->accept_reject_id = htonl (request->accept_id);
398 msg->accept_id = htonl (request->accept_id); 428
399 GNUNET_MQ_send (set->mq, mqm); 429 oh->conclude_mqm = mqm;
430 oh->request_id_addr = &msg->request_id;
400 431
401 return oh; 432 return oh;
402} 433}
@@ -413,10 +444,43 @@ GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
413 struct GNUNET_MQ_Message *mqm; 444 struct GNUNET_MQ_Message *mqm;
414 struct GNUNET_SET_OperationHandle *h_assoc; 445 struct GNUNET_SET_OperationHandle *h_assoc;
415 446
416 h_assoc = GNUNET_MQ_assoc_remove (oh->set->mq, oh->request_id); 447 if (NULL != oh->set)
417 GNUNET_assert (h_assoc == oh); 448 {
418 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_CANCEL); 449 h_assoc = GNUNET_MQ_assoc_remove (oh->set->mq, oh->request_id);
419 GNUNET_MQ_send (oh->set->mq, mqm); 450 GNUNET_assert (h_assoc == oh);
451 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_CANCEL);
452 GNUNET_MQ_send (oh->set->mq, mqm);
453 }
454
455 if (NULL != oh->conclude_mqm)
456 GNUNET_MQ_discard (oh->conclude_mqm);
457
420 GNUNET_free (oh); 458 GNUNET_free (oh);
421} 459}
422 460
461
462/**
463 * Conclude the given set operation using the given set.
464 * This function is called once we have fully constructed
465 * the set that we want to use for the operation. At this
466 * time, the P2P protocol can then begin to exchange the
467 * set information and call the result callback with the
468 * result information.
469 *
470 * @param oh handle to the set operation
471 * @param set the set to use for the operation
472 */
473void
474GNUNET_SET_conclude (struct GNUNET_SET_OperationHandle *oh,
475 struct GNUNET_SET_Handle *set)
476{
477 GNUNET_assert (NULL == oh->set);
478 GNUNET_assert (NULL != oh->conclude_mqm);
479 oh->set = set;
480 oh->request_id = GNUNET_MQ_assoc_add (oh->set->mq, NULL, oh);
481 *oh->request_id_addr = htonl (oh->request_id);
482 GNUNET_MQ_send (oh->set->mq, oh->conclude_mqm);
483 oh->conclude_mqm = NULL;
484 oh->request_id_addr = NULL;
485}
486
diff --git a/src/set/test_set.conf b/src/set/test_set.conf
index 34b7a8d2f..7bc26ed7e 100644
--- a/src/set/test_set.conf
+++ b/src/set/test_set.conf
@@ -8,8 +8,8 @@ PORT = 2106
8HOSTNAME = localhost 8HOSTNAME = localhost
9HOME = $SERVICEHOME 9HOME = $SERVICEHOME
10BINARY = gnunet-service-set 10BINARY = gnunet-service-set
11#PREFIX = gdbserver :12345
12#PREFIX = valgrind --leak-check=full 11#PREFIX = valgrind --leak-check=full
12#PREFIX = gdbserver :1234
13ACCEPT_FROM = 127.0.0.1; 13ACCEPT_FROM = 127.0.0.1;
14ACCEPT_FROM6 = ::1; 14ACCEPT_FROM6 = ::1;
15UNIXPATH = /tmp/gnunet-service-set.sock 15UNIXPATH = /tmp/gnunet-service-set.sock
diff --git a/src/set/test_set_api.c b/src/set/test_set_api.c
index bf0d65697..f773cebdf 100644
--- a/src/set/test_set_api.c
+++ b/src/set/test_set_api.c
@@ -20,7 +20,7 @@
20 20
21/** 21/**
22 * @file set/test_set_api.c 22 * @file set/test_set_api.c
23 * @brief testcase for consensus_api.c 23 * @brief testcase for set_api.c
24 */ 24 */
25#include "platform.h" 25#include "platform.h"
26#include "gnunet_util_lib.h" 26#include "gnunet_util_lib.h"
@@ -89,11 +89,13 @@ listen_cb (void *cls,
89 const struct GNUNET_MessageHeader *context_msg, 89 const struct GNUNET_MessageHeader *context_msg,
90 struct GNUNET_SET_Request *request) 90 struct GNUNET_SET_Request *request)
91{ 91{
92 struct GNUNET_SET_OperationHandle *oh;
93
92 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n"); 94 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n");
93 GNUNET_SET_listen_cancel (listen_handle); 95 GNUNET_SET_listen_cancel (listen_handle);
94 96
95 GNUNET_SET_accept (request, set2, 97 oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL);
96 GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL); 98 GNUNET_SET_conclude (oh, set2);
97} 99}
98 100
99 101
@@ -105,11 +107,14 @@ listen_cb (void *cls,
105static void 107static void
106start (void *cls) 108start (void *cls)
107{ 109{
110 struct GNUNET_SET_OperationHandle *oh;
111
108 listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, 112 listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
109 &app_id, listen_cb, NULL); 113 &app_id, listen_cb, NULL);
110 GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42, 114 oh = GNUNET_SET_evaluate (&local_id, &app_id, NULL, 42,
111 GNUNET_SET_RESULT_ADDED, 115 GNUNET_SET_RESULT_ADDED,
112 result_cb_set1, NULL); 116 result_cb_set1, NULL);
117 GNUNET_SET_conclude (oh, set1);
113} 118}
114 119
115 120
@@ -168,12 +173,14 @@ run (void *cls,
168 struct GNUNET_TESTING_Peer *peer) 173 struct GNUNET_TESTING_Peer *peer)
169{ 174{
170 175
171 static const char* app_str = "gnunet-set";
172
173 config = cfg; 176 config = cfg;
177 GNUNET_CRYPTO_get_host_identity (cfg, &local_id);
178 printf ("my id (from CRYPTO): %s\n", GNUNET_h2s (&local_id.hashPubKey));
174 GNUNET_TESTING_peer_get_identity (peer, &local_id); 179 GNUNET_TESTING_peer_get_identity (peer, &local_id);
180 printf ("my id (from TESTING): %s\n", GNUNET_h2s (&local_id.hashPubKey));
175 set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); 181 set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
176 set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); 182 set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
183 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id);
177 init_set1 (); 184 init_set1 ();
178} 185}
179 186