diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-06-26 10:06:52 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-06-26 10:06:52 +0000 |
commit | 23479fc50d94f0c29cad3b92fe8fc53e358d4025 (patch) | |
tree | 240efe571ce83c65ac27b5d885c6c2a71e61117f /src/set | |
parent | f5a3f1dc90c9949c8c426f2cb2e822603b137dae (diff) | |
download | gnunet-23479fc50d94f0c29cad3b92fe8fc53e358d4025.tar.gz gnunet-23479fc50d94f0c29cad3b92fe8fc53e358d4025.zip |
- fixed tunnel context
- moved logic out of specific operations
Diffstat (limited to 'src/set')
-rw-r--r-- | src/set/gnunet-service-set.c | 178 | ||||
-rw-r--r-- | src/set/gnunet-service-set.h | 173 | ||||
-rw-r--r-- | src/set/gnunet-service-set_intersection.c | 19 | ||||
-rw-r--r-- | src/set/gnunet-service-set_union.c | 231 | ||||
-rw-r--r-- | src/set/set_protocol.h | 30 |
5 files changed, 351 insertions, 280 deletions
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index cfc0068ab..a85093bcd 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c | |||
@@ -28,6 +28,55 @@ | |||
28 | 28 | ||
29 | 29 | ||
30 | /** | 30 | /** |
31 | * Peer that has connected to us, but is not yet evaluating a set operation. | ||
32 | * Once the peer has sent a request, and the client has | ||
33 | * accepted or rejected it, this information will be deleted. | ||
34 | */ | ||
35 | struct Incoming | ||
36 | { | ||
37 | /** | ||
38 | * Incoming peers are held in a linked list | ||
39 | */ | ||
40 | struct Incoming *next; | ||
41 | |||
42 | /** | ||
43 | * Incoming peers are held in a linked list | ||
44 | */ | ||
45 | struct Incoming *prev; | ||
46 | |||
47 | /** | ||
48 | * Detail information about the operation. | ||
49 | */ | ||
50 | struct OperationSpecification *spec; | ||
51 | |||
52 | /** | ||
53 | * The identity of the requesting peer. | ||
54 | */ | ||
55 | struct GNUNET_PeerIdentity peer; | ||
56 | |||
57 | /** | ||
58 | * Tunnel to the peer. | ||
59 | */ | ||
60 | struct GNUNET_MESH_Tunnel *tunnel; | ||
61 | |||
62 | /** | ||
63 | * Unique request id for the request from | ||
64 | * a remote peer, sent to the client, which will | ||
65 | * accept or reject the request. | ||
66 | * Set to '0' iff the request has not been | ||
67 | * suggested yet. | ||
68 | */ | ||
69 | uint32_t suggest_id; | ||
70 | |||
71 | /** | ||
72 | * Timeout task, if the incoming peer has not been accepted | ||
73 | * after the timeout, it will be disconnected. | ||
74 | */ | ||
75 | GNUNET_SCHEDULER_TaskIdentifier timeout_task; | ||
76 | }; | ||
77 | |||
78 | |||
79 | /** | ||
31 | * Configuration of our local peer. | 80 | * Configuration of our local peer. |
32 | * (Not declared 'static' as also needed in gnunet-service-set_union.c) | 81 | * (Not declared 'static' as also needed in gnunet-service-set_union.c) |
33 | */ | 82 | */ |
@@ -77,7 +126,7 @@ static struct Incoming *incoming_tail; | |||
77 | * used to identify incoming operation requests from remote peers, | 126 | * used to identify incoming operation requests from remote peers, |
78 | * that the client can choose to accept or refuse. | 127 | * that the client can choose to accept or refuse. |
79 | */ | 128 | */ |
80 | static uint32_t accept_id = 1; | 129 | static uint32_t suggest_id = 1; |
81 | 130 | ||
82 | 131 | ||
83 | /** | 132 | /** |
@@ -131,7 +180,7 @@ get_incoming (uint32_t id) | |||
131 | struct Incoming *incoming; | 180 | struct Incoming *incoming; |
132 | 181 | ||
133 | for (incoming = incoming_head; NULL != incoming; incoming = incoming->next) | 182 | for (incoming = incoming_head; NULL != incoming; incoming = incoming->next) |
134 | if (incoming->accept_id == id) | 183 | if (incoming->suggest_id == id) |
135 | return incoming; | 184 | return incoming; |
136 | return NULL; | 185 | return NULL; |
137 | } | 186 | } |
@@ -145,6 +194,14 @@ get_incoming (uint32_t id) | |||
145 | static void | 194 | static void |
146 | listener_destroy (struct Listener *listener) | 195 | listener_destroy (struct Listener *listener) |
147 | { | 196 | { |
197 | /* If the client is not dead yet, destroy it. | ||
198 | * The client's destroy callback will destroy the listener again. */ | ||
199 | if (NULL != listener->client) | ||
200 | { | ||
201 | GNUNET_SERVER_client_disconnect (listener->client); | ||
202 | listener->client = NULL; | ||
203 | return; | ||
204 | } | ||
148 | if (NULL != listener->client_mq) | 205 | if (NULL != listener->client_mq) |
149 | { | 206 | { |
150 | GNUNET_MQ_destroy (listener->client_mq); | 207 | GNUNET_MQ_destroy (listener->client_mq); |
@@ -163,6 +220,14 @@ listener_destroy (struct Listener *listener) | |||
163 | static void | 220 | static void |
164 | set_destroy (struct Set *set) | 221 | set_destroy (struct Set *set) |
165 | { | 222 | { |
223 | /* If the client is not dead yet, destroy it. | ||
224 | * The client's destroy callback will destroy the set again. */ | ||
225 | if (NULL != set->client) | ||
226 | { | ||
227 | GNUNET_SERVER_client_disconnect (set->client); | ||
228 | set->client = NULL; | ||
229 | return; | ||
230 | } | ||
166 | switch (set->operation) | 231 | switch (set->operation) |
167 | { | 232 | { |
168 | case GNUNET_SET_OPERATION_INTERSECTION: | 233 | case GNUNET_SET_OPERATION_INTERSECTION: |
@@ -195,10 +260,16 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
195 | 260 | ||
196 | set = set_get (client); | 261 | set = set_get (client); |
197 | if (NULL != set) | 262 | if (NULL != set) |
263 | { | ||
264 | set->client = NULL; | ||
198 | set_destroy (set); | 265 | set_destroy (set); |
266 | } | ||
199 | listener = listener_get (client); | 267 | listener = listener_get (client); |
200 | if (NULL != listener) | 268 | if (NULL != listener) |
269 | { | ||
270 | listener->client = NULL; | ||
201 | listener_destroy (listener); | 271 | listener_destroy (listener); |
272 | } | ||
202 | } | 273 | } |
203 | 274 | ||
204 | 275 | ||
@@ -210,6 +281,13 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
210 | static void | 281 | static void |
211 | incoming_destroy (struct Incoming *incoming) | 282 | incoming_destroy (struct Incoming *incoming) |
212 | { | 283 | { |
284 | if (NULL != incoming->tunnel) | ||
285 | { | ||
286 | struct GNUNET_MESH_Tunnel *t = incoming->tunnel; | ||
287 | incoming->tunnel = NULL; | ||
288 | GNUNET_MESH_tunnel_destroy (t); | ||
289 | return; | ||
290 | } | ||
213 | GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming); | 291 | GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming); |
214 | GNUNET_free (incoming); | 292 | GNUNET_free (incoming); |
215 | } | 293 | } |
@@ -246,16 +324,17 @@ incoming_suggest (struct Incoming *incoming, struct Listener *listener) | |||
246 | struct GNUNET_MQ_Envelope *mqm; | 324 | struct GNUNET_MQ_Envelope *mqm; |
247 | struct GNUNET_SET_RequestMessage *cmsg; | 325 | struct GNUNET_SET_RequestMessage *cmsg; |
248 | 326 | ||
249 | GNUNET_assert (GNUNET_NO == incoming->suggested); | 327 | GNUNET_assert (0 == incoming->suggest_id); |
250 | incoming->suggested = GNUNET_YES; | 328 | GNUNET_assert (NULL != incoming->spec); |
329 | incoming->suggest_id = suggest_id++; | ||
251 | 330 | ||
252 | GNUNET_SCHEDULER_cancel (incoming->timeout_task); | 331 | GNUNET_SCHEDULER_cancel (incoming->timeout_task); |
253 | mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, | 332 | mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, |
254 | incoming->context_msg); | 333 | incoming->spec->context_msg); |
255 | GNUNET_assert (NULL != mqm); | 334 | GNUNET_assert (NULL != mqm); |
256 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "suggesting request with accept id %u\n", incoming->accept_id); | 335 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "suggesting request with accept id %u\n", incoming->suggest_id); |
257 | cmsg->accept_id = htonl (incoming->accept_id); | 336 | cmsg->accept_id = htonl (incoming->suggest_id); |
258 | cmsg->peer_id = incoming->tc->peer; | 337 | cmsg->peer_id = incoming->spec->peer; |
259 | GNUNET_MQ_send (listener->client_mq, mqm); | 338 | GNUNET_MQ_send (listener->client_mq, mqm); |
260 | 339 | ||
261 | } | 340 | } |
@@ -280,6 +359,7 @@ handle_p2p_operation_request (void *cls, | |||
280 | struct Incoming *incoming; | 359 | struct Incoming *incoming; |
281 | const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh; | 360 | const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh; |
282 | struct Listener *listener; | 361 | struct Listener *listener; |
362 | struct OperationSpecification *spec; | ||
283 | 363 | ||
284 | if (CONTEXT_INCOMING != tc->type) | 364 | if (CONTEXT_INCOMING != tc->type) |
285 | { | 365 | { |
@@ -289,21 +369,27 @@ handle_p2p_operation_request (void *cls, | |||
289 | return GNUNET_SYSERR; | 369 | return GNUNET_SYSERR; |
290 | } | 370 | } |
291 | 371 | ||
292 | incoming = tc->data; | 372 | incoming = tc->data.incoming; |
293 | 373 | ||
294 | if (GNUNET_YES == incoming->received_request) | 374 | if (NULL != incoming->spec) |
295 | { | 375 | { |
296 | /* double operation request */ | 376 | /* double operation request */ |
297 | GNUNET_break_op (0); | 377 | GNUNET_break_op (0); |
298 | return GNUNET_SYSERR; | 378 | return GNUNET_SYSERR; |
299 | } | 379 | } |
300 | 380 | ||
301 | incoming->accept_id = accept_id++; | 381 | spec = GNUNET_new (struct OperationSpecification); |
302 | incoming->context_msg = | 382 | spec->context_msg = |
303 | GNUNET_copy_message (GNUNET_MQ_extract_nested_mh (msg)); | 383 | GNUNET_copy_message (GNUNET_MQ_extract_nested_mh (msg)); |
384 | spec->operation = ntohl (msg->operation); | ||
385 | spec->app_id = msg->app_id; | ||
386 | spec->salt = ntohl (msg->salt); | ||
387 | spec->peer = incoming->peer; | ||
388 | |||
389 | incoming->spec = spec; | ||
304 | 390 | ||
305 | if ( (NULL != incoming->context_msg) && | 391 | if ( (NULL != spec->context_msg) && |
306 | (ntohs (incoming->context_msg->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) ) | 392 | (ntohs (spec->context_msg->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) ) |
307 | { | 393 | { |
308 | GNUNET_break_op (0); | 394 | GNUNET_break_op (0); |
309 | return GNUNET_SYSERR; | 395 | return GNUNET_SYSERR; |
@@ -405,12 +491,12 @@ handle_client_listen (void *cls, | |||
405 | listener->operation, GNUNET_h2s (&listener->app_id)); | 491 | listener->operation, GNUNET_h2s (&listener->app_id)); |
406 | for (incoming = incoming_head; NULL != incoming; incoming = incoming->next) | 492 | for (incoming = incoming_head; NULL != incoming; incoming = incoming->next) |
407 | { | 493 | { |
408 | if ( (GNUNET_NO == incoming->received_request) || | 494 | if ( (NULL == incoming->spec) || |
409 | (GNUNET_YES == incoming->suggested) ) | 495 | (0 != incoming->suggest_id) ) |
410 | continue; | 496 | continue; |
411 | if (listener->operation != incoming->operation) | 497 | if (listener->operation != incoming->spec->operation) |
412 | continue; | 498 | continue; |
413 | if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id, &incoming->app_id)) | 499 | if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id, &incoming->spec->app_id)) |
414 | continue; | 500 | continue; |
415 | incoming_suggest (incoming, listener); | 501 | incoming_suggest (incoming, listener); |
416 | } | 502 | } |
@@ -483,8 +569,7 @@ handle_client_reject (void *cls, | |||
483 | return; | 569 | return; |
484 | } | 570 | } |
485 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n"); | 571 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n"); |
486 | /* the incoming peer will be destroyed in the tunnel end handler */ | 572 | GNUNET_MESH_tunnel_destroy (incoming->tunnel); |
487 | GNUNET_MESH_tunnel_destroy (incoming->tc->tunnel); | ||
488 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 573 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
489 | } | 574 | } |
490 | 575 | ||
@@ -542,6 +627,10 @@ handle_client_evaluate (void *cls, | |||
542 | const struct GNUNET_MessageHeader *m) | 627 | const struct GNUNET_MessageHeader *m) |
543 | { | 628 | { |
544 | struct Set *set; | 629 | struct Set *set; |
630 | struct TunnelContext *tc; | ||
631 | struct GNUNET_MESH_Tunnel *tunnel; | ||
632 | struct GNUNET_SET_EvaluateMessage *msg; | ||
633 | struct OperationSpecification *spec; | ||
545 | 634 | ||
546 | set = set_get (client); | 635 | set = set_get (client); |
547 | if (NULL == set) | 636 | if (NULL == set) |
@@ -551,13 +640,27 @@ handle_client_evaluate (void *cls, | |||
551 | return; | 640 | return; |
552 | } | 641 | } |
553 | 642 | ||
643 | msg = (struct GNUNET_SET_EvaluateMessage *) m; | ||
644 | tc = GNUNET_new (struct TunnelContext); | ||
645 | spec = GNUNET_new (struct OperationSpecification); | ||
646 | spec->operation = set->operation; | ||
647 | spec->app_id = msg->app_id; | ||
648 | spec->salt = ntohl (msg->salt); | ||
649 | spec->peer = msg->target_peer; | ||
650 | |||
651 | tunnel = GNUNET_MESH_tunnel_create (mesh, tc, &msg->target_peer, | ||
652 | GNUNET_APPLICATION_TYPE_SET); | ||
653 | |||
554 | switch (set->operation) | 654 | switch (set->operation) |
555 | { | 655 | { |
556 | case GNUNET_SET_OPERATION_INTERSECTION: | 656 | case GNUNET_SET_OPERATION_INTERSECTION: |
657 | tc->type = CONTEXT_OPERATION_INTERSECTION; | ||
557 | //_GSS_intersection_evaluate ((struct GNUNET_SET_EvaluateMessage *) m, set); | 658 | //_GSS_intersection_evaluate ((struct GNUNET_SET_EvaluateMessage *) m, set); |
558 | break; | 659 | break; |
559 | case GNUNET_SET_OPERATION_UNION: | 660 | case GNUNET_SET_OPERATION_UNION: |
560 | _GSS_union_evaluate ((struct GNUNET_SET_EvaluateMessage *) m, set); | 661 | tc->type = CONTEXT_OPERATION_UNION; |
662 | tc->data.union_op = | ||
663 | _GSS_union_evaluate (spec, tunnel); | ||
561 | break; | 664 | break; |
562 | default: | 665 | default: |
563 | GNUNET_assert (0); | 666 | GNUNET_assert (0); |
@@ -601,6 +704,9 @@ handle_client_accept (void *cls, | |||
601 | struct Set *set; | 704 | struct Set *set; |
602 | struct Incoming *incoming; | 705 | struct Incoming *incoming; |
603 | struct GNUNET_SET_AcceptRejectMessage *msg = (struct GNUNET_SET_AcceptRejectMessage *) mh; | 706 | struct GNUNET_SET_AcceptRejectMessage *msg = (struct GNUNET_SET_AcceptRejectMessage *) mh; |
707 | struct GNUNET_MESH_Tunnel *tunnel; | ||
708 | struct TunnelContext *tc; | ||
709 | struct OperationSpecification *spec; | ||
604 | 710 | ||
605 | incoming = get_incoming (ntohl (msg->accept_reject_id)); | 711 | incoming = get_incoming (ntohl (msg->accept_reject_id)); |
606 | 712 | ||
@@ -623,13 +729,20 @@ handle_client_accept (void *cls, | |||
623 | return; | 729 | return; |
624 | } | 730 | } |
625 | 731 | ||
732 | tc = GNUNET_new (struct TunnelContext); | ||
733 | tunnel = GNUNET_MESH_tunnel_create (mesh, tc, &incoming->spec->peer, | ||
734 | GNUNET_APPLICATION_TYPE_SET); | ||
735 | spec = GNUNET_new (struct OperationSpecification); | ||
736 | |||
626 | switch (set->operation) | 737 | switch (set->operation) |
627 | { | 738 | { |
628 | case GNUNET_SET_OPERATION_INTERSECTION: | 739 | case GNUNET_SET_OPERATION_INTERSECTION: |
740 | tc->type = CONTEXT_OPERATION_INTERSECTION; | ||
629 | // _GSS_intersection_accept (msg, set, incoming); | 741 | // _GSS_intersection_accept (msg, set, incoming); |
630 | break; | 742 | break; |
631 | case GNUNET_SET_OPERATION_UNION: | 743 | case GNUNET_SET_OPERATION_UNION: |
632 | _GSS_union_accept (msg, set, incoming); | 744 | tc->type = CONTEXT_OPERATION_UNION; |
745 | tc->data.union_op = _GSS_union_accept (spec, tunnel); | ||
633 | break; | 746 | break; |
634 | default: | 747 | default: |
635 | GNUNET_assert (0); | 748 | GNUNET_assert (0); |
@@ -719,11 +832,9 @@ tunnel_new_cb (void *cls, | |||
719 | GNUNET_assert (port == GNUNET_APPLICATION_TYPE_SET); | 832 | GNUNET_assert (port == GNUNET_APPLICATION_TYPE_SET); |
720 | tc = GNUNET_new (struct TunnelContext); | 833 | tc = GNUNET_new (struct TunnelContext); |
721 | incoming = GNUNET_new (struct Incoming); | 834 | incoming = GNUNET_new (struct Incoming); |
722 | incoming->tc = tc; | 835 | incoming->peer = *initiator; |
723 | tc->peer = *initiator; | 836 | incoming->tunnel = tunnel; |
724 | tc->tunnel = tunnel; | 837 | tc->data.incoming = incoming; |
725 | tc->mq = GNUNET_MESH_mq_create (tunnel); | ||
726 | tc->data = incoming; | ||
727 | tc->type = CONTEXT_INCOMING; | 838 | tc->type = CONTEXT_INCOMING; |
728 | incoming->timeout_task = | 839 | incoming->timeout_task = |
729 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, incoming_timeout_cb, incoming); | 840 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, incoming_timeout_cb, incoming); |
@@ -750,22 +861,13 @@ tunnel_end_cb (void *cls, | |||
750 | { | 861 | { |
751 | struct TunnelContext *ctx = tunnel_ctx; | 862 | struct TunnelContext *ctx = tunnel_ctx; |
752 | 863 | ||
753 | /* tunnel is dead already */ | ||
754 | ctx->tunnel = NULL; | ||
755 | |||
756 | if (NULL != ctx->mq) | ||
757 | { | ||
758 | GNUNET_MQ_destroy (ctx->mq); | ||
759 | ctx->mq = NULL; | ||
760 | } | ||
761 | |||
762 | switch (ctx->type) | 864 | switch (ctx->type) |
763 | { | 865 | { |
764 | case CONTEXT_INCOMING: | 866 | case CONTEXT_INCOMING: |
765 | incoming_destroy ((struct Incoming *) ctx->data); | 867 | incoming_destroy (ctx->data.incoming); |
766 | break; | 868 | break; |
767 | case CONTEXT_OPERATION_UNION: | 869 | case CONTEXT_OPERATION_UNION: |
768 | _GSS_union_operation_destroy ((struct UnionEvaluateOperation *) ctx->data); | 870 | _GSS_union_operation_destroy (ctx->data.union_op); |
769 | break; | 871 | break; |
770 | case CONTEXT_OPERATION_INTERSECTION: | 872 | case CONTEXT_OPERATION_INTERSECTION: |
771 | GNUNET_assert (0); | 873 | GNUNET_assert (0); |
diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h index 533fd0ef7..574b343d6 100644 --- a/src/set/gnunet-service-set.h +++ b/src/set/gnunet-service-set.h | |||
@@ -42,14 +42,22 @@ | |||
42 | struct IntersectionState; | 42 | struct IntersectionState; |
43 | 43 | ||
44 | 44 | ||
45 | /* FIXME: cfuchs */ | ||
46 | struct IntersectionOperation; | ||
47 | |||
48 | |||
45 | /** | 49 | /** |
46 | * Extra state required for set union. | 50 | * Extra state required for set union. |
47 | */ | 51 | */ |
48 | struct UnionState; | 52 | struct UnionState; |
49 | 53 | ||
54 | /** | ||
55 | * State of a union operation being evaluated. | ||
56 | */ | ||
50 | struct UnionEvaluateOperation; | 57 | struct UnionEvaluateOperation; |
51 | 58 | ||
52 | 59 | ||
60 | |||
53 | /** | 61 | /** |
54 | * A set that supports a specific operation | 62 | * A set that supports a specific operation |
55 | * with other peers. | 63 | * with other peers. |
@@ -94,6 +102,50 @@ struct Set | |||
94 | 102 | ||
95 | 103 | ||
96 | /** | 104 | /** |
105 | * Detail information about an operation. | ||
106 | */ | ||
107 | struct OperationSpecification | ||
108 | { | ||
109 | /** | ||
110 | * The type of the operation. | ||
111 | */ | ||
112 | enum GNUNET_SET_OperationType operation; | ||
113 | |||
114 | /** | ||
115 | * The remove peer we evaluate the operation with | ||
116 | */ | ||
117 | struct GNUNET_PeerIdentity peer; | ||
118 | |||
119 | /** | ||
120 | * Application ID for the operation, used to distinguish | ||
121 | * multiple operations of the same type with the same peer. | ||
122 | */ | ||
123 | struct GNUNET_HashCode app_id; | ||
124 | |||
125 | /** | ||
126 | * Context message, may be NULL. | ||
127 | */ | ||
128 | struct GNUNET_MessageHeader *context_msg; | ||
129 | |||
130 | /** | ||
131 | * Salt to use for the operation. | ||
132 | */ | ||
133 | uint32_t salt; | ||
134 | |||
135 | /** | ||
136 | * ID used to identify responses to a client. | ||
137 | */ | ||
138 | uint32_t client_request_id; | ||
139 | |||
140 | /** | ||
141 | * Set associated with the operation, NULL until the spec has been associated | ||
142 | * with a set. | ||
143 | */ | ||
144 | struct Set *set; | ||
145 | }; | ||
146 | |||
147 | |||
148 | /** | ||
97 | * A listener is inhabited by a client, and | 149 | * A listener is inhabited by a client, and |
98 | * waits for evaluation requests from remote peers. | 150 | * waits for evaluation requests from remote peers. |
99 | */ | 151 | */ |
@@ -121,12 +173,13 @@ struct Listener | |||
121 | struct GNUNET_MQ_Handle *client_mq; | 173 | struct GNUNET_MQ_Handle *client_mq; |
122 | 174 | ||
123 | /** | 175 | /** |
124 | * Type of operation supported for this set | 176 | * The type of the operation. |
125 | */ | 177 | */ |
126 | enum GNUNET_SET_OperationType operation; | 178 | enum GNUNET_SET_OperationType operation; |
127 | 179 | ||
128 | /** | 180 | /** |
129 | * Application id of intereset for this listener. | 181 | * Application ID for the operation, used to distinguish |
182 | * multiple operations of the same type with the same peer. | ||
130 | */ | 183 | */ |
131 | struct GNUNET_HashCode app_id; | 184 | struct GNUNET_HashCode app_id; |
132 | }; | 185 | }; |
@@ -137,79 +190,51 @@ struct Listener | |||
137 | * Once the peer has sent a request, and the client has | 190 | * Once the peer has sent a request, and the client has |
138 | * accepted or rejected it, this information will be deleted. | 191 | * accepted or rejected it, this information will be deleted. |
139 | */ | 192 | */ |
140 | struct Incoming | 193 | struct Incoming; |
141 | { | ||
142 | /** | ||
143 | * Incoming peers are held in a linked list | ||
144 | */ | ||
145 | struct Incoming *next; | ||
146 | |||
147 | /** | ||
148 | * Incoming peers are held in a linked list | ||
149 | */ | ||
150 | struct Incoming *prev; | ||
151 | 194 | ||
152 | /** | ||
153 | * Tunnel context, stores information about | ||
154 | * the tunnel and its peer. | ||
155 | */ | ||
156 | struct TunnelContext *tc; | ||
157 | |||
158 | /** | ||
159 | * GNUNET_YES if the incoming peer has sent | ||
160 | * an operation request (and we are waiting | ||
161 | * for the client to ack/nack), GNUNET_NO otherwise. | ||
162 | */ | ||
163 | int received_request; | ||
164 | 195 | ||
196 | /** | ||
197 | * Different types a tunnel can be. | ||
198 | */ | ||
199 | enum TunnelContextType { | ||
165 | /** | 200 | /** |
166 | * App code, set once the peer has | 201 | * Tunnel is waiting for a set request from the tunnel, |
167 | * requested an operation | 202 | * or for the ack/nack of the client for a received request. |
168 | */ | 203 | */ |
169 | struct GNUNET_HashCode app_id; | 204 | CONTEXT_INCOMING, |
170 | 205 | ||
171 | /** | 206 | /** |
172 | * Context message, set once the peer | 207 | * The tunnel performs a union operation. |
173 | * has requested an operation. | ||
174 | */ | 208 | */ |
175 | struct GNUNET_MessageHeader *context_msg; | 209 | CONTEXT_OPERATION_UNION, |
176 | 210 | ||
177 | /** | 211 | /** |
178 | * Salt the peer has requested to use for the | 212 | * The tunnel performs an intersection operation. |
179 | * operation | ||
180 | */ | 213 | */ |
181 | uint16_t salt; | 214 | CONTEXT_OPERATION_INTERSECTION, |
215 | }; | ||
182 | 216 | ||
183 | /** | ||
184 | * Operation the other peer wants to do | ||
185 | */ | ||
186 | enum GNUNET_SET_OperationType operation; | ||
187 | 217 | ||
218 | /** | ||
219 | * State associated with the tunnel, dependent on | ||
220 | * tunnel type. | ||
221 | */ | ||
222 | union TunnelContextData | ||
223 | { | ||
188 | /** | 224 | /** |
189 | * Has the incoming request been suggested to | 225 | * Valid for tag 'CONTEXT_INCOMING' |
190 | * a client listener yet? | ||
191 | */ | 226 | */ |
192 | int suggested; | 227 | struct Incoming *incoming; |
193 | 228 | ||
194 | /** | 229 | /** |
195 | * Unique request id for the request from | 230 | * Valid for tag 'CONTEXT_OPERATION_UNION' |
196 | * a remote peer, sent to the client, which will | ||
197 | * accept or reject the request. | ||
198 | */ | 231 | */ |
199 | uint32_t accept_id; | 232 | struct UnionEvaluateOperation *union_op; |
200 | 233 | ||
201 | /** | 234 | /** |
202 | * Timeout task, if the incoming peer has not been accepted | 235 | * Valid for tag 'CONTEXT_OPERATION_INTERSECTION' |
203 | * after the timeout, it will be disconnected. | ||
204 | */ | 236 | */ |
205 | GNUNET_SCHEDULER_TaskIdentifier timeout_task; | 237 | struct IntersectionEvaluateOperation *intersection_op; |
206 | }; | ||
207 | |||
208 | |||
209 | enum TunnelContextType { | ||
210 | CONTEXT_INCOMING, | ||
211 | CONTEXT_OPERATION_UNION, | ||
212 | CONTEXT_OPERATION_INTERSECTION, | ||
213 | }; | 238 | }; |
214 | 239 | ||
215 | /** | 240 | /** |
@@ -219,21 +244,6 @@ enum TunnelContextType { | |||
219 | struct TunnelContext | 244 | struct TunnelContext |
220 | { | 245 | { |
221 | /** | 246 | /** |
222 | * The mesh tunnel that has this context | ||
223 | */ | ||
224 | struct GNUNET_MESH_Tunnel *tunnel; | ||
225 | |||
226 | /** | ||
227 | * The peer on the other side. | ||
228 | */ | ||
229 | struct GNUNET_PeerIdentity peer; | ||
230 | |||
231 | /** | ||
232 | * Handle to the message queue for the tunnel. | ||
233 | */ | ||
234 | struct GNUNET_MQ_Handle *mq; | ||
235 | |||
236 | /** | ||
237 | * Type of the tunnel. | 247 | * Type of the tunnel. |
238 | */ | 248 | */ |
239 | enum TunnelContextType type; | 249 | enum TunnelContextType type; |
@@ -242,7 +252,7 @@ struct TunnelContext | |||
242 | * State associated with the tunnel, dependent on | 252 | * State associated with the tunnel, dependent on |
243 | * tunnel type. | 253 | * tunnel type. |
244 | */ | 254 | */ |
245 | void *data; | 255 | union TunnelContextData data; |
246 | }; | 256 | }; |
247 | 257 | ||
248 | 258 | ||
@@ -268,11 +278,14 @@ _GSS_union_set_create (void); | |||
268 | * Evaluate a union operation with | 278 | * Evaluate a union operation with |
269 | * a remote peer. | 279 | * a remote peer. |
270 | * | 280 | * |
271 | * @param m the evaluate request message from the client | 281 | * @param spec specification of the operation the evaluate |
282 | * @param tunnel tunnel already connected to the partner peer | ||
272 | * @param set the set to evaluate the operation with | 283 | * @param set the set to evaluate the operation with |
284 | * @return a handle to the operation | ||
273 | */ | 285 | */ |
274 | void | 286 | struct UnionEvaluateOperation * |
275 | _GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set); | 287 | _GSS_union_evaluate (struct OperationSpecification *spec, |
288 | struct GNUNET_MESH_Tunnel *tunnel); | ||
276 | 289 | ||
277 | 290 | ||
278 | /** | 291 | /** |
@@ -308,13 +321,13 @@ _GSS_union_set_destroy (struct Set *set); | |||
308 | /** | 321 | /** |
309 | * Accept an union operation request from a remote peer | 322 | * Accept an union operation request from a remote peer |
310 | * | 323 | * |
311 | * @param m the accept message from the client | 324 | * @param spec all necessary information about the operation |
312 | * @param set the set of the client | 325 | * @param tunnel open tunnel to the partner's peer |
313 | * @param incoming information about the requesting remote peer | 326 | * @return operation |
314 | */ | 327 | */ |
315 | void | 328 | struct UnionEvaluateOperation * |
316 | _GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set, | 329 | _GSS_union_accept (struct OperationSpecification *spec, |
317 | struct Incoming *incoming); | 330 | struct GNUNET_MESH_Tunnel *tunnel); |
318 | 331 | ||
319 | 332 | ||
320 | /** | 333 | /** |
diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c index fe3ba56ea..57028c0dd 100644 --- a/src/set/gnunet-service-set_intersection.c +++ b/src/set/gnunet-service-set_intersection.c | |||
@@ -124,10 +124,9 @@ struct IntersectionEvaluateOperation | |||
124 | struct GNUNET_MessageHeader *context_msg; | 124 | struct GNUNET_MessageHeader *context_msg; |
125 | 125 | ||
126 | /** | 126 | /** |
127 | * Tunnel context for the peer we | 127 | * Tunnel to the other peer. |
128 | * evaluate the union operation with. | ||
129 | */ | 128 | */ |
130 | struct TunnelContext *tc; | 129 | struct GNUNET_MESH_Tunnel *tunnel; |
131 | 130 | ||
132 | /** | 131 | /** |
133 | * Request ID to multiplex set operations to | 132 | * Request ID to multiplex set operations to |
@@ -397,12 +396,11 @@ _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo) | |||
397 | { | 396 | { |
398 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n"); | 397 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n"); |
399 | 398 | ||
400 | if (NULL != eo->tc) | 399 | if (NULL != eo->tunnel) |
401 | { | 400 | { |
402 | GNUNET_MQ_destroy (eo->tc->mq); | 401 | GNUNET_MESH_tunnel_destroy (eo->tunnel); |
403 | GNUNET_MESH_tunnel_destroy (eo->tc->tunnel); | 402 | /* wait for the final destruction by the tunnel cleaner */ |
404 | GNUNET_free (eo->tc); | 403 | return; |
405 | eo->tc = NULL; | ||
406 | } | 404 | } |
407 | 405 | ||
408 | if (NULL != eo->remote_ibf) | 406 | if (NULL != eo->remote_ibf) |
@@ -432,10 +430,8 @@ _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo) | |||
432 | eo); | 430 | eo); |
433 | GNUNET_free (eo); | 431 | GNUNET_free (eo); |
434 | 432 | ||
435 | |||
436 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n"); | 433 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n"); |
437 | 434 | ||
438 | |||
439 | /* FIXME: do a garbage collection of the set generations */ | 435 | /* FIXME: do a garbage collection of the set generations */ |
440 | } | 436 | } |
441 | 437 | ||
@@ -1355,7 +1351,6 @@ _GSS_intersection_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set) | |||
1355 | * @param cls closure | 1351 | * @param cls closure |
1356 | * @param tunnel mesh tunnel | 1352 | * @param tunnel mesh tunnel |
1357 | * @param tunnel_ctx tunnel context | 1353 | * @param tunnel_ctx tunnel context |
1358 | * @param sender ??? | ||
1359 | * @param mh message to process | 1354 | * @param mh message to process |
1360 | * @return ??? | 1355 | * @return ??? |
1361 | */ | 1356 | */ |
@@ -1363,7 +1358,6 @@ int | |||
1363 | _GSS_union_handle_p2p_message (void *cls, | 1358 | _GSS_union_handle_p2p_message (void *cls, |
1364 | struct GNUNET_MESH_Tunnel *tunnel, | 1359 | struct GNUNET_MESH_Tunnel *tunnel, |
1365 | void **tunnel_ctx, | 1360 | void **tunnel_ctx, |
1366 | const struct GNUNET_PeerIdentity *sender, | ||
1367 | const struct GNUNET_MessageHeader *mh) | 1361 | const struct GNUNET_MessageHeader *mh) |
1368 | { | 1362 | { |
1369 | struct TunnelContext *tc = *tunnel_ctx; | 1363 | struct TunnelContext *tc = *tunnel_ctx; |
@@ -1371,7 +1365,6 @@ _GSS_union_handle_p2p_message (void *cls, | |||
1371 | 1365 | ||
1372 | if (CONTEXT_OPERATION_UNION != tc->type) | 1366 | if (CONTEXT_OPERATION_UNION != tc->type) |
1373 | { | 1367 | { |
1374 | /* FIXME: kill the tunnel */ | ||
1375 | /* never kill mesh */ | 1368 | /* never kill mesh */ |
1376 | return GNUNET_OK; | 1369 | return GNUNET_OK; |
1377 | } | 1370 | } |
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index 5b1f28cf4..f9756bd5b 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c | |||
@@ -103,37 +103,20 @@ enum UnionOperationPhase | |||
103 | struct UnionEvaluateOperation | 103 | struct UnionEvaluateOperation |
104 | { | 104 | { |
105 | /** | 105 | /** |
106 | * Local set the operation is evaluated on. | 106 | * Tunnel to the remote peer. |
107 | */ | 107 | */ |
108 | struct Set *set; | 108 | struct GNUNET_MESH_Tunnel *tunnel; |
109 | |||
110 | /** | ||
111 | * Peer with the remote set | ||
112 | */ | ||
113 | struct GNUNET_PeerIdentity peer; | ||
114 | |||
115 | /** | ||
116 | * Application-specific identifier | ||
117 | */ | ||
118 | struct GNUNET_HashCode app_id; | ||
119 | 109 | ||
120 | /** | 110 | /** |
121 | * Context message, given to us | 111 | * Detail information about the set operation, |
122 | * by the client, may be NULL. | 112 | * including the set to use. |
123 | */ | 113 | */ |
124 | struct GNUNET_MessageHeader *context_msg; | 114 | struct OperationSpecification *spec; |
125 | 115 | ||
126 | /** | 116 | /** |
127 | * Tunnel context for the peer we | 117 | * Message queue for the peer. |
128 | * evaluate the union operation with. | ||
129 | */ | 118 | */ |
130 | struct TunnelContext *tc; | 119 | struct GNUNET_MQ_Handle *mq; |
131 | |||
132 | /** | ||
133 | * Request ID to multiplex set operations to | ||
134 | * the client inhabiting the set. | ||
135 | */ | ||
136 | uint32_t request_id; | ||
137 | 120 | ||
138 | /** | 121 | /** |
139 | * Number of ibf buckets received | 122 | * Number of ibf buckets received |
@@ -167,11 +150,6 @@ struct UnionEvaluateOperation | |||
167 | enum UnionOperationPhase phase; | 150 | enum UnionOperationPhase phase; |
168 | 151 | ||
169 | /** | 152 | /** |
170 | * Salt to use for this operation. | ||
171 | */ | ||
172 | uint16_t salt; | ||
173 | |||
174 | /** | ||
175 | * Generation in which the operation handle | 153 | * Generation in which the operation handle |
176 | * was created. | 154 | * was created. |
177 | */ | 155 | */ |
@@ -395,16 +373,17 @@ destroy_key_to_element_iter (void *cls, | |||
395 | void | 373 | void |
396 | _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo) | 374 | _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo) |
397 | { | 375 | { |
376 | struct UnionState *st = eo->spec->set->state.u; | ||
377 | |||
398 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n"); | 378 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n"); |
399 | 379 | ||
400 | if (NULL != eo->tc) | 380 | if (NULL != eo->tunnel) |
401 | { | 381 | { |
402 | GNUNET_MQ_destroy (eo->tc->mq); | 382 | struct GNUNET_MESH_Tunnel *t = eo->tunnel; |
403 | GNUNET_MESH_tunnel_destroy (eo->tc->tunnel); | 383 | eo->tunnel = NULL; |
404 | GNUNET_free (eo->tc); | 384 | GNUNET_MESH_tunnel_destroy (t); |
405 | eo->tc = NULL; | ||
406 | } | 385 | } |
407 | 386 | ||
408 | if (NULL != eo->remote_ibf) | 387 | if (NULL != eo->remote_ibf) |
409 | { | 388 | { |
410 | ibf_destroy (eo->remote_ibf); | 389 | ibf_destroy (eo->remote_ibf); |
@@ -427,8 +406,8 @@ _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo) | |||
427 | eo->key_to_element = NULL; | 406 | eo->key_to_element = NULL; |
428 | } | 407 | } |
429 | 408 | ||
430 | GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head, | 409 | GNUNET_CONTAINER_DLL_remove (st->ops_head, |
431 | eo->set->state.u->ops_tail, | 410 | st->ops_tail, |
432 | eo); | 411 | eo); |
433 | GNUNET_free (eo); | 412 | GNUNET_free (eo); |
434 | 413 | ||
@@ -449,13 +428,13 @@ _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo) | |||
449 | static void | 428 | static void |
450 | fail_union_operation (struct UnionEvaluateOperation *eo) | 429 | fail_union_operation (struct UnionEvaluateOperation *eo) |
451 | { | 430 | { |
452 | struct GNUNET_MQ_Envelope *mqm; | 431 | struct GNUNET_MQ_Envelope *ev; |
453 | struct GNUNET_SET_ResultMessage *msg; | 432 | struct GNUNET_SET_ResultMessage *msg; |
454 | 433 | ||
455 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); | 434 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); |
456 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); | 435 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); |
457 | msg->request_id = htonl (eo->request_id); | 436 | msg->request_id = htonl (eo->spec->client_request_id); |
458 | GNUNET_MQ_send (eo->set->client_mq, mqm); | 437 | GNUNET_MQ_send (eo->spec->set->client_mq, ev); |
459 | _GSS_union_operation_destroy (eo); | 438 | _GSS_union_operation_destroy (eo); |
460 | } | 439 | } |
461 | 440 | ||
@@ -490,27 +469,27 @@ get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt) | |||
490 | static void | 469 | static void |
491 | send_operation_request (struct UnionEvaluateOperation *eo) | 470 | send_operation_request (struct UnionEvaluateOperation *eo) |
492 | { | 471 | { |
493 | struct GNUNET_MQ_Envelope *mqm; | 472 | struct GNUNET_MQ_Envelope *ev; |
494 | struct OperationRequestMessage *msg; | 473 | struct OperationRequestMessage *msg; |
495 | 474 | ||
496 | mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, | 475 | ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, |
497 | eo->context_msg); | 476 | eo->spec->context_msg); |
498 | 477 | ||
499 | if (NULL == mqm) | 478 | if (NULL == ev) |
500 | { | 479 | { |
501 | /* the context message is too large */ | 480 | /* the context message is too large */ |
502 | GNUNET_break (0); | 481 | GNUNET_break (0); |
503 | GNUNET_SERVER_client_disconnect (eo->set->client); | 482 | GNUNET_SERVER_client_disconnect (eo->spec->set->client); |
504 | return; | 483 | return; |
505 | } | 484 | } |
506 | msg->operation = htons (GNUNET_SET_OPERATION_UNION); | 485 | msg->operation = htons (GNUNET_SET_OPERATION_UNION); |
507 | msg->app_id = eo->app_id; | 486 | msg->app_id = eo->spec->app_id; |
508 | GNUNET_MQ_send (eo->tc->mq, mqm); | 487 | GNUNET_MQ_send (eo->mq, ev); |
509 | 488 | ||
510 | if (NULL != eo->context_msg) | 489 | if (NULL != eo->spec->context_msg) |
511 | { | 490 | { |
512 | GNUNET_free (eo->context_msg); | 491 | GNUNET_free (eo->spec->context_msg); |
513 | eo->context_msg = NULL; | 492 | eo->spec->context_msg = NULL; |
514 | } | 493 | } |
515 | 494 | ||
516 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n"); | 495 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n"); |
@@ -565,7 +544,7 @@ insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee) | |||
565 | struct IBF_Key ibf_key; | 544 | struct IBF_Key ibf_key; |
566 | struct KeyEntry *k; | 545 | struct KeyEntry *k; |
567 | 546 | ||
568 | ibf_key = get_ibf_key (&ee->element_hash, eo->salt); | 547 | ibf_key = get_ibf_key (&ee->element_hash, eo->spec->salt); |
569 | k = GNUNET_new (struct KeyEntry); | 548 | k = GNUNET_new (struct KeyEntry); |
570 | k->element = ee; | 549 | k->element = ee; |
571 | k->ibf_key = ibf_key; | 550 | k->ibf_key = ibf_key; |
@@ -644,9 +623,9 @@ prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size) | |||
644 | if (NULL == eo->key_to_element) | 623 | if (NULL == eo->key_to_element) |
645 | { | 624 | { |
646 | unsigned int len; | 625 | unsigned int len; |
647 | len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements); | 626 | len = GNUNET_CONTAINER_multihashmap_size (eo->spec->set->state.u->elements); |
648 | eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); | 627 | eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); |
649 | GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements, | 628 | GNUNET_CONTAINER_multihashmap_iterate (eo->spec->set->state.u->elements, |
650 | init_key_to_element_iterator, eo); | 629 | init_key_to_element_iterator, eo); |
651 | } | 630 | } |
652 | if (NULL != eo->local_ibf) | 631 | if (NULL != eo->local_ibf) |
@@ -678,7 +657,7 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) | |||
678 | while (buckets_sent < (1 << ibf_order)) | 657 | while (buckets_sent < (1 << ibf_order)) |
679 | { | 658 | { |
680 | unsigned int buckets_in_message; | 659 | unsigned int buckets_in_message; |
681 | struct GNUNET_MQ_Envelope *mqm; | 660 | struct GNUNET_MQ_Envelope *ev; |
682 | struct IBFMessage *msg; | 661 | struct IBFMessage *msg; |
683 | 662 | ||
684 | buckets_in_message = (1 << ibf_order) - buckets_sent; | 663 | buckets_in_message = (1 << ibf_order) - buckets_sent; |
@@ -686,14 +665,14 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) | |||
686 | if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) | 665 | if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) |
687 | buckets_in_message = MAX_BUCKETS_PER_MESSAGE; | 666 | buckets_in_message = MAX_BUCKETS_PER_MESSAGE; |
688 | 667 | ||
689 | mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, | 668 | ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, |
690 | GNUNET_MESSAGE_TYPE_SET_P2P_IBF); | 669 | GNUNET_MESSAGE_TYPE_SET_P2P_IBF); |
691 | msg->order = ibf_order; | 670 | msg->order = ibf_order; |
692 | msg->offset = htons (buckets_sent); | 671 | msg->offset = htons (buckets_sent); |
693 | ibf_write_slice (ibf, buckets_sent, | 672 | ibf_write_slice (ibf, buckets_sent, |
694 | buckets_in_message, &msg[1]); | 673 | buckets_in_message, &msg[1]); |
695 | buckets_sent += buckets_in_message; | 674 | buckets_sent += buckets_in_message; |
696 | GNUNET_MQ_send (eo->tc->mq, mqm); | 675 | GNUNET_MQ_send (eo->mq, ev); |
697 | } | 676 | } |
698 | 677 | ||
699 | eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS; | 678 | eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS; |
@@ -708,14 +687,15 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) | |||
708 | static void | 687 | static void |
709 | send_strata_estimator (struct UnionEvaluateOperation *eo) | 688 | send_strata_estimator (struct UnionEvaluateOperation *eo) |
710 | { | 689 | { |
711 | struct GNUNET_MQ_Envelope *mqm; | 690 | struct GNUNET_MQ_Envelope *ev; |
712 | struct GNUNET_MessageHeader *strata_msg; | 691 | struct GNUNET_MessageHeader *strata_msg; |
692 | struct UnionState *st = eo->spec->set->state.u; | ||
713 | 693 | ||
714 | mqm = GNUNET_MQ_msg_header_extra (strata_msg, | 694 | ev = GNUNET_MQ_msg_header_extra (strata_msg, |
715 | SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, | 695 | SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, |
716 | GNUNET_MESSAGE_TYPE_SET_P2P_SE); | 696 | GNUNET_MESSAGE_TYPE_SET_P2P_SE); |
717 | strata_estimator_write (eo->set->state.u->se, &strata_msg[1]); | 697 | strata_estimator_write (st->se, &strata_msg[1]); |
718 | GNUNET_MQ_send (eo->tc->mq, mqm); | 698 | GNUNET_MQ_send (eo->mq, ev); |
719 | eo->phase = PHASE_EXPECT_IBF; | 699 | eo->phase = PHASE_EXPECT_IBF; |
720 | } | 700 | } |
721 | 701 | ||
@@ -797,12 +777,12 @@ send_element_iterator (void *cls, | |||
797 | while (NULL != ke) | 777 | while (NULL != ke) |
798 | { | 778 | { |
799 | const struct GNUNET_SET_Element *const element = &ke->element->element; | 779 | const struct GNUNET_SET_Element *const element = &ke->element->element; |
800 | struct GNUNET_MQ_Envelope *mqm; | 780 | struct GNUNET_MQ_Envelope *ev; |
801 | struct GNUNET_MessageHeader *mh; | 781 | struct GNUNET_MessageHeader *mh; |
802 | 782 | ||
803 | GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); | 783 | GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); |
804 | mqm = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); | 784 | ev = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); |
805 | if (NULL == mqm) | 785 | if (NULL == ev) |
806 | { | 786 | { |
807 | /* element too large */ | 787 | /* element too large */ |
808 | GNUNET_break (0); | 788 | GNUNET_break (0); |
@@ -810,7 +790,7 @@ send_element_iterator (void *cls, | |||
810 | } | 790 | } |
811 | memcpy (&mh[1], element->data, element->size); | 791 | memcpy (&mh[1], element->data, element->size); |
812 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n"); | 792 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n"); |
813 | GNUNET_MQ_send (eo->tc->mq, mqm); | 793 | GNUNET_MQ_send (eo->mq, ev); |
814 | ke = ke->next_colliding; | 794 | ke = ke->next_colliding; |
815 | } | 795 | } |
816 | return GNUNET_NO; | 796 | return GNUNET_NO; |
@@ -882,11 +862,11 @@ decode_and_send (struct UnionEvaluateOperation *eo) | |||
882 | } | 862 | } |
883 | if (GNUNET_NO == res) | 863 | if (GNUNET_NO == res) |
884 | { | 864 | { |
885 | struct GNUNET_MQ_Envelope *mqm; | 865 | struct GNUNET_MQ_Envelope *ev; |
886 | 866 | ||
887 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n"); | 867 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n"); |
888 | mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); | 868 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); |
889 | GNUNET_MQ_send (eo->tc->mq, mqm); | 869 | GNUNET_MQ_send (eo->mq, ev); |
890 | break; | 870 | break; |
891 | } | 871 | } |
892 | if (1 == side) | 872 | if (1 == side) |
@@ -895,15 +875,15 @@ decode_and_send (struct UnionEvaluateOperation *eo) | |||
895 | } | 875 | } |
896 | else | 876 | else |
897 | { | 877 | { |
898 | struct GNUNET_MQ_Envelope *mqm; | 878 | struct GNUNET_MQ_Envelope *ev; |
899 | struct GNUNET_MessageHeader *msg; | 879 | struct GNUNET_MessageHeader *msg; |
900 | 880 | ||
901 | /* FIXME: before sending the request, check if we may just have the element */ | 881 | /* FIXME: before sending the request, check if we may just have the element */ |
902 | /* FIXME: merge multiple requests */ | 882 | /* FIXME: merge multiple requests */ |
903 | mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), | 883 | ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), |
904 | GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); | 884 | GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); |
905 | *(struct IBF_Key *) &msg[1] = key; | 885 | *(struct IBF_Key *) &msg[1] = key; |
906 | GNUNET_MQ_send (eo->tc->mq, mqm); | 886 | GNUNET_MQ_send (eo->mq, ev); |
907 | } | 887 | } |
908 | } | 888 | } |
909 | ibf_destroy (diff_ibf); | 889 | ibf_destroy (diff_ibf); |
@@ -980,21 +960,21 @@ static void | |||
980 | send_client_element (struct UnionEvaluateOperation *eo, | 960 | send_client_element (struct UnionEvaluateOperation *eo, |
981 | struct GNUNET_SET_Element *element) | 961 | struct GNUNET_SET_Element *element) |
982 | { | 962 | { |
983 | struct GNUNET_MQ_Envelope *mqm; | 963 | struct GNUNET_MQ_Envelope *ev; |
984 | struct GNUNET_SET_ResultMessage *rm; | 964 | struct GNUNET_SET_ResultMessage *rm; |
985 | 965 | ||
986 | GNUNET_assert (0 != eo->request_id); | 966 | GNUNET_assert (0 != eo->spec->client_request_id); |
987 | mqm = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); | 967 | ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); |
988 | if (NULL == mqm) | 968 | if (NULL == ev) |
989 | { | 969 | { |
990 | GNUNET_MQ_discard (mqm); | 970 | GNUNET_MQ_discard (ev); |
991 | GNUNET_break (0); | 971 | GNUNET_break (0); |
992 | return; | 972 | return; |
993 | } | 973 | } |
994 | rm->result_status = htons (GNUNET_SET_STATUS_OK); | 974 | rm->result_status = htons (GNUNET_SET_STATUS_OK); |
995 | rm->request_id = htonl (eo->request_id); | 975 | rm->request_id = htonl (eo->spec->client_request_id); |
996 | memcpy (&rm[1], element->data, element->size); | 976 | memcpy (&rm[1], element->data, element->size); |
997 | GNUNET_MQ_send (eo->set->client_mq, mqm); | 977 | GNUNET_MQ_send (eo->spec->set->client_mq, ev); |
998 | } | 978 | } |
999 | 979 | ||
1000 | 980 | ||
@@ -1009,14 +989,13 @@ send_client_element (struct UnionEvaluateOperation *eo, | |||
1009 | static void | 989 | static void |
1010 | send_client_done_and_destroy (struct UnionEvaluateOperation *eo) | 990 | send_client_done_and_destroy (struct UnionEvaluateOperation *eo) |
1011 | { | 991 | { |
1012 | struct GNUNET_MQ_Envelope *mqm; | 992 | struct GNUNET_MQ_Envelope *ev; |
1013 | struct GNUNET_SET_ResultMessage *rm; | 993 | struct GNUNET_SET_ResultMessage *rm; |
1014 | 994 | ||
1015 | GNUNET_assert (0 != eo->request_id); | 995 | ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); |
1016 | mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); | 996 | rm->request_id = htonl (eo->spec->client_request_id); |
1017 | rm->request_id = htonl (eo->request_id); | ||
1018 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); | 997 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); |
1019 | GNUNET_MQ_send (eo->set->client_mq, mqm); | 998 | GNUNET_MQ_send (eo->spec->set->client_mq, ev); |
1020 | 999 | ||
1021 | } | 1000 | } |
1022 | 1001 | ||
@@ -1123,13 +1102,13 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) | |||
1123 | if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) | 1102 | if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) |
1124 | { | 1103 | { |
1125 | /* we got all requests, but still have to send our elements as response */ | 1104 | /* we got all requests, but still have to send our elements as response */ |
1126 | struct GNUNET_MQ_Envelope *mqm; | 1105 | struct GNUNET_MQ_Envelope *ev; |
1127 | 1106 | ||
1128 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n"); | 1107 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n"); |
1129 | eo->phase = PHASE_FINISHED; | 1108 | eo->phase = PHASE_FINISHED; |
1130 | mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); | 1109 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); |
1131 | GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo); | 1110 | GNUNET_MQ_notify_sent (ev, peer_done_sent_cb, eo); |
1132 | GNUNET_MQ_send (eo->tc->mq, mqm); | 1111 | GNUNET_MQ_send (eo->mq, ev); |
1133 | return; | 1112 | return; |
1134 | } | 1113 | } |
1135 | if (eo->phase == PHASE_EXPECT_ELEMENTS) | 1114 | if (eo->phase == PHASE_EXPECT_ELEMENTS) |
@@ -1148,80 +1127,69 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) | |||
1148 | * Evaluate a union operation with | 1127 | * Evaluate a union operation with |
1149 | * a remote peer. | 1128 | * a remote peer. |
1150 | * | 1129 | * |
1151 | * @param m the evaluate request message from the client | 1130 | * @param spec specification of the operation the evaluate |
1131 | * @param tunnel tunnel already connected to the partner peer | ||
1152 | * @param set the set to evaluate the operation with | 1132 | * @param set the set to evaluate the operation with |
1133 | * @return a handle to the operation | ||
1153 | */ | 1134 | */ |
1154 | void | 1135 | struct UnionEvaluateOperation * |
1155 | _GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set) | 1136 | _GSS_union_evaluate (struct OperationSpecification *spec, |
1137 | struct GNUNET_MESH_Tunnel *tunnel) | ||
1156 | { | 1138 | { |
1157 | struct UnionEvaluateOperation *eo; | 1139 | struct UnionEvaluateOperation *eo; |
1158 | struct GNUNET_MessageHeader *context_msg; | 1140 | struct UnionState *st = spec->set->state.u; |
1159 | 1141 | ||
1160 | eo = GNUNET_new (struct UnionEvaluateOperation); | 1142 | eo = GNUNET_new (struct UnionEvaluateOperation); |
1161 | eo->peer = m->target_peer; | 1143 | eo->se = strata_estimator_dup (spec->set->state.u->se); |
1162 | eo->set = set; | 1144 | eo->spec = spec; |
1163 | eo->request_id = htonl (m->request_id); | 1145 | eo->tunnel = tunnel; |
1164 | GNUNET_assert (0 != eo->request_id); | ||
1165 | eo->se = strata_estimator_dup (set->state.u->se); | ||
1166 | eo->salt = ntohs (m->salt); | ||
1167 | eo->app_id = m->app_id; | ||
1168 | |||
1169 | context_msg = GNUNET_MQ_extract_nested_mh (m); | ||
1170 | if (NULL != context_msg) | ||
1171 | { | ||
1172 | eo->context_msg = GNUNET_copy_message (context_msg); | ||
1173 | } | ||
1174 | 1146 | ||
1175 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | 1147 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, |
1176 | "evaluating union operation, (app %s)\n", | 1148 | "evaluating union operation, (app %s)\n", |
1177 | GNUNET_h2s (&eo->app_id)); | 1149 | GNUNET_h2s (&eo->spec->app_id)); |
1178 | 1150 | ||
1179 | eo->tc = GNUNET_new (struct TunnelContext); | ||
1180 | eo->tc->tunnel = GNUNET_MESH_tunnel_create (mesh, eo->tc, &eo->peer, | ||
1181 | GNUNET_APPLICATION_TYPE_SET); | ||
1182 | GNUNET_assert (NULL != eo->tc->tunnel); | ||
1183 | eo->tc->peer = eo->peer; | ||
1184 | eo->tc->mq = GNUNET_MESH_mq_create (eo->tc->tunnel); | ||
1185 | /* we started the operation, thus we have to send the operation request */ | 1151 | /* we started the operation, thus we have to send the operation request */ |
1186 | eo->phase = PHASE_EXPECT_SE; | 1152 | eo->phase = PHASE_EXPECT_SE; |
1187 | 1153 | ||
1188 | GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head, | 1154 | GNUNET_CONTAINER_DLL_insert (st->ops_head, |
1189 | eo->set->state.u->ops_tail, | 1155 | st->ops_tail, |
1190 | eo); | 1156 | eo); |
1191 | 1157 | ||
1192 | send_operation_request (eo); | 1158 | send_operation_request (eo); |
1159 | |||
1160 | return eo; | ||
1193 | } | 1161 | } |
1194 | 1162 | ||
1195 | 1163 | ||
1196 | /** | 1164 | /** |
1197 | * Accept an union operation request from a remote peer | 1165 | * Accept an union operation request from a remote peer |
1198 | * | 1166 | * |
1199 | * @param m the accept message from the client | 1167 | * @param spec all necessary information about the operation |
1200 | * @param set the set of the client | 1168 | * @param tunnel open tunnel to the partner's peer |
1201 | * @param incoming information about the requesting remote peer | 1169 | * @return operation |
1202 | */ | 1170 | */ |
1203 | void | 1171 | struct UnionEvaluateOperation * |
1204 | _GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set, | 1172 | _GSS_union_accept (struct OperationSpecification *spec, |
1205 | struct Incoming *incoming) | 1173 | struct GNUNET_MESH_Tunnel *tunnel) |
1206 | { | 1174 | { |
1207 | struct UnionEvaluateOperation *eo; | 1175 | struct UnionEvaluateOperation *eo; |
1176 | struct UnionState *st = spec->set->state.u; | ||
1208 | 1177 | ||
1209 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n"); | 1178 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n"); |
1210 | 1179 | ||
1211 | eo = GNUNET_new (struct UnionEvaluateOperation); | 1180 | eo = GNUNET_new (struct UnionEvaluateOperation); |
1212 | eo->tc = incoming->tc; | 1181 | eo->generation_created = st->current_generation++; |
1213 | eo->generation_created = set->state.u->current_generation++; | 1182 | eo->spec = spec; |
1214 | eo->set = set; | 1183 | eo->tunnel = tunnel; |
1215 | eo->salt = ntohs (incoming->salt); | 1184 | eo->se = strata_estimator_dup (st->se); |
1216 | GNUNET_assert (0 != ntohl (m->request_id)); | ||
1217 | eo->request_id = ntohl (m->request_id); | ||
1218 | eo->se = strata_estimator_dup (set->state.u->se); | ||
1219 | /* transfer ownership of mq and socket from incoming to eo */ | 1185 | /* transfer ownership of mq and socket from incoming to eo */ |
1220 | GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head, | 1186 | GNUNET_CONTAINER_DLL_insert (st->ops_head, |
1221 | eo->set->state.u->ops_tail, | 1187 | st->ops_tail, |
1222 | eo); | 1188 | eo); |
1223 | /* kick off the operation */ | 1189 | /* kick off the operation */ |
1224 | send_strata_estimator (eo); | 1190 | send_strata_estimator (eo); |
1191 | |||
1192 | return eo; | ||
1225 | } | 1193 | } |
1226 | 1194 | ||
1227 | 1195 | ||
@@ -1370,11 +1338,10 @@ _GSS_union_handle_p2p_message (void *cls, | |||
1370 | 1338 | ||
1371 | if (CONTEXT_OPERATION_UNION != tc->type) | 1339 | if (CONTEXT_OPERATION_UNION != tc->type) |
1372 | { | 1340 | { |
1373 | GNUNET_break_op (0); | ||
1374 | return GNUNET_SYSERR; | 1341 | return GNUNET_SYSERR; |
1375 | } | 1342 | } |
1376 | 1343 | ||
1377 | eo = tc->data; | 1344 | eo = tc->data.union_op; |
1378 | 1345 | ||
1379 | switch (ntohs (mh->type)) | 1346 | switch (ntohs (mh->type)) |
1380 | { | 1347 | { |
diff --git a/src/set/set_protocol.h b/src/set/set_protocol.h index 543e2a002..945542151 100644 --- a/src/set/set_protocol.h +++ b/src/set/set_protocol.h | |||
@@ -42,30 +42,21 @@ struct OperationRequestMessage | |||
42 | /** | 42 | /** |
43 | * Operation to request, values from 'enum GNUNET_SET_OperationType' | 43 | * Operation to request, values from 'enum GNUNET_SET_OperationType' |
44 | */ | 44 | */ |
45 | uint32_t operation; | 45 | uint32_t operation GNUNET_PACKED; |
46 | 46 | ||
47 | /** | 47 | /** |
48 | * Application-specific identifier of the request. | 48 | * Salt to use for this operation. |
49 | */ | 49 | */ |
50 | struct GNUNET_HashCode app_id; | 50 | uint32_t salt; |
51 | |||
52 | /* rest: optional message */ | ||
53 | }; | ||
54 | 51 | ||
55 | struct ElementRequestMessage | ||
56 | { | ||
57 | /** | 52 | /** |
58 | * Type: GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS | 53 | * Application-specific identifier of the request. |
59 | */ | 54 | */ |
60 | struct GNUNET_MessageHeader header; | 55 | struct GNUNET_HashCode app_id; |
61 | 56 | ||
62 | /** | 57 | /* rest: optional message */ |
63 | * Salt the keys in the body use | ||
64 | */ | ||
65 | uint8_t salt; | ||
66 | }; | 58 | }; |
67 | 59 | ||
68 | |||
69 | struct IBFMessage | 60 | struct IBFMessage |
70 | { | 61 | { |
71 | /** | 62 | /** |
@@ -80,15 +71,20 @@ struct IBFMessage | |||
80 | uint8_t order; | 71 | uint8_t order; |
81 | 72 | ||
82 | /** | 73 | /** |
83 | * Salt used when hashing elements for this IBF. | 74 | * Padding, must be 0. |
84 | */ | 75 | */ |
85 | uint8_t salt; | 76 | uint8_t reserved; |
86 | 77 | ||
87 | /** | 78 | /** |
88 | * Offset of the strata in the rest of the message | 79 | * Offset of the strata in the rest of the message |
89 | */ | 80 | */ |
90 | uint16_t offset GNUNET_PACKED; | 81 | uint16_t offset GNUNET_PACKED; |
91 | 82 | ||
83 | /** | ||
84 | * Salt used when hashing elements for this IBF. | ||
85 | */ | ||
86 | uint32_t salt; | ||
87 | |||
92 | /* rest: strata */ | 88 | /* rest: strata */ |
93 | }; | 89 | }; |
94 | 90 | ||