aboutsummaryrefslogtreecommitdiff
path: root/src/set
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-06-26 10:06:52 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-06-26 10:06:52 +0000
commit23479fc50d94f0c29cad3b92fe8fc53e358d4025 (patch)
tree240efe571ce83c65ac27b5d885c6c2a71e61117f /src/set
parentf5a3f1dc90c9949c8c426f2cb2e822603b137dae (diff)
downloadgnunet-23479fc50d94f0c29cad3b92fe8fc53e358d4025.tar.gz
gnunet-23479fc50d94f0c29cad3b92fe8fc53e358d4025.zip
- fixed tunnel context
- moved logic out of specific operations
Diffstat (limited to 'src/set')
-rw-r--r--src/set/gnunet-service-set.c178
-rw-r--r--src/set/gnunet-service-set.h173
-rw-r--r--src/set/gnunet-service-set_intersection.c19
-rw-r--r--src/set/gnunet-service-set_union.c231
-rw-r--r--src/set/set_protocol.h30
5 files changed, 351 insertions, 280 deletions
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index cfc0068ab..a85093bcd 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -28,6 +28,55 @@
28 28
29 29
30/** 30/**
31 * Peer that has connected to us, but is not yet evaluating a set operation.
32 * Once the peer has sent a request, and the client has
33 * accepted or rejected it, this information will be deleted.
34 */
35struct Incoming
36{
37 /**
38 * Incoming peers are held in a linked list
39 */
40 struct Incoming *next;
41
42 /**
43 * Incoming peers are held in a linked list
44 */
45 struct Incoming *prev;
46
47 /**
48 * Detail information about the operation.
49 */
50 struct OperationSpecification *spec;
51
52 /**
53 * The identity of the requesting peer.
54 */
55 struct GNUNET_PeerIdentity peer;
56
57 /**
58 * Tunnel to the peer.
59 */
60 struct GNUNET_MESH_Tunnel *tunnel;
61
62 /**
63 * Unique request id for the request from
64 * a remote peer, sent to the client, which will
65 * accept or reject the request.
66 * Set to '0' iff the request has not been
67 * suggested yet.
68 */
69 uint32_t suggest_id;
70
71 /**
72 * Timeout task, if the incoming peer has not been accepted
73 * after the timeout, it will be disconnected.
74 */
75 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
76};
77
78
79/**
31 * Configuration of our local peer. 80 * Configuration of our local peer.
32 * (Not declared 'static' as also needed in gnunet-service-set_union.c) 81 * (Not declared 'static' as also needed in gnunet-service-set_union.c)
33 */ 82 */
@@ -77,7 +126,7 @@ static struct Incoming *incoming_tail;
77 * used to identify incoming operation requests from remote peers, 126 * used to identify incoming operation requests from remote peers,
78 * that the client can choose to accept or refuse. 127 * that the client can choose to accept or refuse.
79 */ 128 */
80static uint32_t accept_id = 1; 129static uint32_t suggest_id = 1;
81 130
82 131
83/** 132/**
@@ -131,7 +180,7 @@ get_incoming (uint32_t id)
131 struct Incoming *incoming; 180 struct Incoming *incoming;
132 181
133 for (incoming = incoming_head; NULL != incoming; incoming = incoming->next) 182 for (incoming = incoming_head; NULL != incoming; incoming = incoming->next)
134 if (incoming->accept_id == id) 183 if (incoming->suggest_id == id)
135 return incoming; 184 return incoming;
136 return NULL; 185 return NULL;
137} 186}
@@ -145,6 +194,14 @@ get_incoming (uint32_t id)
145static void 194static void
146listener_destroy (struct Listener *listener) 195listener_destroy (struct Listener *listener)
147{ 196{
197 /* If the client is not dead yet, destroy it.
198 * The client's destroy callback will destroy the listener again. */
199 if (NULL != listener->client)
200 {
201 GNUNET_SERVER_client_disconnect (listener->client);
202 listener->client = NULL;
203 return;
204 }
148 if (NULL != listener->client_mq) 205 if (NULL != listener->client_mq)
149 { 206 {
150 GNUNET_MQ_destroy (listener->client_mq); 207 GNUNET_MQ_destroy (listener->client_mq);
@@ -163,6 +220,14 @@ listener_destroy (struct Listener *listener)
163static void 220static void
164set_destroy (struct Set *set) 221set_destroy (struct Set *set)
165{ 222{
223 /* If the client is not dead yet, destroy it.
224 * The client's destroy callback will destroy the set again. */
225 if (NULL != set->client)
226 {
227 GNUNET_SERVER_client_disconnect (set->client);
228 set->client = NULL;
229 return;
230 }
166 switch (set->operation) 231 switch (set->operation)
167 { 232 {
168 case GNUNET_SET_OPERATION_INTERSECTION: 233 case GNUNET_SET_OPERATION_INTERSECTION:
@@ -195,10 +260,16 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
195 260
196 set = set_get (client); 261 set = set_get (client);
197 if (NULL != set) 262 if (NULL != set)
263 {
264 set->client = NULL;
198 set_destroy (set); 265 set_destroy (set);
266 }
199 listener = listener_get (client); 267 listener = listener_get (client);
200 if (NULL != listener) 268 if (NULL != listener)
269 {
270 listener->client = NULL;
201 listener_destroy (listener); 271 listener_destroy (listener);
272 }
202} 273}
203 274
204 275
@@ -210,6 +281,13 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
210static void 281static void
211incoming_destroy (struct Incoming *incoming) 282incoming_destroy (struct Incoming *incoming)
212{ 283{
284 if (NULL != incoming->tunnel)
285 {
286 struct GNUNET_MESH_Tunnel *t = incoming->tunnel;
287 incoming->tunnel = NULL;
288 GNUNET_MESH_tunnel_destroy (t);
289 return;
290 }
213 GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming); 291 GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming);
214 GNUNET_free (incoming); 292 GNUNET_free (incoming);
215} 293}
@@ -246,16 +324,17 @@ incoming_suggest (struct Incoming *incoming, struct Listener *listener)
246 struct GNUNET_MQ_Envelope *mqm; 324 struct GNUNET_MQ_Envelope *mqm;
247 struct GNUNET_SET_RequestMessage *cmsg; 325 struct GNUNET_SET_RequestMessage *cmsg;
248 326
249 GNUNET_assert (GNUNET_NO == incoming->suggested); 327 GNUNET_assert (0 == incoming->suggest_id);
250 incoming->suggested = GNUNET_YES; 328 GNUNET_assert (NULL != incoming->spec);
329 incoming->suggest_id = suggest_id++;
251 330
252 GNUNET_SCHEDULER_cancel (incoming->timeout_task); 331 GNUNET_SCHEDULER_cancel (incoming->timeout_task);
253 mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, 332 mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST,
254 incoming->context_msg); 333 incoming->spec->context_msg);
255 GNUNET_assert (NULL != mqm); 334 GNUNET_assert (NULL != mqm);
256 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "suggesting request with accept id %u\n", incoming->accept_id); 335 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "suggesting request with accept id %u\n", incoming->suggest_id);
257 cmsg->accept_id = htonl (incoming->accept_id); 336 cmsg->accept_id = htonl (incoming->suggest_id);
258 cmsg->peer_id = incoming->tc->peer; 337 cmsg->peer_id = incoming->spec->peer;
259 GNUNET_MQ_send (listener->client_mq, mqm); 338 GNUNET_MQ_send (listener->client_mq, mqm);
260 339
261} 340}
@@ -280,6 +359,7 @@ handle_p2p_operation_request (void *cls,
280 struct Incoming *incoming; 359 struct Incoming *incoming;
281 const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh; 360 const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh;
282 struct Listener *listener; 361 struct Listener *listener;
362 struct OperationSpecification *spec;
283 363
284 if (CONTEXT_INCOMING != tc->type) 364 if (CONTEXT_INCOMING != tc->type)
285 { 365 {
@@ -289,21 +369,27 @@ handle_p2p_operation_request (void *cls,
289 return GNUNET_SYSERR; 369 return GNUNET_SYSERR;
290 } 370 }
291 371
292 incoming = tc->data; 372 incoming = tc->data.incoming;
293 373
294 if (GNUNET_YES == incoming->received_request) 374 if (NULL != incoming->spec)
295 { 375 {
296 /* double operation request */ 376 /* double operation request */
297 GNUNET_break_op (0); 377 GNUNET_break_op (0);
298 return GNUNET_SYSERR; 378 return GNUNET_SYSERR;
299 } 379 }
300 380
301 incoming->accept_id = accept_id++; 381 spec = GNUNET_new (struct OperationSpecification);
302 incoming->context_msg = 382 spec->context_msg =
303 GNUNET_copy_message (GNUNET_MQ_extract_nested_mh (msg)); 383 GNUNET_copy_message (GNUNET_MQ_extract_nested_mh (msg));
384 spec->operation = ntohl (msg->operation);
385 spec->app_id = msg->app_id;
386 spec->salt = ntohl (msg->salt);
387 spec->peer = incoming->peer;
388
389 incoming->spec = spec;
304 390
305 if ( (NULL != incoming->context_msg) && 391 if ( (NULL != spec->context_msg) &&
306 (ntohs (incoming->context_msg->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) ) 392 (ntohs (spec->context_msg->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
307 { 393 {
308 GNUNET_break_op (0); 394 GNUNET_break_op (0);
309 return GNUNET_SYSERR; 395 return GNUNET_SYSERR;
@@ -405,12 +491,12 @@ handle_client_listen (void *cls,
405 listener->operation, GNUNET_h2s (&listener->app_id)); 491 listener->operation, GNUNET_h2s (&listener->app_id));
406 for (incoming = incoming_head; NULL != incoming; incoming = incoming->next) 492 for (incoming = incoming_head; NULL != incoming; incoming = incoming->next)
407 { 493 {
408 if ( (GNUNET_NO == incoming->received_request) || 494 if ( (NULL == incoming->spec) ||
409 (GNUNET_YES == incoming->suggested) ) 495 (0 != incoming->suggest_id) )
410 continue; 496 continue;
411 if (listener->operation != incoming->operation) 497 if (listener->operation != incoming->spec->operation)
412 continue; 498 continue;
413 if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id, &incoming->app_id)) 499 if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id, &incoming->spec->app_id))
414 continue; 500 continue;
415 incoming_suggest (incoming, listener); 501 incoming_suggest (incoming, listener);
416 } 502 }
@@ -483,8 +569,7 @@ handle_client_reject (void *cls,
483 return; 569 return;
484 } 570 }
485 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n"); 571 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n");
486 /* the incoming peer will be destroyed in the tunnel end handler */ 572 GNUNET_MESH_tunnel_destroy (incoming->tunnel);
487 GNUNET_MESH_tunnel_destroy (incoming->tc->tunnel);
488 GNUNET_SERVER_receive_done (client, GNUNET_OK); 573 GNUNET_SERVER_receive_done (client, GNUNET_OK);
489} 574}
490 575
@@ -542,6 +627,10 @@ handle_client_evaluate (void *cls,
542 const struct GNUNET_MessageHeader *m) 627 const struct GNUNET_MessageHeader *m)
543{ 628{
544 struct Set *set; 629 struct Set *set;
630 struct TunnelContext *tc;
631 struct GNUNET_MESH_Tunnel *tunnel;
632 struct GNUNET_SET_EvaluateMessage *msg;
633 struct OperationSpecification *spec;
545 634
546 set = set_get (client); 635 set = set_get (client);
547 if (NULL == set) 636 if (NULL == set)
@@ -551,13 +640,27 @@ handle_client_evaluate (void *cls,
551 return; 640 return;
552 } 641 }
553 642
643 msg = (struct GNUNET_SET_EvaluateMessage *) m;
644 tc = GNUNET_new (struct TunnelContext);
645 spec = GNUNET_new (struct OperationSpecification);
646 spec->operation = set->operation;
647 spec->app_id = msg->app_id;
648 spec->salt = ntohl (msg->salt);
649 spec->peer = msg->target_peer;
650
651 tunnel = GNUNET_MESH_tunnel_create (mesh, tc, &msg->target_peer,
652 GNUNET_APPLICATION_TYPE_SET);
653
554 switch (set->operation) 654 switch (set->operation)
555 { 655 {
556 case GNUNET_SET_OPERATION_INTERSECTION: 656 case GNUNET_SET_OPERATION_INTERSECTION:
657 tc->type = CONTEXT_OPERATION_INTERSECTION;
557 //_GSS_intersection_evaluate ((struct GNUNET_SET_EvaluateMessage *) m, set); 658 //_GSS_intersection_evaluate ((struct GNUNET_SET_EvaluateMessage *) m, set);
558 break; 659 break;
559 case GNUNET_SET_OPERATION_UNION: 660 case GNUNET_SET_OPERATION_UNION:
560 _GSS_union_evaluate ((struct GNUNET_SET_EvaluateMessage *) m, set); 661 tc->type = CONTEXT_OPERATION_UNION;
662 tc->data.union_op =
663 _GSS_union_evaluate (spec, tunnel);
561 break; 664 break;
562 default: 665 default:
563 GNUNET_assert (0); 666 GNUNET_assert (0);
@@ -601,6 +704,9 @@ handle_client_accept (void *cls,
601 struct Set *set; 704 struct Set *set;
602 struct Incoming *incoming; 705 struct Incoming *incoming;
603 struct GNUNET_SET_AcceptRejectMessage *msg = (struct GNUNET_SET_AcceptRejectMessage *) mh; 706 struct GNUNET_SET_AcceptRejectMessage *msg = (struct GNUNET_SET_AcceptRejectMessage *) mh;
707 struct GNUNET_MESH_Tunnel *tunnel;
708 struct TunnelContext *tc;
709 struct OperationSpecification *spec;
604 710
605 incoming = get_incoming (ntohl (msg->accept_reject_id)); 711 incoming = get_incoming (ntohl (msg->accept_reject_id));
606 712
@@ -623,13 +729,20 @@ handle_client_accept (void *cls,
623 return; 729 return;
624 } 730 }
625 731
732 tc = GNUNET_new (struct TunnelContext);
733 tunnel = GNUNET_MESH_tunnel_create (mesh, tc, &incoming->spec->peer,
734 GNUNET_APPLICATION_TYPE_SET);
735 spec = GNUNET_new (struct OperationSpecification);
736
626 switch (set->operation) 737 switch (set->operation)
627 { 738 {
628 case GNUNET_SET_OPERATION_INTERSECTION: 739 case GNUNET_SET_OPERATION_INTERSECTION:
740 tc->type = CONTEXT_OPERATION_INTERSECTION;
629 // _GSS_intersection_accept (msg, set, incoming); 741 // _GSS_intersection_accept (msg, set, incoming);
630 break; 742 break;
631 case GNUNET_SET_OPERATION_UNION: 743 case GNUNET_SET_OPERATION_UNION:
632 _GSS_union_accept (msg, set, incoming); 744 tc->type = CONTEXT_OPERATION_UNION;
745 tc->data.union_op = _GSS_union_accept (spec, tunnel);
633 break; 746 break;
634 default: 747 default:
635 GNUNET_assert (0); 748 GNUNET_assert (0);
@@ -719,11 +832,9 @@ tunnel_new_cb (void *cls,
719 GNUNET_assert (port == GNUNET_APPLICATION_TYPE_SET); 832 GNUNET_assert (port == GNUNET_APPLICATION_TYPE_SET);
720 tc = GNUNET_new (struct TunnelContext); 833 tc = GNUNET_new (struct TunnelContext);
721 incoming = GNUNET_new (struct Incoming); 834 incoming = GNUNET_new (struct Incoming);
722 incoming->tc = tc; 835 incoming->peer = *initiator;
723 tc->peer = *initiator; 836 incoming->tunnel = tunnel;
724 tc->tunnel = tunnel; 837 tc->data.incoming = incoming;
725 tc->mq = GNUNET_MESH_mq_create (tunnel);
726 tc->data = incoming;
727 tc->type = CONTEXT_INCOMING; 838 tc->type = CONTEXT_INCOMING;
728 incoming->timeout_task = 839 incoming->timeout_task =
729 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, incoming_timeout_cb, incoming); 840 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, incoming_timeout_cb, incoming);
@@ -750,22 +861,13 @@ tunnel_end_cb (void *cls,
750{ 861{
751 struct TunnelContext *ctx = tunnel_ctx; 862 struct TunnelContext *ctx = tunnel_ctx;
752 863
753 /* tunnel is dead already */
754 ctx->tunnel = NULL;
755
756 if (NULL != ctx->mq)
757 {
758 GNUNET_MQ_destroy (ctx->mq);
759 ctx->mq = NULL;
760 }
761
762 switch (ctx->type) 864 switch (ctx->type)
763 { 865 {
764 case CONTEXT_INCOMING: 866 case CONTEXT_INCOMING:
765 incoming_destroy ((struct Incoming *) ctx->data); 867 incoming_destroy (ctx->data.incoming);
766 break; 868 break;
767 case CONTEXT_OPERATION_UNION: 869 case CONTEXT_OPERATION_UNION:
768 _GSS_union_operation_destroy ((struct UnionEvaluateOperation *) ctx->data); 870 _GSS_union_operation_destroy (ctx->data.union_op);
769 break; 871 break;
770 case CONTEXT_OPERATION_INTERSECTION: 872 case CONTEXT_OPERATION_INTERSECTION:
771 GNUNET_assert (0); 873 GNUNET_assert (0);
diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h
index 533fd0ef7..574b343d6 100644
--- a/src/set/gnunet-service-set.h
+++ b/src/set/gnunet-service-set.h
@@ -42,14 +42,22 @@
42struct IntersectionState; 42struct IntersectionState;
43 43
44 44
45/* FIXME: cfuchs */
46struct IntersectionOperation;
47
48
45/** 49/**
46 * Extra state required for set union. 50 * Extra state required for set union.
47 */ 51 */
48struct UnionState; 52struct UnionState;
49 53
54/**
55 * State of a union operation being evaluated.
56 */
50struct UnionEvaluateOperation; 57struct UnionEvaluateOperation;
51 58
52 59
60
53/** 61/**
54 * A set that supports a specific operation 62 * A set that supports a specific operation
55 * with other peers. 63 * with other peers.
@@ -94,6 +102,50 @@ struct Set
94 102
95 103
96/** 104/**
105 * Detail information about an operation.
106 */
107struct OperationSpecification
108{
109 /**
110 * The type of the operation.
111 */
112 enum GNUNET_SET_OperationType operation;
113
114 /**
115 * The remove peer we evaluate the operation with
116 */
117 struct GNUNET_PeerIdentity peer;
118
119 /**
120 * Application ID for the operation, used to distinguish
121 * multiple operations of the same type with the same peer.
122 */
123 struct GNUNET_HashCode app_id;
124
125 /**
126 * Context message, may be NULL.
127 */
128 struct GNUNET_MessageHeader *context_msg;
129
130 /**
131 * Salt to use for the operation.
132 */
133 uint32_t salt;
134
135 /**
136 * ID used to identify responses to a client.
137 */
138 uint32_t client_request_id;
139
140 /**
141 * Set associated with the operation, NULL until the spec has been associated
142 * with a set.
143 */
144 struct Set *set;
145};
146
147
148/**
97 * A listener is inhabited by a client, and 149 * A listener is inhabited by a client, and
98 * waits for evaluation requests from remote peers. 150 * waits for evaluation requests from remote peers.
99 */ 151 */
@@ -121,12 +173,13 @@ struct Listener
121 struct GNUNET_MQ_Handle *client_mq; 173 struct GNUNET_MQ_Handle *client_mq;
122 174
123 /** 175 /**
124 * Type of operation supported for this set 176 * The type of the operation.
125 */ 177 */
126 enum GNUNET_SET_OperationType operation; 178 enum GNUNET_SET_OperationType operation;
127 179
128 /** 180 /**
129 * Application id of intereset for this listener. 181 * Application ID for the operation, used to distinguish
182 * multiple operations of the same type with the same peer.
130 */ 183 */
131 struct GNUNET_HashCode app_id; 184 struct GNUNET_HashCode app_id;
132}; 185};
@@ -137,79 +190,51 @@ struct Listener
137 * Once the peer has sent a request, and the client has 190 * Once the peer has sent a request, and the client has
138 * accepted or rejected it, this information will be deleted. 191 * accepted or rejected it, this information will be deleted.
139 */ 192 */
140struct Incoming 193struct Incoming;
141{
142 /**
143 * Incoming peers are held in a linked list
144 */
145 struct Incoming *next;
146
147 /**
148 * Incoming peers are held in a linked list
149 */
150 struct Incoming *prev;
151 194
152 /**
153 * Tunnel context, stores information about
154 * the tunnel and its peer.
155 */
156 struct TunnelContext *tc;
157
158 /**
159 * GNUNET_YES if the incoming peer has sent
160 * an operation request (and we are waiting
161 * for the client to ack/nack), GNUNET_NO otherwise.
162 */
163 int received_request;
164 195
196/**
197 * Different types a tunnel can be.
198 */
199enum TunnelContextType {
165 /** 200 /**
166 * App code, set once the peer has 201 * Tunnel is waiting for a set request from the tunnel,
167 * requested an operation 202 * or for the ack/nack of the client for a received request.
168 */ 203 */
169 struct GNUNET_HashCode app_id; 204 CONTEXT_INCOMING,
170 205
171 /** 206 /**
172 * Context message, set once the peer 207 * The tunnel performs a union operation.
173 * has requested an operation.
174 */ 208 */
175 struct GNUNET_MessageHeader *context_msg; 209 CONTEXT_OPERATION_UNION,
176 210
177 /** 211 /**
178 * Salt the peer has requested to use for the 212 * The tunnel performs an intersection operation.
179 * operation
180 */ 213 */
181 uint16_t salt; 214 CONTEXT_OPERATION_INTERSECTION,
215};
182 216
183 /**
184 * Operation the other peer wants to do
185 */
186 enum GNUNET_SET_OperationType operation;
187 217
218/**
219 * State associated with the tunnel, dependent on
220 * tunnel type.
221 */
222union TunnelContextData
223{
188 /** 224 /**
189 * Has the incoming request been suggested to 225 * Valid for tag 'CONTEXT_INCOMING'
190 * a client listener yet?
191 */ 226 */
192 int suggested; 227 struct Incoming *incoming;
193 228
194 /** 229 /**
195 * Unique request id for the request from 230 * Valid for tag 'CONTEXT_OPERATION_UNION'
196 * a remote peer, sent to the client, which will
197 * accept or reject the request.
198 */ 231 */
199 uint32_t accept_id; 232 struct UnionEvaluateOperation *union_op;
200 233
201 /** 234 /**
202 * Timeout task, if the incoming peer has not been accepted 235 * Valid for tag 'CONTEXT_OPERATION_INTERSECTION'
203 * after the timeout, it will be disconnected.
204 */ 236 */
205 GNUNET_SCHEDULER_TaskIdentifier timeout_task; 237 struct IntersectionEvaluateOperation *intersection_op;
206};
207
208
209enum TunnelContextType {
210 CONTEXT_INCOMING,
211 CONTEXT_OPERATION_UNION,
212 CONTEXT_OPERATION_INTERSECTION,
213}; 238};
214 239
215/** 240/**
@@ -219,21 +244,6 @@ enum TunnelContextType {
219struct TunnelContext 244struct TunnelContext
220{ 245{
221 /** 246 /**
222 * The mesh tunnel that has this context
223 */
224 struct GNUNET_MESH_Tunnel *tunnel;
225
226 /**
227 * The peer on the other side.
228 */
229 struct GNUNET_PeerIdentity peer;
230
231 /**
232 * Handle to the message queue for the tunnel.
233 */
234 struct GNUNET_MQ_Handle *mq;
235
236 /**
237 * Type of the tunnel. 247 * Type of the tunnel.
238 */ 248 */
239 enum TunnelContextType type; 249 enum TunnelContextType type;
@@ -242,7 +252,7 @@ struct TunnelContext
242 * State associated with the tunnel, dependent on 252 * State associated with the tunnel, dependent on
243 * tunnel type. 253 * tunnel type.
244 */ 254 */
245 void *data; 255 union TunnelContextData data;
246}; 256};
247 257
248 258
@@ -268,11 +278,14 @@ _GSS_union_set_create (void);
268 * Evaluate a union operation with 278 * Evaluate a union operation with
269 * a remote peer. 279 * a remote peer.
270 * 280 *
271 * @param m the evaluate request message from the client 281 * @param spec specification of the operation the evaluate
282 * @param tunnel tunnel already connected to the partner peer
272 * @param set the set to evaluate the operation with 283 * @param set the set to evaluate the operation with
284 * @return a handle to the operation
273 */ 285 */
274void 286struct UnionEvaluateOperation *
275_GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set); 287_GSS_union_evaluate (struct OperationSpecification *spec,
288 struct GNUNET_MESH_Tunnel *tunnel);
276 289
277 290
278/** 291/**
@@ -308,13 +321,13 @@ _GSS_union_set_destroy (struct Set *set);
308/** 321/**
309 * Accept an union operation request from a remote peer 322 * Accept an union operation request from a remote peer
310 * 323 *
311 * @param m the accept message from the client 324 * @param spec all necessary information about the operation
312 * @param set the set of the client 325 * @param tunnel open tunnel to the partner's peer
313 * @param incoming information about the requesting remote peer 326 * @return operation
314 */ 327 */
315void 328struct UnionEvaluateOperation *
316_GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set, 329_GSS_union_accept (struct OperationSpecification *spec,
317 struct Incoming *incoming); 330 struct GNUNET_MESH_Tunnel *tunnel);
318 331
319 332
320/** 333/**
diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c
index fe3ba56ea..57028c0dd 100644
--- a/src/set/gnunet-service-set_intersection.c
+++ b/src/set/gnunet-service-set_intersection.c
@@ -124,10 +124,9 @@ struct IntersectionEvaluateOperation
124 struct GNUNET_MessageHeader *context_msg; 124 struct GNUNET_MessageHeader *context_msg;
125 125
126 /** 126 /**
127 * Tunnel context for the peer we 127 * Tunnel to the other peer.
128 * evaluate the union operation with.
129 */ 128 */
130 struct TunnelContext *tc; 129 struct GNUNET_MESH_Tunnel *tunnel;
131 130
132 /** 131 /**
133 * Request ID to multiplex set operations to 132 * Request ID to multiplex set operations to
@@ -397,12 +396,11 @@ _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo)
397{ 396{
398 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n"); 397 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n");
399 398
400 if (NULL != eo->tc) 399 if (NULL != eo->tunnel)
401 { 400 {
402 GNUNET_MQ_destroy (eo->tc->mq); 401 GNUNET_MESH_tunnel_destroy (eo->tunnel);
403 GNUNET_MESH_tunnel_destroy (eo->tc->tunnel); 402 /* wait for the final destruction by the tunnel cleaner */
404 GNUNET_free (eo->tc); 403 return;
405 eo->tc = NULL;
406 } 404 }
407 405
408 if (NULL != eo->remote_ibf) 406 if (NULL != eo->remote_ibf)
@@ -432,10 +430,8 @@ _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo)
432 eo); 430 eo);
433 GNUNET_free (eo); 431 GNUNET_free (eo);
434 432
435
436 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n"); 433 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n");
437 434
438
439 /* FIXME: do a garbage collection of the set generations */ 435 /* FIXME: do a garbage collection of the set generations */
440} 436}
441 437
@@ -1355,7 +1351,6 @@ _GSS_intersection_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set)
1355 * @param cls closure 1351 * @param cls closure
1356 * @param tunnel mesh tunnel 1352 * @param tunnel mesh tunnel
1357 * @param tunnel_ctx tunnel context 1353 * @param tunnel_ctx tunnel context
1358 * @param sender ???
1359 * @param mh message to process 1354 * @param mh message to process
1360 * @return ??? 1355 * @return ???
1361 */ 1356 */
@@ -1363,7 +1358,6 @@ int
1363_GSS_union_handle_p2p_message (void *cls, 1358_GSS_union_handle_p2p_message (void *cls,
1364 struct GNUNET_MESH_Tunnel *tunnel, 1359 struct GNUNET_MESH_Tunnel *tunnel,
1365 void **tunnel_ctx, 1360 void **tunnel_ctx,
1366 const struct GNUNET_PeerIdentity *sender,
1367 const struct GNUNET_MessageHeader *mh) 1361 const struct GNUNET_MessageHeader *mh)
1368{ 1362{
1369 struct TunnelContext *tc = *tunnel_ctx; 1363 struct TunnelContext *tc = *tunnel_ctx;
@@ -1371,7 +1365,6 @@ _GSS_union_handle_p2p_message (void *cls,
1371 1365
1372 if (CONTEXT_OPERATION_UNION != tc->type) 1366 if (CONTEXT_OPERATION_UNION != tc->type)
1373 { 1367 {
1374 /* FIXME: kill the tunnel */
1375 /* never kill mesh */ 1368 /* never kill mesh */
1376 return GNUNET_OK; 1369 return GNUNET_OK;
1377 } 1370 }
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index 5b1f28cf4..f9756bd5b 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -103,37 +103,20 @@ enum UnionOperationPhase
103struct UnionEvaluateOperation 103struct UnionEvaluateOperation
104{ 104{
105 /** 105 /**
106 * Local set the operation is evaluated on. 106 * Tunnel to the remote peer.
107 */ 107 */
108 struct Set *set; 108 struct GNUNET_MESH_Tunnel *tunnel;
109
110 /**
111 * Peer with the remote set
112 */
113 struct GNUNET_PeerIdentity peer;
114
115 /**
116 * Application-specific identifier
117 */
118 struct GNUNET_HashCode app_id;
119 109
120 /** 110 /**
121 * Context message, given to us 111 * Detail information about the set operation,
122 * by the client, may be NULL. 112 * including the set to use.
123 */ 113 */
124 struct GNUNET_MessageHeader *context_msg; 114 struct OperationSpecification *spec;
125 115
126 /** 116 /**
127 * Tunnel context for the peer we 117 * Message queue for the peer.
128 * evaluate the union operation with.
129 */ 118 */
130 struct TunnelContext *tc; 119 struct GNUNET_MQ_Handle *mq;
131
132 /**
133 * Request ID to multiplex set operations to
134 * the client inhabiting the set.
135 */
136 uint32_t request_id;
137 120
138 /** 121 /**
139 * Number of ibf buckets received 122 * Number of ibf buckets received
@@ -167,11 +150,6 @@ struct UnionEvaluateOperation
167 enum UnionOperationPhase phase; 150 enum UnionOperationPhase phase;
168 151
169 /** 152 /**
170 * Salt to use for this operation.
171 */
172 uint16_t salt;
173
174 /**
175 * Generation in which the operation handle 153 * Generation in which the operation handle
176 * was created. 154 * was created.
177 */ 155 */
@@ -395,16 +373,17 @@ destroy_key_to_element_iter (void *cls,
395void 373void
396_GSS_union_operation_destroy (struct UnionEvaluateOperation *eo) 374_GSS_union_operation_destroy (struct UnionEvaluateOperation *eo)
397{ 375{
376 struct UnionState *st = eo->spec->set->state.u;
377
398 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n"); 378 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n");
399 379
400 if (NULL != eo->tc) 380 if (NULL != eo->tunnel)
401 { 381 {
402 GNUNET_MQ_destroy (eo->tc->mq); 382 struct GNUNET_MESH_Tunnel *t = eo->tunnel;
403 GNUNET_MESH_tunnel_destroy (eo->tc->tunnel); 383 eo->tunnel = NULL;
404 GNUNET_free (eo->tc); 384 GNUNET_MESH_tunnel_destroy (t);
405 eo->tc = NULL;
406 } 385 }
407 386
408 if (NULL != eo->remote_ibf) 387 if (NULL != eo->remote_ibf)
409 { 388 {
410 ibf_destroy (eo->remote_ibf); 389 ibf_destroy (eo->remote_ibf);
@@ -427,8 +406,8 @@ _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo)
427 eo->key_to_element = NULL; 406 eo->key_to_element = NULL;
428 } 407 }
429 408
430 GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head, 409 GNUNET_CONTAINER_DLL_remove (st->ops_head,
431 eo->set->state.u->ops_tail, 410 st->ops_tail,
432 eo); 411 eo);
433 GNUNET_free (eo); 412 GNUNET_free (eo);
434 413
@@ -449,13 +428,13 @@ _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo)
449static void 428static void
450fail_union_operation (struct UnionEvaluateOperation *eo) 429fail_union_operation (struct UnionEvaluateOperation *eo)
451{ 430{
452 struct GNUNET_MQ_Envelope *mqm; 431 struct GNUNET_MQ_Envelope *ev;
453 struct GNUNET_SET_ResultMessage *msg; 432 struct GNUNET_SET_ResultMessage *msg;
454 433
455 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); 434 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
456 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); 435 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
457 msg->request_id = htonl (eo->request_id); 436 msg->request_id = htonl (eo->spec->client_request_id);
458 GNUNET_MQ_send (eo->set->client_mq, mqm); 437 GNUNET_MQ_send (eo->spec->set->client_mq, ev);
459 _GSS_union_operation_destroy (eo); 438 _GSS_union_operation_destroy (eo);
460} 439}
461 440
@@ -490,27 +469,27 @@ get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
490static void 469static void
491send_operation_request (struct UnionEvaluateOperation *eo) 470send_operation_request (struct UnionEvaluateOperation *eo)
492{ 471{
493 struct GNUNET_MQ_Envelope *mqm; 472 struct GNUNET_MQ_Envelope *ev;
494 struct OperationRequestMessage *msg; 473 struct OperationRequestMessage *msg;
495 474
496 mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 475 ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
497 eo->context_msg); 476 eo->spec->context_msg);
498 477
499 if (NULL == mqm) 478 if (NULL == ev)
500 { 479 {
501 /* the context message is too large */ 480 /* the context message is too large */
502 GNUNET_break (0); 481 GNUNET_break (0);
503 GNUNET_SERVER_client_disconnect (eo->set->client); 482 GNUNET_SERVER_client_disconnect (eo->spec->set->client);
504 return; 483 return;
505 } 484 }
506 msg->operation = htons (GNUNET_SET_OPERATION_UNION); 485 msg->operation = htons (GNUNET_SET_OPERATION_UNION);
507 msg->app_id = eo->app_id; 486 msg->app_id = eo->spec->app_id;
508 GNUNET_MQ_send (eo->tc->mq, mqm); 487 GNUNET_MQ_send (eo->mq, ev);
509 488
510 if (NULL != eo->context_msg) 489 if (NULL != eo->spec->context_msg)
511 { 490 {
512 GNUNET_free (eo->context_msg); 491 GNUNET_free (eo->spec->context_msg);
513 eo->context_msg = NULL; 492 eo->spec->context_msg = NULL;
514 } 493 }
515 494
516 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n"); 495 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n");
@@ -565,7 +544,7 @@ insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee)
565 struct IBF_Key ibf_key; 544 struct IBF_Key ibf_key;
566 struct KeyEntry *k; 545 struct KeyEntry *k;
567 546
568 ibf_key = get_ibf_key (&ee->element_hash, eo->salt); 547 ibf_key = get_ibf_key (&ee->element_hash, eo->spec->salt);
569 k = GNUNET_new (struct KeyEntry); 548 k = GNUNET_new (struct KeyEntry);
570 k->element = ee; 549 k->element = ee;
571 k->ibf_key = ibf_key; 550 k->ibf_key = ibf_key;
@@ -644,9 +623,9 @@ prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size)
644 if (NULL == eo->key_to_element) 623 if (NULL == eo->key_to_element)
645 { 624 {
646 unsigned int len; 625 unsigned int len;
647 len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements); 626 len = GNUNET_CONTAINER_multihashmap_size (eo->spec->set->state.u->elements);
648 eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); 627 eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
649 GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements, 628 GNUNET_CONTAINER_multihashmap_iterate (eo->spec->set->state.u->elements,
650 init_key_to_element_iterator, eo); 629 init_key_to_element_iterator, eo);
651 } 630 }
652 if (NULL != eo->local_ibf) 631 if (NULL != eo->local_ibf)
@@ -678,7 +657,7 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
678 while (buckets_sent < (1 << ibf_order)) 657 while (buckets_sent < (1 << ibf_order))
679 { 658 {
680 unsigned int buckets_in_message; 659 unsigned int buckets_in_message;
681 struct GNUNET_MQ_Envelope *mqm; 660 struct GNUNET_MQ_Envelope *ev;
682 struct IBFMessage *msg; 661 struct IBFMessage *msg;
683 662
684 buckets_in_message = (1 << ibf_order) - buckets_sent; 663 buckets_in_message = (1 << ibf_order) - buckets_sent;
@@ -686,14 +665,14 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
686 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) 665 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
687 buckets_in_message = MAX_BUCKETS_PER_MESSAGE; 666 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
688 667
689 mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, 668 ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
690 GNUNET_MESSAGE_TYPE_SET_P2P_IBF); 669 GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
691 msg->order = ibf_order; 670 msg->order = ibf_order;
692 msg->offset = htons (buckets_sent); 671 msg->offset = htons (buckets_sent);
693 ibf_write_slice (ibf, buckets_sent, 672 ibf_write_slice (ibf, buckets_sent,
694 buckets_in_message, &msg[1]); 673 buckets_in_message, &msg[1]);
695 buckets_sent += buckets_in_message; 674 buckets_sent += buckets_in_message;
696 GNUNET_MQ_send (eo->tc->mq, mqm); 675 GNUNET_MQ_send (eo->mq, ev);
697 } 676 }
698 677
699 eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS; 678 eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
@@ -708,14 +687,15 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
708static void 687static void
709send_strata_estimator (struct UnionEvaluateOperation *eo) 688send_strata_estimator (struct UnionEvaluateOperation *eo)
710{ 689{
711 struct GNUNET_MQ_Envelope *mqm; 690 struct GNUNET_MQ_Envelope *ev;
712 struct GNUNET_MessageHeader *strata_msg; 691 struct GNUNET_MessageHeader *strata_msg;
692 struct UnionState *st = eo->spec->set->state.u;
713 693
714 mqm = GNUNET_MQ_msg_header_extra (strata_msg, 694 ev = GNUNET_MQ_msg_header_extra (strata_msg,
715 SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, 695 SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
716 GNUNET_MESSAGE_TYPE_SET_P2P_SE); 696 GNUNET_MESSAGE_TYPE_SET_P2P_SE);
717 strata_estimator_write (eo->set->state.u->se, &strata_msg[1]); 697 strata_estimator_write (st->se, &strata_msg[1]);
718 GNUNET_MQ_send (eo->tc->mq, mqm); 698 GNUNET_MQ_send (eo->mq, ev);
719 eo->phase = PHASE_EXPECT_IBF; 699 eo->phase = PHASE_EXPECT_IBF;
720} 700}
721 701
@@ -797,12 +777,12 @@ send_element_iterator (void *cls,
797 while (NULL != ke) 777 while (NULL != ke)
798 { 778 {
799 const struct GNUNET_SET_Element *const element = &ke->element->element; 779 const struct GNUNET_SET_Element *const element = &ke->element->element;
800 struct GNUNET_MQ_Envelope *mqm; 780 struct GNUNET_MQ_Envelope *ev;
801 struct GNUNET_MessageHeader *mh; 781 struct GNUNET_MessageHeader *mh;
802 782
803 GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); 783 GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
804 mqm = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); 784 ev = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
805 if (NULL == mqm) 785 if (NULL == ev)
806 { 786 {
807 /* element too large */ 787 /* element too large */
808 GNUNET_break (0); 788 GNUNET_break (0);
@@ -810,7 +790,7 @@ send_element_iterator (void *cls,
810 } 790 }
811 memcpy (&mh[1], element->data, element->size); 791 memcpy (&mh[1], element->data, element->size);
812 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n"); 792 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n");
813 GNUNET_MQ_send (eo->tc->mq, mqm); 793 GNUNET_MQ_send (eo->mq, ev);
814 ke = ke->next_colliding; 794 ke = ke->next_colliding;
815 } 795 }
816 return GNUNET_NO; 796 return GNUNET_NO;
@@ -882,11 +862,11 @@ decode_and_send (struct UnionEvaluateOperation *eo)
882 } 862 }
883 if (GNUNET_NO == res) 863 if (GNUNET_NO == res)
884 { 864 {
885 struct GNUNET_MQ_Envelope *mqm; 865 struct GNUNET_MQ_Envelope *ev;
886 866
887 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n"); 867 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n");
888 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); 868 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
889 GNUNET_MQ_send (eo->tc->mq, mqm); 869 GNUNET_MQ_send (eo->mq, ev);
890 break; 870 break;
891 } 871 }
892 if (1 == side) 872 if (1 == side)
@@ -895,15 +875,15 @@ decode_and_send (struct UnionEvaluateOperation *eo)
895 } 875 }
896 else 876 else
897 { 877 {
898 struct GNUNET_MQ_Envelope *mqm; 878 struct GNUNET_MQ_Envelope *ev;
899 struct GNUNET_MessageHeader *msg; 879 struct GNUNET_MessageHeader *msg;
900 880
901 /* FIXME: before sending the request, check if we may just have the element */ 881 /* FIXME: before sending the request, check if we may just have the element */
902 /* FIXME: merge multiple requests */ 882 /* FIXME: merge multiple requests */
903 mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), 883 ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
904 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); 884 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
905 *(struct IBF_Key *) &msg[1] = key; 885 *(struct IBF_Key *) &msg[1] = key;
906 GNUNET_MQ_send (eo->tc->mq, mqm); 886 GNUNET_MQ_send (eo->mq, ev);
907 } 887 }
908 } 888 }
909 ibf_destroy (diff_ibf); 889 ibf_destroy (diff_ibf);
@@ -980,21 +960,21 @@ static void
980send_client_element (struct UnionEvaluateOperation *eo, 960send_client_element (struct UnionEvaluateOperation *eo,
981 struct GNUNET_SET_Element *element) 961 struct GNUNET_SET_Element *element)
982{ 962{
983 struct GNUNET_MQ_Envelope *mqm; 963 struct GNUNET_MQ_Envelope *ev;
984 struct GNUNET_SET_ResultMessage *rm; 964 struct GNUNET_SET_ResultMessage *rm;
985 965
986 GNUNET_assert (0 != eo->request_id); 966 GNUNET_assert (0 != eo->spec->client_request_id);
987 mqm = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); 967 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
988 if (NULL == mqm) 968 if (NULL == ev)
989 { 969 {
990 GNUNET_MQ_discard (mqm); 970 GNUNET_MQ_discard (ev);
991 GNUNET_break (0); 971 GNUNET_break (0);
992 return; 972 return;
993 } 973 }
994 rm->result_status = htons (GNUNET_SET_STATUS_OK); 974 rm->result_status = htons (GNUNET_SET_STATUS_OK);
995 rm->request_id = htonl (eo->request_id); 975 rm->request_id = htonl (eo->spec->client_request_id);
996 memcpy (&rm[1], element->data, element->size); 976 memcpy (&rm[1], element->data, element->size);
997 GNUNET_MQ_send (eo->set->client_mq, mqm); 977 GNUNET_MQ_send (eo->spec->set->client_mq, ev);
998} 978}
999 979
1000 980
@@ -1009,14 +989,13 @@ send_client_element (struct UnionEvaluateOperation *eo,
1009static void 989static void
1010send_client_done_and_destroy (struct UnionEvaluateOperation *eo) 990send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
1011{ 991{
1012 struct GNUNET_MQ_Envelope *mqm; 992 struct GNUNET_MQ_Envelope *ev;
1013 struct GNUNET_SET_ResultMessage *rm; 993 struct GNUNET_SET_ResultMessage *rm;
1014 994
1015 GNUNET_assert (0 != eo->request_id); 995 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1016 mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); 996 rm->request_id = htonl (eo->spec->client_request_id);
1017 rm->request_id = htonl (eo->request_id);
1018 rm->result_status = htons (GNUNET_SET_STATUS_DONE); 997 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1019 GNUNET_MQ_send (eo->set->client_mq, mqm); 998 GNUNET_MQ_send (eo->spec->set->client_mq, ev);
1020 999
1021} 1000}
1022 1001
@@ -1123,13 +1102,13 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1123 if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) 1102 if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1124 { 1103 {
1125 /* we got all requests, but still have to send our elements as response */ 1104 /* we got all requests, but still have to send our elements as response */
1126 struct GNUNET_MQ_Envelope *mqm; 1105 struct GNUNET_MQ_Envelope *ev;
1127 1106
1128 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n"); 1107 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n");
1129 eo->phase = PHASE_FINISHED; 1108 eo->phase = PHASE_FINISHED;
1130 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); 1109 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1131 GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo); 1110 GNUNET_MQ_notify_sent (ev, peer_done_sent_cb, eo);
1132 GNUNET_MQ_send (eo->tc->mq, mqm); 1111 GNUNET_MQ_send (eo->mq, ev);
1133 return; 1112 return;
1134 } 1113 }
1135 if (eo->phase == PHASE_EXPECT_ELEMENTS) 1114 if (eo->phase == PHASE_EXPECT_ELEMENTS)
@@ -1148,80 +1127,69 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1148 * Evaluate a union operation with 1127 * Evaluate a union operation with
1149 * a remote peer. 1128 * a remote peer.
1150 * 1129 *
1151 * @param m the evaluate request message from the client 1130 * @param spec specification of the operation the evaluate
1131 * @param tunnel tunnel already connected to the partner peer
1152 * @param set the set to evaluate the operation with 1132 * @param set the set to evaluate the operation with
1133 * @return a handle to the operation
1153 */ 1134 */
1154void 1135struct UnionEvaluateOperation *
1155_GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set) 1136_GSS_union_evaluate (struct OperationSpecification *spec,
1137 struct GNUNET_MESH_Tunnel *tunnel)
1156{ 1138{
1157 struct UnionEvaluateOperation *eo; 1139 struct UnionEvaluateOperation *eo;
1158 struct GNUNET_MessageHeader *context_msg; 1140 struct UnionState *st = spec->set->state.u;
1159 1141
1160 eo = GNUNET_new (struct UnionEvaluateOperation); 1142 eo = GNUNET_new (struct UnionEvaluateOperation);
1161 eo->peer = m->target_peer; 1143 eo->se = strata_estimator_dup (spec->set->state.u->se);
1162 eo->set = set; 1144 eo->spec = spec;
1163 eo->request_id = htonl (m->request_id); 1145 eo->tunnel = tunnel;
1164 GNUNET_assert (0 != eo->request_id);
1165 eo->se = strata_estimator_dup (set->state.u->se);
1166 eo->salt = ntohs (m->salt);
1167 eo->app_id = m->app_id;
1168
1169 context_msg = GNUNET_MQ_extract_nested_mh (m);
1170 if (NULL != context_msg)
1171 {
1172 eo->context_msg = GNUNET_copy_message (context_msg);
1173 }
1174 1146
1175 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1147 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1176 "evaluating union operation, (app %s)\n", 1148 "evaluating union operation, (app %s)\n",
1177 GNUNET_h2s (&eo->app_id)); 1149 GNUNET_h2s (&eo->spec->app_id));
1178 1150
1179 eo->tc = GNUNET_new (struct TunnelContext);
1180 eo->tc->tunnel = GNUNET_MESH_tunnel_create (mesh, eo->tc, &eo->peer,
1181 GNUNET_APPLICATION_TYPE_SET);
1182 GNUNET_assert (NULL != eo->tc->tunnel);
1183 eo->tc->peer = eo->peer;
1184 eo->tc->mq = GNUNET_MESH_mq_create (eo->tc->tunnel);
1185 /* we started the operation, thus we have to send the operation request */ 1151 /* we started the operation, thus we have to send the operation request */
1186 eo->phase = PHASE_EXPECT_SE; 1152 eo->phase = PHASE_EXPECT_SE;
1187 1153
1188 GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head, 1154 GNUNET_CONTAINER_DLL_insert (st->ops_head,
1189 eo->set->state.u->ops_tail, 1155 st->ops_tail,
1190 eo); 1156 eo);
1191 1157
1192 send_operation_request (eo); 1158 send_operation_request (eo);
1159
1160 return eo;
1193} 1161}
1194 1162
1195 1163
1196/** 1164/**
1197 * Accept an union operation request from a remote peer 1165 * Accept an union operation request from a remote peer
1198 * 1166 *
1199 * @param m the accept message from the client 1167 * @param spec all necessary information about the operation
1200 * @param set the set of the client 1168 * @param tunnel open tunnel to the partner's peer
1201 * @param incoming information about the requesting remote peer 1169 * @return operation
1202 */ 1170 */
1203void 1171struct UnionEvaluateOperation *
1204_GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set, 1172_GSS_union_accept (struct OperationSpecification *spec,
1205 struct Incoming *incoming) 1173 struct GNUNET_MESH_Tunnel *tunnel)
1206{ 1174{
1207 struct UnionEvaluateOperation *eo; 1175 struct UnionEvaluateOperation *eo;
1176 struct UnionState *st = spec->set->state.u;
1208 1177
1209 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n"); 1178 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n");
1210 1179
1211 eo = GNUNET_new (struct UnionEvaluateOperation); 1180 eo = GNUNET_new (struct UnionEvaluateOperation);
1212 eo->tc = incoming->tc; 1181 eo->generation_created = st->current_generation++;
1213 eo->generation_created = set->state.u->current_generation++; 1182 eo->spec = spec;
1214 eo->set = set; 1183 eo->tunnel = tunnel;
1215 eo->salt = ntohs (incoming->salt); 1184 eo->se = strata_estimator_dup (st->se);
1216 GNUNET_assert (0 != ntohl (m->request_id));
1217 eo->request_id = ntohl (m->request_id);
1218 eo->se = strata_estimator_dup (set->state.u->se);
1219 /* transfer ownership of mq and socket from incoming to eo */ 1185 /* transfer ownership of mq and socket from incoming to eo */
1220 GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head, 1186 GNUNET_CONTAINER_DLL_insert (st->ops_head,
1221 eo->set->state.u->ops_tail, 1187 st->ops_tail,
1222 eo); 1188 eo);
1223 /* kick off the operation */ 1189 /* kick off the operation */
1224 send_strata_estimator (eo); 1190 send_strata_estimator (eo);
1191
1192 return eo;
1225} 1193}
1226 1194
1227 1195
@@ -1370,11 +1338,10 @@ _GSS_union_handle_p2p_message (void *cls,
1370 1338
1371 if (CONTEXT_OPERATION_UNION != tc->type) 1339 if (CONTEXT_OPERATION_UNION != tc->type)
1372 { 1340 {
1373 GNUNET_break_op (0);
1374 return GNUNET_SYSERR; 1341 return GNUNET_SYSERR;
1375 } 1342 }
1376 1343
1377 eo = tc->data; 1344 eo = tc->data.union_op;
1378 1345
1379 switch (ntohs (mh->type)) 1346 switch (ntohs (mh->type))
1380 { 1347 {
diff --git a/src/set/set_protocol.h b/src/set/set_protocol.h
index 543e2a002..945542151 100644
--- a/src/set/set_protocol.h
+++ b/src/set/set_protocol.h
@@ -42,30 +42,21 @@ struct OperationRequestMessage
42 /** 42 /**
43 * Operation to request, values from 'enum GNUNET_SET_OperationType' 43 * Operation to request, values from 'enum GNUNET_SET_OperationType'
44 */ 44 */
45 uint32_t operation; 45 uint32_t operation GNUNET_PACKED;
46 46
47 /** 47 /**
48 * Application-specific identifier of the request. 48 * Salt to use for this operation.
49 */ 49 */
50 struct GNUNET_HashCode app_id; 50 uint32_t salt;
51
52 /* rest: optional message */
53};
54 51
55struct ElementRequestMessage
56{
57 /** 52 /**
58 * Type: GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS 53 * Application-specific identifier of the request.
59 */ 54 */
60 struct GNUNET_MessageHeader header; 55 struct GNUNET_HashCode app_id;
61 56
62 /** 57 /* rest: optional message */
63 * Salt the keys in the body use
64 */
65 uint8_t salt;
66}; 58};
67 59
68
69struct IBFMessage 60struct IBFMessage
70{ 61{
71 /** 62 /**
@@ -80,15 +71,20 @@ struct IBFMessage
80 uint8_t order; 71 uint8_t order;
81 72
82 /** 73 /**
83 * Salt used when hashing elements for this IBF. 74 * Padding, must be 0.
84 */ 75 */
85 uint8_t salt; 76 uint8_t reserved;
86 77
87 /** 78 /**
88 * Offset of the strata in the rest of the message 79 * Offset of the strata in the rest of the message
89 */ 80 */
90 uint16_t offset GNUNET_PACKED; 81 uint16_t offset GNUNET_PACKED;
91 82
83 /**
84 * Salt used when hashing elements for this IBF.
85 */
86 uint32_t salt;
87
92 /* rest: strata */ 88 /* rest: strata */
93}; 89};
94 90