aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-07-10 01:31:13 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-07-10 01:31:13 +0000
commit6b8400966a5e6c2194785b3a33f91b748cfa7b7b (patch)
tree0dafa7ba24c7a6dbb852fdedfd1822cd1e4835c0 /src
parent084cb3e09007ef50a3d9bd29514c8ec455249633 (diff)
downloadgnunet-6b8400966a5e6c2194785b3a33f91b748cfa7b7b.tar.gz
gnunet-6b8400966a5e6c2194785b3a33f91b748cfa7b7b.zip
- set service working
- set profiler
Diffstat (limited to 'src')
-rw-r--r--src/consensus/Makefile.am14
-rw-r--r--src/consensus/consensus_api.c2
-rw-r--r--src/consensus/gnunet-consensus-start-peers.c186
-rw-r--r--src/consensus/gnunet-service-consensus.c2
-rw-r--r--src/include/gnunet_mq_lib.h2
-rw-r--r--src/include/gnunet_set_service.h20
-rw-r--r--src/set/gnunet-service-set.c407
-rw-r--r--src/set/gnunet-service-set.h342
-rw-r--r--src/set/gnunet-service-set_union.c431
-rw-r--r--src/set/gnunet-set-profiler.c229
-rw-r--r--src/set/set.h18
-rw-r--r--src/set/set_api.c144
-rw-r--r--src/util/mq.c19
13 files changed, 893 insertions, 923 deletions
diff --git a/src/consensus/Makefile.am b/src/consensus/Makefile.am
index 914fbdef8..4b81baa59 100644
--- a/src/consensus/Makefile.am
+++ b/src/consensus/Makefile.am
@@ -16,8 +16,7 @@ if USE_COVERAGE
16endif 16endif
17 17
18bin_PROGRAMS = \ 18bin_PROGRAMS = \
19 gnunet-consensus \ 19 gnunet-consensus
20 gnunet-consensus-start-peers
21 20
22libexec_PROGRAMS = \ 21libexec_PROGRAMS = \
23 gnunet-service-consensus 22 gnunet-service-consensus
@@ -41,17 +40,6 @@ gnunet_consensus_LDADD = \
41gnunet_consensus_DEPENDENCIES = \ 40gnunet_consensus_DEPENDENCIES = \
42 libgnunetconsensus.la 41 libgnunetconsensus.la
43 42
44gnunet_consensus_start_peers_SOURCES = \
45 gnunet-consensus-start-peers.c
46gnunet_consensus_start_peers_LDADD = \
47 $(top_builddir)/src/util/libgnunetutil.la \
48 $(top_builddir)/src/testbed/libgnunettestbed.la \
49 $(top_builddir)/src/consensus/libgnunetconsensus.la \
50 $(GN_LIBINTL)
51gnunet_consensus_start_peers_DEPENDENCIES = \
52 libgnunetconsensus.la
53
54
55gnunet_service_consensus_SOURCES = \ 43gnunet_service_consensus_SOURCES = \
56 gnunet-service-consensus.c 44 gnunet-service-consensus.c
57gnunet_service_consensus_LDADD = \ 45gnunet_service_consensus_LDADD = \
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c
index a1dc40826..7690dc059 100644
--- a/src/consensus/consensus_api.c
+++ b/src/consensus/consensus_api.c
@@ -205,7 +205,7 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
205 205
206 consensus->client = GNUNET_CLIENT_connect ("consensus", cfg); 206 consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
207 consensus->mq = GNUNET_MQ_queue_for_connection_client (consensus->client, 207 consensus->mq = GNUNET_MQ_queue_for_connection_client (consensus->client,
208 mq_handlers, consensus); 208 mq_handlers, NULL, consensus);
209 209
210 GNUNET_assert (consensus->client != NULL); 210 GNUNET_assert (consensus->client != NULL);
211 211
diff --git a/src/consensus/gnunet-consensus-start-peers.c b/src/consensus/gnunet-consensus-start-peers.c
deleted file mode 100644
index 05ba05691..000000000
--- a/src/consensus/gnunet-consensus-start-peers.c
+++ /dev/null
@@ -1,186 +0,0 @@
1
2/*
3 This file is part of GNUnet
4 (C) 2012 Christian Grothoff (and other contributing authors)
5
6 GNUnet is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published
8 by the Free Software Foundation; either version 2, or (at your
9 option) any later version.
10
11 GNUnet is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with GNUnet; see the file COPYING. If not, write to the
18 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 Boston, MA 02111-1307, USA.
20 */
21
22/**
23 * @file consensus/gnunet-consensus-start-peers.c
24 * @brief Starts peers with testebed on localhost,
25 * prints their configuration files and waits for ^C.
26 * @author Florian Dold
27 */
28#include "platform.h"
29#include "gnunet_util_lib.h"
30#include "gnunet_testbed_service.h"
31
32
33static char *config_template_file;
34static unsigned int num_peers_requested = 2;
35static struct GNUNET_TESTBED_Peer **peers;
36
37
38/**
39 * Callback to be called when the requested peer information is available
40 *
41 * @param cb_cls the closure from GNUNET_TETSBED_peer_get_information()
42 * @param op the operation this callback corresponds to
43 * @param pinfo the result; will be NULL if the operation has failed
44 * @param emsg error message if the operation has failed; will be NULL if the
45 * operation is successfull
46 */
47static void
48peer_info_cb (void *cb_cls,
49 struct GNUNET_TESTBED_Operation
50 *op,
51 const struct
52 GNUNET_TESTBED_PeerInformation
53 *pinfo,
54 const char *emsg)
55{
56 GNUNET_assert (NULL == emsg);
57 if (pinfo->pit == GNUNET_TESTBED_PIT_IDENTITY)
58 {
59 struct GNUNET_CRYPTO_HashAsciiEncoded enc;
60 GNUNET_CRYPTO_hash_to_enc (&pinfo->result.id->hashPubKey, &enc);
61 printf("peer %td identity:\n", ((struct GNUNET_TESTBED_Peer **) cb_cls) - &peers[0]);
62 printf("%s\n", (char *)&enc);
63 }
64 else if (pinfo->pit == GNUNET_TESTBED_PIT_CONFIGURATION)
65 {
66 char *tmpfilename;
67 if (NULL == (tmpfilename = GNUNET_DISK_mktemp ("gnunet-consensus")))
68 {
69 GNUNET_break (0);
70 GNUNET_SCHEDULER_shutdown ();
71 return;
72 }
73 if (GNUNET_SYSERR ==
74 GNUNET_CONFIGURATION_write (pinfo->result.cfg,
75 tmpfilename))
76 {
77 GNUNET_break (0);
78 return;
79 }
80 printf("peer %td config file:\n", ((struct GNUNET_TESTBED_Peer **) cb_cls) - &peers[0]);
81 printf("%s\n", tmpfilename);
82 }
83 else
84 {
85 GNUNET_assert (0);
86 }
87}
88
89
90
91/**
92 * Signature of the event handler function called by the
93 * respective event controller.
94 *
95 * @param cls closure
96 * @param event information about the event
97 */
98static void
99controller_cb(void *cls,
100 const struct GNUNET_TESTBED_EventInformation *event)
101{
102 GNUNET_assert (0);
103}
104
105
106
107
108/**
109 * Signature of a main function for a testcase.
110 *
111 * @param cls closure
112 * @param num_peers number of peers in 'peers'
113 * @param started_peers handle to peers being run in the testbed. NULL upon
114 * timeout (see GNUNET_TESTBED_test_run()).
115 * @param links_succeeded the number of overlay link connection attempts that
116 * succeeded
117 * @param links_failed the number of overlay link connection attempts that
118 * failed
119 */
120static void
121test_master (void *cls,
122 unsigned int num_peers,
123 struct GNUNET_TESTBED_Peer **started_peers,
124 unsigned int links_succeeded,
125 unsigned int links_failed)
126{
127 int i;
128
129 printf("started %d peers\n", num_peers);
130 peers = started_peers;
131
132 for (i = 0; i < num_peers; i++)
133 {
134 GNUNET_TESTBED_peer_get_information (peers[i],
135 GNUNET_TESTBED_PIT_IDENTITY,
136 peer_info_cb,
137 &peers[i]);
138 GNUNET_TESTBED_peer_get_information (peers[i],
139 GNUNET_TESTBED_PIT_CONFIGURATION,
140 peer_info_cb,
141 &peers[i]);
142 }
143}
144
145
146static void
147run (void *cls, char *const *args, const char *cfgfile,
148 const struct GNUNET_CONFIGURATION_Handle *config)
149{
150 if (NULL == config_template_file)
151 {
152 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "no template file specified\n");
153 return;
154 }
155
156 (void) GNUNET_TESTBED_test_run ("gnunet-consensus-start-peers",
157 config_template_file,
158 num_peers_requested,
159 0,
160 controller_cb,
161 NULL,
162 test_master,
163 NULL);
164}
165
166
167int
168main (int argc, char **argv)
169{
170 static const struct GNUNET_GETOPT_CommandLineOption options[] = {
171 { 't', "config-template", "TEMPLATE",
172 gettext_noop ("start peers with the given template configuration"),
173 GNUNET_YES, &GNUNET_GETOPT_set_string, &config_template_file },
174 { 'n', "num-peers", "NUM",
175 gettext_noop ("number of peers to start"),
176 GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_peers_requested },
177 GNUNET_GETOPT_OPTION_END
178 };
179
180 /* run without scheduler, as test_run already does this */
181 GNUNET_PROGRAM_run2 (argc, argv, "gnunet-consensus-start-peers",
182 "help",
183 options, &run, NULL, GNUNET_YES);
184 return 0;
185}
186
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
index 9cde0e46b..1245dcca4 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -266,8 +266,6 @@ have_exp_subround_finished (const struct ConsensusSession *session)
266} 266}
267 267
268 268
269
270
271/** 269/**
272 * Destroy a session, free all resources associated with it. 270 * Destroy a session, free all resources associated with it.
273 * 271 *
diff --git a/src/include/gnunet_mq_lib.h b/src/include/gnunet_mq_lib.h
index 048ad39a0..a20588b4d 100644
--- a/src/include/gnunet_mq_lib.h
+++ b/src/include/gnunet_mq_lib.h
@@ -328,12 +328,14 @@ GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id);
328 * 328 *
329 * @param connection the client connection 329 * @param connection the client connection
330 * @param handlers handlers for receiving messages 330 * @param handlers handlers for receiving messages
331 * @param error_handler error handler
331 * @param cls closure for the handlers 332 * @param cls closure for the handlers
332 * @return the message queue 333 * @return the message queue
333 */ 334 */
334struct GNUNET_MQ_Handle * 335struct GNUNET_MQ_Handle *
335GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection, 336GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
336 const struct GNUNET_MQ_MessageHandler *handlers, 337 const struct GNUNET_MQ_MessageHandler *handlers,
338 GNUNET_MQ_ErrorHandler error_handler,
337 void *cls); 339 void *cls);
338 340
339 341
diff --git a/src/include/gnunet_set_service.h b/src/include/gnunet_set_service.h
index ffc7f1c5b..b434e4474 100644
--- a/src/include/gnunet_set_service.h
+++ b/src/include/gnunet_set_service.h
@@ -74,9 +74,16 @@ struct GNUNET_SET_OperationHandle;
74enum GNUNET_SET_OperationType 74enum GNUNET_SET_OperationType
75{ 75{
76 /** 76 /**
77 * A purely local set that does not support any
78 * operation.
79 */
80 GNUNET_SET_OPERATION_NONE,
81
82 /**
77 * Set intersection, only return elements that are in both sets. 83 * Set intersection, only return elements that are in both sets.
78 */ 84 */
79 GNUNET_SET_OPERATION_INTERSECTION, 85 GNUNET_SET_OPERATION_INTERSECTION,
86
80 /** 87 /**
81 * Set union, return all elements that are in at least one of the sets. 88 * Set union, return all elements that are in at least one of the sets.
82 */ 89 */
@@ -116,6 +123,7 @@ enum GNUNET_SET_Status
116 GNUNET_SET_STATUS_DONE 123 GNUNET_SET_STATUS_DONE
117}; 124};
118 125
126
119/** 127/**
120 * The way results are given to the client. 128 * The way results are given to the client.
121 */ 129 */
@@ -137,6 +145,7 @@ enum GNUNET_SET_ResultMode
137 GNUNET_SET_RESULT_REMOVED 145 GNUNET_SET_RESULT_REMOVED
138}; 146};
139 147
148
140/** 149/**
141 * Element stored in a set. 150 * Element stored in a set.
142 */ 151 */
@@ -182,18 +191,19 @@ typedef void (*GNUNET_SET_ResultIterator) (void *cls,
182 191
183/** 192/**
184 * Called when another peer wants to do a set operation with the 193 * Called when another peer wants to do a set operation with the
185 * local peer. 194 * local peer. If a listen error occurs, the 'request' is NULL.
186 * 195 *
187 * @param cls closure 196 * @param cls closure
188 * @param other_peer the other peer 197 * @param other_peer the other peer
189 * @param context_msg message with application specific information from 198 * @param context_msg message with application specific information from
190 * the other peer 199 * the other peer
191 * @param request request from the other peer, use GNUNET_SET_accept 200 * @param request request from the other peer, use GNUNET_SET_accept
201 * Will be NULL if the listener failed.
192 * to accept it, otherwise the request will be refused 202 * to accept it, otherwise the request will be refused
193 * Note that we don't use a return value here, as it is also 203 * Note that we can't just return value from the listen callback,
194 * necessary to specify the set we want to do the operation with, 204 * as it is also necessary to specify the set we want to do the
195 * whith sometimes can be derived from the context message. 205 * operation with, whith sometimes can be derived from the context
196 * Also necessary to specify the timeout. 206 * message. It's necessary to specify the timeout.
197 */ 207 */
198typedef void 208typedef void
199(*GNUNET_SET_ListenCallback) (void *cls, 209(*GNUNET_SET_ListenCallback) (void *cls,
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index 62d1ce21b..e5c42eca0 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -46,11 +46,14 @@ struct Incoming
46 46
47 /** 47 /**
48 * Detail information about the operation. 48 * Detail information about the operation.
49 * NULL as long as we did not receive the operation
50 * request from the remote peer.
49 */ 51 */
50 struct OperationSpecification *spec; 52 struct OperationSpecification *spec;
51 53
52 /** 54 /**
53 * The identity of the requesting peer. 55 * The identity of the requesting peer. Needs to
56 * be stored here as the op spec might not have been created yet.
54 */ 57 */
55 struct GNUNET_PeerIdentity peer; 58 struct GNUNET_PeerIdentity peer;
56 59
@@ -83,17 +86,55 @@ struct Incoming
83 86
84 87
85/** 88/**
89 * A listener is inhabited by a client, and
90 * waits for evaluation requests from remote peers.
91 */
92struct Listener
93{
94 /**
95 * Listeners are held in a doubly linked list.
96 */
97 struct Listener *next;
98
99 /**
100 * Listeners are held in a doubly linked list.
101 */
102 struct Listener *prev;
103
104 /**
105 * Client that owns the listener.
106 * Only one client may own a listener.
107 */
108 struct GNUNET_SERVER_Client *client;
109
110 /**
111 * Message queue for the client
112 */
113 struct GNUNET_MQ_Handle *client_mq;
114
115 /**
116 * The type of the operation.
117 */
118 enum GNUNET_SET_OperationType operation;
119
120 /**
121 * Application ID for the operation, used to distinguish
122 * multiple operations of the same type with the same peer.
123 */
124 struct GNUNET_HashCode app_id;
125};
126
127
128/**
86 * Configuration of our local peer. 129 * Configuration of our local peer.
87 * (Not declared 'static' as also needed in gnunet-service-set_union.c)
88 */ 130 */
89const struct GNUNET_CONFIGURATION_Handle *configuration; 131static const struct GNUNET_CONFIGURATION_Handle *configuration;
90 132
91/** 133/**
92 * Handle to the mesh service, used 134 * Handle to the mesh service, used
93 * to listen for and connect to remote peers. 135 * to listen for and connect to remote peers.
94 * (Not declared 'static' as also needed in gnunet-service-set_union.c)
95 */ 136 */
96struct GNUNET_MESH_Handle *mesh; 137static struct GNUNET_MESH_Handle *mesh;
97 138
98/** 139/**
99 * Sets are held in a doubly linked list. 140 * Sets are held in a doubly linked list.
@@ -204,8 +245,9 @@ listener_destroy (struct Listener *listener)
204 * The client's destroy callback will destroy the listener again. */ 245 * The client's destroy callback will destroy the listener again. */
205 if (NULL != listener->client) 246 if (NULL != listener->client)
206 { 247 {
207 GNUNET_SERVER_client_disconnect (listener->client); 248 struct GNUNET_SERVER_Client *client = listener->client;
208 listener->client = NULL; 249 listener->client = NULL;
250 GNUNET_SERVER_client_disconnect (client);
209 return; 251 return;
210 } 252 }
211 if (NULL != listener->client_mq) 253 if (NULL != listener->client_mq)
@@ -230,22 +272,19 @@ set_destroy (struct Set *set)
230 * The client's destroy callback will destroy the set again. */ 272 * The client's destroy callback will destroy the set again. */
231 if (NULL != set->client) 273 if (NULL != set->client)
232 { 274 {
233 GNUNET_SERVER_client_disconnect (set->client); 275 struct GNUNET_SERVER_Client *client = set->client;
234 set->client = NULL; 276 set->client = NULL;
277 GNUNET_SERVER_client_disconnect (client);
235 return; 278 return;
236 } 279 }
237 switch (set->operation) 280 if (NULL != set->client_mq)
238 { 281 {
239 case GNUNET_SET_OPERATION_INTERSECTION: 282 GNUNET_MQ_destroy (set->client_mq);
240 GNUNET_assert (0); 283 set->client_mq = NULL;
241 break;
242 case GNUNET_SET_OPERATION_UNION:
243 _GSS_union_set_destroy (set);
244 break;
245 default:
246 GNUNET_assert (0);
247 break;
248 } 284 }
285 GNUNET_assert (NULL != set->state);
286 set->vt->destroy_set (set->state);
287 set->state = NULL;
249 GNUNET_CONTAINER_DLL_remove (sets_head, sets_tail, set); 288 GNUNET_CONTAINER_DLL_remove (sets_head, sets_tail, set);
250 GNUNET_free (set); 289 GNUNET_free (set);
251} 290}
@@ -264,6 +303,8 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
264 struct Set *set; 303 struct Set *set;
265 struct Listener *listener; 304 struct Listener *listener;
266 305
306 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, cleaning up\n");
307
267 set = set_get (client); 308 set = set_get (client);
268 if (NULL != set) 309 if (NULL != set)
269 { 310 {
@@ -287,6 +328,7 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
287static void 328static void
288incoming_destroy (struct Incoming *incoming) 329incoming_destroy (struct Incoming *incoming)
289{ 330{
331 GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming);
290 if (NULL != incoming->tunnel) 332 if (NULL != incoming->tunnel)
291 { 333 {
292 struct GNUNET_MESH_Tunnel *t = incoming->tunnel; 334 struct GNUNET_MESH_Tunnel *t = incoming->tunnel;
@@ -294,7 +336,6 @@ incoming_destroy (struct Incoming *incoming)
294 GNUNET_MESH_tunnel_destroy (t); 336 GNUNET_MESH_tunnel_destroy (t);
295 return; 337 return;
296 } 338 }
297 GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming);
298 GNUNET_free (incoming); 339 GNUNET_free (incoming);
299} 340}
300 341
@@ -338,7 +379,7 @@ incoming_suggest (struct Incoming *incoming, struct Listener *listener)
338 mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, 379 mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST,
339 incoming->spec->context_msg); 380 incoming->spec->context_msg);
340 GNUNET_assert (NULL != mqm); 381 GNUNET_assert (NULL != mqm);
341 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "suggesting request with accept id %u\n", incoming->suggest_id); 382 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "suggesting request with accept id %u\n", incoming->suggest_id);
342 cmsg->accept_id = htonl (incoming->suggest_id); 383 cmsg->accept_id = htonl (incoming->suggest_id);
343 cmsg->peer_id = incoming->spec->peer; 384 cmsg->peer_id = incoming->spec->peer;
344 GNUNET_MQ_send (listener->client_mq, mqm); 385 GNUNET_MQ_send (listener->client_mq, mqm);
@@ -350,33 +391,28 @@ incoming_suggest (struct Incoming *incoming, struct Listener *listener)
350 * Handle a request for a set operation from 391 * Handle a request for a set operation from
351 * another peer. 392 * another peer.
352 * 393 *
353 * @param cls the incoming socket 394 * @param op the operation state
354 * @param tunnel the tunnel that sent the message 395 * @param mh the received message
355 * @param tunnel_ctx the tunnel context 396 * @return GNUNET_OK if the tunnel should be kept alive,
356 * @param mh the message 397 * GNUNET_SYSERR to destroy the tunnel
357 */ 398 */
358static int 399static int
359handle_p2p_operation_request (void *cls, 400handle_incoming_msg (struct OperationState *op,
360 struct GNUNET_MESH_Tunnel *tunnel, 401 const struct GNUNET_MessageHeader *mh)
361 void **tunnel_ctx,
362 const struct GNUNET_MessageHeader *mh)
363{ 402{
364 struct TunnelContext *tc = *tunnel_ctx; 403 struct Incoming *incoming = (struct Incoming *) op;
365 struct Incoming *incoming;
366 const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh; 404 const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh;
367 struct Listener *listener; 405 struct Listener *listener;
368 struct OperationSpecification *spec; 406 struct OperationSpecification *spec;
369 407
370 if (CONTEXT_INCOMING != tc->type) 408 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got op request\n");
409
410 if (GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST != ntohs (mh->type))
371 { 411 {
372 /* unexpected request */
373 GNUNET_break_op (0); 412 GNUNET_break_op (0);
374 /* kill the tunnel, cleaner will be called */
375 return GNUNET_SYSERR; 413 return GNUNET_SYSERR;
376 } 414 }
377 415
378 incoming = tc->data.incoming;
379
380 if (NULL != incoming->spec) 416 if (NULL != incoming->spec)
381 { 417 {
382 /* double operation request */ 418 /* double operation request */
@@ -385,8 +421,9 @@ handle_p2p_operation_request (void *cls,
385 } 421 }
386 422
387 spec = GNUNET_new (struct OperationSpecification); 423 spec = GNUNET_new (struct OperationSpecification);
388 spec->context_msg = 424 spec->context_msg = GNUNET_MQ_extract_nested_mh (msg);
389 GNUNET_copy_message (GNUNET_MQ_extract_nested_mh (msg)); 425 if (NULL != spec->context_msg)
426 spec->context_msg = GNUNET_copy_message (spec->context_msg);
390 spec->operation = ntohl (msg->operation); 427 spec->operation = ntohl (msg->operation);
391 spec->app_id = msg->app_id; 428 spec->app_id = msg->app_id;
392 spec->salt = ntohl (msg->salt); 429 spec->salt = ntohl (msg->salt);
@@ -401,12 +438,12 @@ handle_p2p_operation_request (void *cls,
401 return GNUNET_SYSERR; 438 return GNUNET_SYSERR;
402 } 439 }
403 440
404 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, app %s)\n", 441 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received P2P operation request (op %u, app %s)\n",
405 ntohs (msg->operation), GNUNET_h2s (&msg->app_id)); 442 ntohs (msg->operation), GNUNET_h2s (&msg->app_id));
406 listener = listener_get_by_target (ntohs (msg->operation), &msg->app_id); 443 listener = listener_get_by_target (ntohs (msg->operation), &msg->app_id);
407 if (NULL == listener) 444 if (NULL == listener)
408 { 445 {
409 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 446 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
410 "no listener matches incoming request, waiting with timeout\n"); 447 "no listener matches incoming request, waiting with timeout\n");
411 return GNUNET_OK; 448 return GNUNET_OK;
412 } 449 }
@@ -430,7 +467,7 @@ handle_client_create (void *cls,
430 struct GNUNET_SET_CreateMessage *msg = (struct GNUNET_SET_CreateMessage *) m; 467 struct GNUNET_SET_CreateMessage *msg = (struct GNUNET_SET_CreateMessage *) m;
431 struct Set *set; 468 struct Set *set;
432 469
433 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client created new set (operation %u)\n", 470 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client created new set (operation %u)\n",
434 ntohs (msg->operation)); 471 ntohs (msg->operation));
435 472
436 if (NULL != set_get (client)) 473 if (NULL != set_get (client))
@@ -441,14 +478,15 @@ handle_client_create (void *cls,
441 } 478 }
442 479
443 set = NULL; 480 set = NULL;
481 set = GNUNET_new (struct Set);
444 482
445 switch (ntohs (msg->operation)) 483 switch (ntohs (msg->operation))
446 { 484 {
447 case GNUNET_SET_OPERATION_INTERSECTION: 485 case GNUNET_SET_OPERATION_INTERSECTION:
448 //set = _GSS_intersection_set_create (); 486 // FIXME
449 break; 487 break;
450 case GNUNET_SET_OPERATION_UNION: 488 case GNUNET_SET_OPERATION_UNION:
451 set = _GSS_union_set_create (); 489 set->vt = _GSS_union_vt ();
452 break; 490 break;
453 default: 491 default:
454 GNUNET_break (0); 492 GNUNET_break (0);
@@ -456,8 +494,7 @@ handle_client_create (void *cls,
456 return; 494 return;
457 } 495 }
458 496
459 GNUNET_assert (NULL != set); 497 set->state = set->vt->create ();
460
461 set->client = client; 498 set->client = client;
462 set->client_mq = GNUNET_MQ_queue_for_server_client (client); 499 set->client_mq = GNUNET_MQ_queue_for_server_client (client);
463 GNUNET_CONTAINER_DLL_insert (sets_head, sets_tail, set); 500 GNUNET_CONTAINER_DLL_insert (sets_head, sets_tail, set);
@@ -493,7 +530,7 @@ handle_client_listen (void *cls,
493 listener->app_id = msg->app_id; 530 listener->app_id = msg->app_id;
494 listener->operation = ntohs (msg->operation); 531 listener->operation = ntohs (msg->operation);
495 GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener); 532 GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener);
496 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "new listener created (op %u, app %s)\n", 533 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new listener created (op %u, app %s)\n",
497 listener->operation, GNUNET_h2s (&listener->app_id)); 534 listener->operation, GNUNET_h2s (&listener->app_id));
498 for (incoming = incoming_head; NULL != incoming; incoming = incoming->next) 535 for (incoming = incoming_head; NULL != incoming; incoming = incoming->next)
499 { 536 {
@@ -511,46 +548,6 @@ handle_client_listen (void *cls,
511 548
512 549
513/** 550/**
514 * Called when a client wants to remove an element
515 * from the set it inhabits.
516 *
517 * @param cls unused
518 * @param client client that sent the message
519 * @param m message sent by the client
520 */
521static void
522handle_client_remove (void *cls,
523 struct GNUNET_SERVER_Client *client,
524 const struct GNUNET_MessageHeader *m)
525{
526 struct Set *set;
527
528 set = set_get (client);
529 if (NULL == set)
530 {
531 GNUNET_break (0);
532 GNUNET_SERVER_client_disconnect (client);
533 return;
534 }
535 switch (set->operation)
536 {
537 case GNUNET_SET_OPERATION_UNION:
538 _GSS_union_remove ((struct GNUNET_SET_ElementMessage *) m, set);
539 break;
540 case GNUNET_SET_OPERATION_INTERSECTION:
541 //_GSS_intersection_remove ((struct GNUNET_SET_ElementMessage *) m, set);
542 break;
543 default:
544 GNUNET_assert (0);
545 break;
546 }
547
548 GNUNET_SERVER_receive_done (client, GNUNET_OK);
549}
550
551
552
553/**
554 * Called when the client wants to reject an operation 551 * Called when the client wants to reject an operation
555 * request from another peer. 552 * request from another peer.
556 * 553 *
@@ -574,13 +571,12 @@ handle_client_reject (void *cls,
574 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 571 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
575 return; 572 return;
576 } 573 }
577 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n"); 574 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "peer request rejected by client\n");
578 GNUNET_MESH_tunnel_destroy (incoming->tunnel); 575 GNUNET_MESH_tunnel_destroy (incoming->tunnel);
579 GNUNET_SERVER_receive_done (client, GNUNET_OK); 576 GNUNET_SERVER_receive_done (client, GNUNET_OK);
580} 577}
581 578
582 579
583
584/** 580/**
585 * Called when a client wants to add an element to a 581 * Called when a client wants to add an element to a
586 * set it inhabits. 582 * set it inhabits.
@@ -590,11 +586,13 @@ handle_client_reject (void *cls,
590 * @param m message sent by the client 586 * @param m message sent by the client
591 */ 587 */
592static void 588static void
593handle_client_add (void *cls, 589handle_client_add_remove (void *cls,
594 struct GNUNET_SERVER_Client *client, 590 struct GNUNET_SERVER_Client *client,
595 const struct GNUNET_MessageHeader *m) 591 const struct GNUNET_MessageHeader *m)
596{ 592{
597 struct Set *set; 593 struct Set *set;
594 const struct GNUNET_SET_ElementMessage *msg;
595 struct GNUNET_SET_Element el;
598 596
599 set = set_get (client); 597 set = set_get (client);
600 if (NULL == set) 598 if (NULL == set)
@@ -603,19 +601,14 @@ handle_client_add (void *cls,
603 GNUNET_SERVER_client_disconnect (client); 601 GNUNET_SERVER_client_disconnect (client);
604 return; 602 return;
605 } 603 }
606 switch (set->operation) 604 msg = (const struct GNUNET_SET_ElementMessage *) m;
607 { 605 el.size = ntohs (m->size) - sizeof *msg;
608 case GNUNET_SET_OPERATION_UNION: 606 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client ins/rem element of size %u\n", el.size);
609 _GSS_union_add ((struct GNUNET_SET_ElementMessage *) m, set); 607 el.data = &msg[1];
610 break; 608 if (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (m->type))
611 case GNUNET_SET_OPERATION_INTERSECTION: 609 set->vt->remove (set->state, &el);
612 //_GSS_intersection_add ((struct GNUNET_SET_ElementMessage *) m, set); 610 else
613 break; 611 set->vt->add (set->state, &el);
614 default:
615 GNUNET_assert (0);
616 break;
617 }
618
619 GNUNET_SERVER_receive_done (client, GNUNET_OK); 612 GNUNET_SERVER_receive_done (client, GNUNET_OK);
620} 613}
621 614
@@ -653,27 +646,15 @@ handle_client_evaluate (void *cls,
653 spec->app_id = msg->app_id; 646 spec->app_id = msg->app_id;
654 spec->salt = ntohl (msg->salt); 647 spec->salt = ntohl (msg->salt);
655 spec->peer = msg->target_peer; 648 spec->peer = msg->target_peer;
649 spec->set = set;
650 spec->client_request_id = ntohl (msg->request_id);
656 651
657 tunnel = GNUNET_MESH_tunnel_create (mesh, tc, &msg->target_peer, 652 tunnel = GNUNET_MESH_tunnel_create (mesh, tc, &msg->target_peer,
658 GNUNET_APPLICATION_TYPE_SET, 653 GNUNET_APPLICATION_TYPE_SET,
659 GNUNET_YES, 654 GNUNET_YES,
660 GNUNET_YES); 655 GNUNET_YES);
661 656
662 switch (set->operation) 657 set->vt->evaluate (spec, tunnel, tc);
663 {
664 case GNUNET_SET_OPERATION_INTERSECTION:
665 tc->type = CONTEXT_OPERATION_INTERSECTION;
666 //_GSS_intersection_evaluate ((struct GNUNET_SET_EvaluateMessage *) m, set);
667 break;
668 case GNUNET_SET_OPERATION_UNION:
669 tc->type = CONTEXT_OPERATION_UNION;
670 tc->data.union_op =
671 _GSS_union_evaluate (spec, tunnel);
672 break;
673 default:
674 GNUNET_assert (0);
675 break;
676 }
677 658
678 GNUNET_SERVER_receive_done (client, GNUNET_OK); 659 GNUNET_SERVER_receive_done (client, GNUNET_OK);
679} 660}
@@ -705,6 +686,35 @@ handle_client_ack (void *cls,
705 * @param mh the message 686 * @param mh the message
706 */ 687 */
707static void 688static void
689handle_client_cancel (void *cls,
690 struct GNUNET_SERVER_Client *client,
691 const struct GNUNET_MessageHeader *mh)
692{
693 const struct GNUNET_SET_CancelMessage *msg =
694 (const struct GNUNET_SET_CancelMessage *) mh;
695 struct Set *set;
696
697 set = set_get (client);
698 if (NULL == set)
699 {
700 GNUNET_break (0);
701 GNUNET_SERVER_client_disconnect (client);
702 return;
703 }
704 /* FIXME: maybe cancel should return success/error code? */
705 set->vt->cancel (set->state, ntohl (msg->request_id));
706}
707
708
709/**
710 * Handle a request from the client to accept
711 * a set operation that came from a remote peer.
712 *
713 * @param cls unused
714 * @param client the client
715 * @param mh the message
716 */
717static void
708handle_client_accept (void *cls, 718handle_client_accept (void *cls,
709 struct GNUNET_SERVER_Client *client, 719 struct GNUNET_SERVER_Client *client,
710 const struct GNUNET_MessageHeader *mh) 720 const struct GNUNET_MessageHeader *mh)
@@ -712,12 +722,10 @@ handle_client_accept (void *cls,
712 struct Set *set; 722 struct Set *set;
713 struct Incoming *incoming; 723 struct Incoming *incoming;
714 struct GNUNET_SET_AcceptRejectMessage *msg = (struct GNUNET_SET_AcceptRejectMessage *) mh; 724 struct GNUNET_SET_AcceptRejectMessage *msg = (struct GNUNET_SET_AcceptRejectMessage *) mh;
715 struct OperationSpecification *spec;
716 struct TunnelContext *tc;
717 725
718 incoming = get_incoming (ntohl (msg->accept_reject_id)); 726 incoming = get_incoming (ntohl (msg->accept_reject_id));
719 727
720 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client accepting %u\n", ntohl (msg->accept_reject_id)); 728 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client accepting %u\n", ntohl (msg->accept_reject_id));
721 729
722 if (NULL == incoming) 730 if (NULL == incoming)
723 { 731 {
@@ -736,24 +744,9 @@ handle_client_accept (void *cls,
736 return; 744 return;
737 } 745 }
738 746
739 spec = GNUNET_new (struct OperationSpecification); 747 incoming->spec->set = set;
740 tc = incoming->tc; 748 incoming->spec->client_request_id = ntohl (msg->request_id);
741 749 set->vt->accept (incoming->spec, incoming->tunnel, incoming->tc);
742 switch (set->operation)
743 {
744 case GNUNET_SET_OPERATION_INTERSECTION:
745 tc->type = CONTEXT_OPERATION_INTERSECTION;
746 // _GSS_intersection_accept (msg, set, incoming);
747 break;
748 case GNUNET_SET_OPERATION_UNION:
749 tc->type = CONTEXT_OPERATION_UNION;
750 tc->data.union_op = _GSS_union_accept (spec, incoming->tunnel);
751 break;
752 default:
753 GNUNET_assert (0);
754 break;
755 }
756
757 /* tunnel ownership goes to operation */ 750 /* tunnel ownership goes to operation */
758 incoming->tunnel = NULL; 751 incoming->tunnel = NULL;
759 incoming_destroy (incoming); 752 incoming_destroy (incoming);
@@ -771,12 +764,6 @@ static void
771shutdown_task (void *cls, 764shutdown_task (void *cls,
772 const struct GNUNET_SCHEDULER_TaskContext *tc) 765 const struct GNUNET_SCHEDULER_TaskContext *tc)
773{ 766{
774 if (NULL != mesh)
775 {
776 GNUNET_MESH_disconnect (mesh);
777 mesh = NULL;
778 }
779
780 while (NULL != incoming_head) 767 while (NULL != incoming_head)
781 incoming_destroy (incoming_head); 768 incoming_destroy (incoming_head);
782 769
@@ -786,9 +773,17 @@ shutdown_task (void *cls,
786 while (NULL != sets_head) 773 while (NULL != sets_head)
787 set_destroy (sets_head); 774 set_destroy (sets_head);
788 775
789 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
790}
791 776
777 /* it's important to destroy mesh at the end, as tunnels
778 * must be destroyed first! */
779 if (NULL != mesh)
780 {
781 GNUNET_MESH_disconnect (mesh);
782 mesh = NULL;
783 }
784
785 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
786}
792 787
793 788
794/** 789/**
@@ -803,7 +798,18 @@ incoming_timeout_cb (void *cls,
803{ 798{
804 struct Incoming *incoming = cls; 799 struct Incoming *incoming = cls;
805 800
806 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "remote peer timed out\n"); 801 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "remote peer timed out\n");
802 incoming_destroy (incoming);
803}
804
805
806static void
807handle_incoming_disconnect (struct OperationState *op_state)
808{
809 struct Incoming *incoming = (struct Incoming *) op_state;
810 if (NULL == incoming->tunnel)
811 return;
812
807 incoming_destroy (incoming); 813 incoming_destroy (incoming);
808} 814}
809 815
@@ -830,23 +836,25 @@ tunnel_new_cb (void *cls,
830 uint32_t port) 836 uint32_t port)
831{ 837{
832 struct Incoming *incoming; 838 struct Incoming *incoming;
833 struct TunnelContext *tc; 839 static const struct SetVT incoming_vt = {
840 .msg_handler = handle_incoming_msg,
841 .peer_disconnect = handle_incoming_disconnect
842 };
834 843
835 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "new incoming tunnel\n"); 844 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new incoming tunnel\n");
836 845
837 GNUNET_assert (port == GNUNET_APPLICATION_TYPE_SET); 846 GNUNET_assert (port == GNUNET_APPLICATION_TYPE_SET);
838 tc = GNUNET_new (struct TunnelContext);
839 incoming = GNUNET_new (struct Incoming); 847 incoming = GNUNET_new (struct Incoming);
840 incoming->peer = *initiator; 848 incoming->peer = *initiator;
841 incoming->tunnel = tunnel; 849 incoming->tunnel = tunnel;
842 incoming->tc = tc; 850 incoming->tc = GNUNET_new (struct TunnelContext);;
843 tc->data.incoming = incoming; 851 incoming->tc->vt = &incoming_vt;
844 tc->type = CONTEXT_INCOMING; 852 incoming->tc->op = (struct OperationState *) incoming;
845 incoming->timeout_task = 853 incoming->timeout_task =
846 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, incoming_timeout_cb, incoming); 854 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, incoming_timeout_cb, incoming);
847 GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming); 855 GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming);
848 856
849 return tc; 857 return incoming->tc;
850} 858}
851 859
852 860
@@ -867,27 +875,46 @@ tunnel_end_cb (void *cls,
867{ 875{
868 struct TunnelContext *ctx = tunnel_ctx; 876 struct TunnelContext *ctx = tunnel_ctx;
869 877
870 switch (ctx->type) 878 ctx->vt->peer_disconnect (ctx->op);
871 { 879 /* mesh will never call us with the context again! */
872 case CONTEXT_INCOMING:
873 incoming_destroy (ctx->data.incoming);
874 break;
875 case CONTEXT_OPERATION_UNION:
876 _GSS_union_operation_destroy (ctx->data.union_op);
877 break;
878 case CONTEXT_OPERATION_INTERSECTION:
879 GNUNET_assert (0);
880 /* FIXME: cfuchs */
881 break;
882 default:
883 GNUNET_assert (0);
884 }
885
886 GNUNET_free (tunnel_ctx); 880 GNUNET_free (tunnel_ctx);
887} 881}
888 882
889 883
890/** 884/**
885 * Functions with this signature are called whenever a message is
886 * received.
887 *
888 * Each time the function must call GNUNET_MESH_receive_done on the tunnel
889 * in order to receive the next message. This doesn't need to be immediate:
890 * can be delayed if some processing is done on the message.
891 *
892 * @param cls Closure (set from GNUNET_MESH_connect).
893 * @param tunnel Connection to the other end.
894 * @param tunnel_ctx Place to store local state associated with the tunnel.
895 * @param message The actual message.
896 *
897 * @return GNUNET_OK to keep the tunnel open,
898 * GNUNET_SYSERR to close it (signal serious error).
899 */
900static int
901dispatch_p2p_message (void *cls,
902 struct GNUNET_MESH_Tunnel *tunnel,
903 void **tunnel_ctx,
904 const struct GNUNET_MessageHeader *message)
905{
906 struct TunnelContext *tc = *tunnel_ctx;
907 int ret;
908
909 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "dispatching mesh message\n");
910 ret = tc->vt->msg_handler (tc->op, message);
911 GNUNET_MESH_receive_done (tunnel);
912
913 return ret;
914}
915
916
917/**
891 * Function called by the service's run 918 * Function called by the service's run
892 * method to run service-specific setup code. 919 * method to run service-specific setup code.
893 * 920 *
@@ -900,31 +927,29 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
900 const struct GNUNET_CONFIGURATION_Handle *cfg) 927 const struct GNUNET_CONFIGURATION_Handle *cfg)
901{ 928{
902 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { 929 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
903 {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT, 0}, 930 {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT,
931 sizeof (struct GNUNET_SET_AcceptRejectMessage)},
904 {handle_client_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ACK, 0}, 932 {handle_client_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ACK, 0},
905 {handle_client_add, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0}, 933 {handle_client_add_remove, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0},
906 {handle_client_create, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE, 0}, 934 {handle_client_create, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE,
935 sizeof (struct GNUNET_SET_CreateMessage)},
907 {handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0}, 936 {handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0},
908 {handle_client_listen, NULL, GNUNET_MESSAGE_TYPE_SET_LISTEN, 0}, 937 {handle_client_listen, NULL, GNUNET_MESSAGE_TYPE_SET_LISTEN,
909 {handle_client_reject, NULL, GNUNET_MESSAGE_TYPE_SET_REJECT, 0}, 938 sizeof (struct GNUNET_SET_ListenMessage)},
910 {handle_client_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0}, 939 {handle_client_reject, NULL, GNUNET_MESSAGE_TYPE_SET_REJECT,
940 sizeof (struct GNUNET_SET_AcceptRejectMessage)},
941 {handle_client_add_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0},
942 {handle_client_cancel, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE,
943 sizeof (struct GNUNET_SET_CancelMessage)},
911 {NULL, NULL, 0, 0} 944 {NULL, NULL, 0, 0}
912 }; 945 };
913 static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = { 946 static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = {
914 {handle_p2p_operation_request, 947 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0},
915 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0}, 948 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_IBF, 0},
916 /* messages for the union operation */ 949 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0},
917 {_GSS_union_handle_p2p_message, 950 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_DONE, 0},
918 GNUNET_MESSAGE_TYPE_SET_P2P_IBF, 0}, 951 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0},
919 {_GSS_union_handle_p2p_message, 952 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_SE, 0},
920 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0},
921 {_GSS_union_handle_p2p_message,
922 GNUNET_MESSAGE_TYPE_SET_P2P_DONE, 0},
923 {_GSS_union_handle_p2p_message,
924 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0},
925 {_GSS_union_handle_p2p_message,
926 GNUNET_MESSAGE_TYPE_SET_P2P_SE, 0},
927 /* FIXME: messages for intersection operation */
928 {NULL, 0, 0} 953 {NULL, 0, 0}
929 }; 954 };
930 static const uint32_t mesh_ports[] = {GNUNET_APPLICATION_TYPE_SET, 0}; 955 static const uint32_t mesh_ports[] = {GNUNET_APPLICATION_TYPE_SET, 0};
@@ -943,7 +968,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
943 return; 968 return;
944 } 969 }
945 970
946 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "service started\n"); 971 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "started\n");
947} 972}
948 973
949 974
diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h
index 74cbf584e..6ababe92f 100644
--- a/src/set/gnunet-service-set.h
+++ b/src/set/gnunet-service-set.h
@@ -38,67 +38,25 @@
38#include "set.h" 38#include "set.h"
39 39
40 40
41/* FIXME: cfuchs */
42struct IntersectionState;
43
44
45/* FIXME: cfuchs */
46struct IntersectionOperation;
47
48
49/**
50 * Extra state required for set union.
51 */
52struct UnionState;
53
54/** 41/**
55 * State of a union operation being evaluated. 42 * Implementation-specific set state.
43 * Used as opaque pointer, and specified further
44 * in the respective implementation.
56 */ 45 */
57struct UnionEvaluateOperation; 46struct SetState;
58
59 47
60 48
61/** 49/**
62 * A set that supports a specific operation 50 * Implementation-specific set operation.
63 * with other peers. 51 * Used as opaque pointer, and specified further
52 * in the respective implementation.
64 */ 53 */
65struct Set 54struct OperationState;
66{
67 /**
68 * Client that owns the set.
69 * Only one client may own a set.
70 */
71 struct GNUNET_SERVER_Client *client;
72
73 /**
74 * Message queue for the client
75 */
76 struct GNUNET_MQ_Handle *client_mq;
77 55
78 /**
79 * Type of operation supported for this set
80 */
81 uint32_t operation; // use enum from API
82 56
83 /** 57/* forward declarations */
84 * Sets are held in a doubly linked list. 58struct Set;
85 */ 59struct TunnelContext;
86 struct Set *next;
87
88 /**
89 * Sets are held in a doubly linked list.
90 */
91 struct Set *prev;
92
93 /**
94 * Appropriate state for each type of
95 * operation.
96 */
97 union {
98 struct IntersectionState *i;
99 struct UnionState *u;
100 } state;
101};
102 60
103 61
104/** 62/**
@@ -146,96 +104,169 @@ struct OperationSpecification
146 104
147 105
148/** 106/**
149 * A listener is inhabited by a client, and 107 * Signature of functions that create the implementation-specific
150 * waits for evaluation requests from remote peers. 108 * state for a set supporting a specific operation.
109 *
110 * @return a set state specific to the supported operation
111 */
112typedef struct SetState *(*CreateImpl) (void);
113
114
115/**
116 * Signature of functions that implement the add/remove functionality
117 * for a set supporting a specific operation.
118 *
119 * @param set implementation-specific set state
120 * @param msg element message from the client
121 */
122typedef void (*AddRemoveImpl) (struct SetState *state, const struct GNUNET_SET_Element *element);
123
124
125/**
126 * Signature of functions that handle disconnection
127 * of the remote peer.
128 *
129 * @param op the set operation, contains implementation-specific data
130 */
131typedef void (*PeerDisconnectImpl) (struct OperationState *op);
132
133
134/**
135 * Signature of functions that implement the destruction of the
136 * implementation-specific set state.
137 *
138 * @param state the set state, contains implementation-specific data
139 */
140typedef void (*DestroySetImpl) (struct SetState *state);
141
142
143/**
144 * Signature of functions that implement the creation of set operations
145 * (currently evaluate and accept).
146 *
147 * @param spec specification of the set operation to be created
148 * @param tunnel the tunnel with the other peer
149 * @param tc tunnel context
150 */
151typedef void (*OpCreateImpl) (struct OperationSpecification *spec,
152 struct GNUNET_MESH_Tunnel *tunnel,
153 struct TunnelContext *tc);
154
155
156/**
157 * Signature of functions that implement the message handling for
158 * the different set operations.
159 *
160 * @param op operation state
161 * @param msg received message
162 * @return GNUNET_OK on success, GNUNET_SYSERR to
163 * destroy the operation and the tunnel
151 */ 164 */
152struct Listener 165typedef int (*MsgHandlerImpl) (struct OperationState *op,
166 const struct GNUNET_MessageHeader *msg);
167
168typedef void (*CancelImpl) (struct SetState *set,
169 uint32_t request_id);
170
171
172/**
173 * Dispatch table for a specific set operation.
174 * Every set operation has to implement the callback
175 * in this struct.
176 */
177struct SetVT
153{ 178{
154 /** 179 /**
155 * Listeners are held in a doubly linked list. 180 * Callback for the set creation.
156 */ 181 */
157 struct Listener *next; 182 CreateImpl create;
158 183
159 /** 184 /**
160 * Listeners are held in a doubly linked list. 185 * Callback for element insertion
161 */ 186 */
162 struct Listener *prev; 187 AddRemoveImpl add;
163 188
164 /** 189 /**
165 * Client that owns the listener. 190 * Callback for element removal.
166 * Only one client may own a listener.
167 */ 191 */
168 struct GNUNET_SERVER_Client *client; 192 AddRemoveImpl remove;
169 193
170 /** 194 /**
171 * Message queue for the client 195 * Callback for accepting a set operation request
172 */ 196 */
173 struct GNUNET_MQ_Handle *client_mq; 197 OpCreateImpl accept;
174 198
175 /** 199 /**
176 * The type of the operation. 200 * Callback for starting evaluation with a remote peer.
177 */ 201 */
178 enum GNUNET_SET_OperationType operation; 202 OpCreateImpl evaluate;
179 203
180 /** 204 /**
181 * Application ID for the operation, used to distinguish 205 * Callback for destruction of the set state.
182 * multiple operations of the same type with the same peer.
183 */ 206 */
184 struct GNUNET_HashCode app_id; 207 DestroySetImpl destroy_set;
185};
186 208
209 /**
210 * Callback for handling operation-specific messages.
211 */
212 MsgHandlerImpl msg_handler;
187 213
188/** 214 /**
189 * Peer that has connected to us, but is not yet evaluating a set operation. 215 * Callback for handling the remote peer's
190 * Once the peer has sent a request, and the client has 216 * disconnect.
191 * accepted or rejected it, this information will be deleted. 217 */
192 */ 218 PeerDisconnectImpl peer_disconnect;
193struct Incoming; 219
220 /**
221 * Callback for canceling an operation by
222 * its ID.
223 */
224 CancelImpl cancel;
225};
194 226
195 227
196/** 228/**
197 * Different types a tunnel can be. 229 * A set that supports a specific operation
230 * with other peers.
198 */ 231 */
199enum TunnelContextType 232struct Set
200{ 233{
201 /** 234 /**
202 * Tunnel is waiting for a set request from the tunnel, 235 * Client that owns the set.
203 * or for the ack/nack of the client for a received request. 236 * Only one client may own a set.
204 */ 237 */
205 CONTEXT_INCOMING, 238 struct GNUNET_SERVER_Client *client;
206 239
207 /** 240 /**
208 * The tunnel performs a union operation. 241 * Message queue for the client
209 */ 242 */
210 CONTEXT_OPERATION_UNION, 243 struct GNUNET_MQ_Handle *client_mq;
211 244
212 /** 245 /**
213 * The tunnel performs an intersection operation. 246 * Type of operation supported for this set
214 */ 247 */
215 CONTEXT_OPERATION_INTERSECTION, 248 enum GNUNET_SET_OperationType operation;
216};
217 249
250 /**
251 * Virtual table for this set.
252 * Determined by the operation type of this set.
253 */
254 const struct SetVT *vt;
218 255
219/**
220 * State associated with the tunnel, dependent on
221 * tunnel type.
222 */
223union TunnelContextData
224{
225 /** 256 /**
226 * Valid for tag 'CONTEXT_INCOMING' 257 * Sets are held in a doubly linked list.
227 */ 258 */
228 struct Incoming *incoming; 259 struct Set *next;
229 260
230 /** 261 /**
231 * Valid for tag 'CONTEXT_OPERATION_UNION' 262 * Sets are held in a doubly linked list.
232 */ 263 */
233 struct UnionEvaluateOperation *union_op; 264 struct Set *prev;
234 265
235 /** 266 /**
236 * Valid for tag 'CONTEXT_OPERATION_INTERSECTION' 267 * Implementation-specific state.
237 */ 268 */
238 struct IntersectionEvaluateOperation *intersection_op; 269 struct SetState *state;
239}; 270};
240 271
241 272
@@ -246,119 +277,24 @@ union TunnelContextData
246struct TunnelContext 277struct TunnelContext
247{ 278{
248 /** 279 /**
249 * Type of the tunnel. 280 * V-Table for the operation belonging
281 * to the tunnel contest.
250 */ 282 */
251 enum TunnelContextType type; 283 const struct SetVT *vt;
252 284
253 /** 285 /**
254 * State associated with the tunnel, dependent on 286 * Implementation-specific operation state.
255 * tunnel type.
256 */ 287 */
257 union TunnelContextData data; 288 struct OperationState *op;
258}; 289};
259 290
260 291
261
262/**
263 * Configuration of the local peer.
264 */
265extern const struct GNUNET_CONFIGURATION_Handle *configuration;
266
267/**
268 * Handle to the mesh service.
269 */
270extern struct GNUNET_MESH_Handle *mesh;
271
272
273/**
274 * Create a new set supporting the union operation
275 *
276 * @return the newly created set
277 */
278struct Set *
279_GSS_union_set_create (void);
280
281
282/**
283 * Evaluate a union operation with
284 * a remote peer.
285 *
286 * @param spec specification of the operation the evaluate
287 * @param tunnel tunnel already connected to the partner peer
288 * @return a handle to the operation
289 */
290struct UnionEvaluateOperation *
291_GSS_union_evaluate (struct OperationSpecification *spec,
292 struct GNUNET_MESH_Tunnel *tunnel);
293
294
295/**
296 * Add the element from the given element message to the set.
297 *
298 * @param m message with the element
299 * @param set set to add the element to
300 */
301void
302_GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set);
303
304
305/**
306 * Remove the element given in the element message from the set.
307 * Only marks the element as removed, so that older set operations can still exchange it.
308 *
309 * @param m message with the element
310 * @param set set to remove the element from
311 */
312void
313_GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set);
314
315
316/**
317 * Destroy a set that supports the union operation
318 *
319 * @param set the set to destroy, must be of type GNUNET_SET_OPERATION_UNION
320 */
321void
322_GSS_union_set_destroy (struct Set *set);
323
324
325/** 292/**
326 * Accept an union operation request from a remote peer 293 * Get the table with implementing functions for
327 * 294 * set union.
328 * @param spec all necessary information about the operation
329 * @param tunnel open tunnel to the partner's peer
330 * @return operation
331 */
332struct UnionEvaluateOperation *
333_GSS_union_accept (struct OperationSpecification *spec,
334 struct GNUNET_MESH_Tunnel *tunnel);
335
336
337/**
338 * Destroy a union operation, and free all resources
339 * associated with it.
340 *
341 * @param eo the union operation to destroy
342 */
343void
344_GSS_union_operation_destroy (struct UnionEvaluateOperation *eo);
345
346
347/**
348 * Dispatch messages for a union operation.
349 *
350 * @param cls closure
351 * @param tunnel mesh tunnel
352 * @param tunnel_ctx tunnel context
353 * @param mh message to process
354 * @return GNUNET_SYSERR if the tunnel should be disconnected,
355 * GNUNET_OK otherwise
356 */ 295 */
357int 296const struct SetVT *
358_GSS_union_handle_p2p_message (void *cls, 297_GSS_union_vt (void);
359 struct GNUNET_MESH_Tunnel *tunnel,
360 void **tunnel_ctx,
361 const struct GNUNET_MessageHeader *mh);
362 298
363 299
364#endif 300#endif
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index ed29d5d27..3725de807 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -61,7 +61,7 @@
61 61
62 62
63/** 63/**
64 * Current phase we are in for a union operation 64 * Current phase we are in for a union operation.
65 */ 65 */
66enum UnionOperationPhase 66enum UnionOperationPhase
67{ 67{
@@ -100,7 +100,7 @@ enum UnionOperationPhase
100 * State of an evaluate operation 100 * State of an evaluate operation
101 * with another peer. 101 * with another peer.
102 */ 102 */
103struct UnionEvaluateOperation 103struct OperationState
104{ 104{
105 /** 105 /**
106 * Tunnel to the remote peer. 106 * Tunnel to the remote peer.
@@ -154,23 +154,29 @@ struct UnionEvaluateOperation
154 * was created. 154 * was created.
155 */ 155 */
156 unsigned int generation_created; 156 unsigned int generation_created;
157
158 /**
159 * Set state of the set that this operation
160 * belongs to.
161 */
162 struct SetState *set_state;
157 163
158 /** 164 /**
159 * Evaluate operations are held in 165 * Evaluate operations are held in
160 * a linked list. 166 * a linked list.
161 */ 167 */
162 struct UnionEvaluateOperation *next; 168 struct OperationState *next;
163 169
164 /** 170 /**
165 * Evaluate operations are held in 171 * Evaluate operations are held in
166 * a linked list. 172 * a linked list.
167 */ 173 */
168 struct UnionEvaluateOperation *prev; 174 struct OperationState *prev;
169}; 175};
170 176
171 177
172/** 178/**
173 * Information about the element in a set. 179 * Information about an element element in the set.
174 * All elements are stored in a hash-table 180 * All elements are stored in a hash-table
175 * from their hash-code to their 'struct Element', 181 * from their hash-code to their 'struct Element',
176 * so that the remove and add operations are reasonably 182 * so that the remove and add operations are reasonably
@@ -218,7 +224,8 @@ struct ElementEntry
218 224
219 225
220/** 226/**
221 * Entries in the key-to-element map of the union set. 227 * The key entry is used to associate an ibf key with
228 * an element.
222 */ 229 */
223struct KeyEntry 230struct KeyEntry
224{ 231{
@@ -239,6 +246,7 @@ struct KeyEntry
239 struct KeyEntry *next_colliding; 246 struct KeyEntry *next_colliding;
240}; 247};
241 248
249
242/** 250/**
243 * Used as a closure for sending elements 251 * Used as a closure for sending elements
244 * with a specific IBF key. 252 * with a specific IBF key.
@@ -255,14 +263,14 @@ struct SendElementClosure
255 * Operation for which the elements 263 * Operation for which the elements
256 * should be sent. 264 * should be sent.
257 */ 265 */
258 struct UnionEvaluateOperation *eo; 266 struct OperationState *eo;
259}; 267};
260 268
261 269
262/** 270/**
263 * Extra state required for efficient set union. 271 * Extra state required for efficient set union.
264 */ 272 */
265struct UnionState 273struct SetState
266{ 274{
267 /** 275 /**
268 * The strata estimator is only generated once for 276 * The strata estimator is only generated once for
@@ -281,13 +289,13 @@ struct UnionState
281 * Evaluate operations are held in 289 * Evaluate operations are held in
282 * a linked list. 290 * a linked list.
283 */ 291 */
284 struct UnionEvaluateOperation *ops_head; 292 struct OperationState *ops_head;
285 293
286 /** 294 /**
287 * Evaluate operations are held in 295 * Evaluate operations are held in
288 * a linked list. 296 * a linked list.
289 */ 297 */
290 struct UnionEvaluateOperation *ops_tail; 298 struct OperationState *ops_tail;
291 299
292 /** 300 /**
293 * Current generation, that is, number of 301 * Current generation, that is, number of
@@ -321,23 +329,6 @@ destroy_elements_iterator (void *cls,
321 329
322 330
323/** 331/**
324 * Destroy the elements belonging to a union set.
325 *
326 * @param us union state that contains the elements
327 */
328static void
329destroy_elements (struct UnionState *us)
330{
331 if (NULL == us->elements)
332 return;
333 GNUNET_CONTAINER_multihashmap_iterate (us->elements, destroy_elements_iterator, NULL);
334 GNUNET_CONTAINER_multihashmap_destroy (us->elements);
335 us->elements = NULL;
336}
337
338
339
340/**
341 * Iterator over hash map entries. 332 * Iterator over hash map entries.
342 * 333 *
343 * @param cls closure 334 * @param cls closure
@@ -358,6 +349,11 @@ destroy_key_to_element_iter (void *cls,
358 { 349 {
359 struct KeyEntry *k_tmp = k; 350 struct KeyEntry *k_tmp = k;
360 k = k->next_colliding; 351 k = k->next_colliding;
352 if (GNUNET_YES == k_tmp->element->remote)
353 {
354 GNUNET_free (k_tmp->element);
355 k_tmp->element = NULL;
356 }
361 GNUNET_free (k_tmp); 357 GNUNET_free (k_tmp);
362 } 358 }
363 return GNUNET_YES; 359 return GNUNET_YES;
@@ -370,20 +366,24 @@ destroy_key_to_element_iter (void *cls,
370 * 366 *
371 * @param eo the union operation to destroy 367 * @param eo the union operation to destroy
372 */ 368 */
373void 369static void
374_GSS_union_operation_destroy (struct UnionEvaluateOperation *eo) 370union_operation_destroy (struct OperationState *eo)
375{ 371{
376 struct UnionState *st = eo->spec->set->state.u; 372 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n");
377 373 GNUNET_CONTAINER_DLL_remove (eo->set_state->ops_head,
378 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n"); 374 eo->set_state->ops_tail,
379 375 eo);
376 if (NULL != eo->mq)
377 {
378 GNUNET_MQ_destroy (eo->mq);
379 eo->mq = NULL;
380 }
380 if (NULL != eo->tunnel) 381 if (NULL != eo->tunnel)
381 { 382 {
382 struct GNUNET_MESH_Tunnel *t = eo->tunnel; 383 struct GNUNET_MESH_Tunnel *t = eo->tunnel;
383 eo->tunnel = NULL; 384 eo->tunnel = NULL;
384 GNUNET_MESH_tunnel_destroy (t); 385 GNUNET_MESH_tunnel_destroy (t);
385 } 386 }
386
387 if (NULL != eo->remote_ibf) 387 if (NULL != eo->remote_ibf)
388 { 388 {
389 ibf_destroy (eo->remote_ibf); 389 ibf_destroy (eo->remote_ibf);
@@ -405,15 +405,19 @@ _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo)
405 GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element); 405 GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
406 eo->key_to_element = NULL; 406 eo->key_to_element = NULL;
407 } 407 }
408 408 if (NULL != eo->spec)
409 GNUNET_CONTAINER_DLL_remove (st->ops_head, 409 {
410 st->ops_tail, 410 if (NULL != eo->spec->context_msg)
411 eo); 411 {
412 GNUNET_free (eo->spec->context_msg);
413 eo->spec->context_msg = NULL;
414 }
415 GNUNET_free (eo->spec);
416 eo->spec = NULL;
417 }
412 GNUNET_free (eo); 418 GNUNET_free (eo);
413 419
414 420 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n");
415 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n");
416
417 421
418 /* FIXME: do a garbage collection of the set generations */ 422 /* FIXME: do a garbage collection of the set generations */
419} 423}
@@ -426,7 +430,7 @@ _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo)
426 * @param eo the union operation to fail 430 * @param eo the union operation to fail
427 */ 431 */
428static void 432static void
429fail_union_operation (struct UnionEvaluateOperation *eo) 433fail_union_operation (struct OperationState *eo)
430{ 434{
431 struct GNUNET_MQ_Envelope *ev; 435 struct GNUNET_MQ_Envelope *ev;
432 struct GNUNET_SET_ResultMessage *msg; 436 struct GNUNET_SET_ResultMessage *msg;
@@ -434,8 +438,9 @@ fail_union_operation (struct UnionEvaluateOperation *eo)
434 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); 438 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
435 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); 439 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
436 msg->request_id = htonl (eo->spec->client_request_id); 440 msg->request_id = htonl (eo->spec->client_request_id);
441 msg->element_type = htons (0);
437 GNUNET_MQ_send (eo->spec->set->client_mq, ev); 442 GNUNET_MQ_send (eo->spec->set->client_mq, ev);
438 _GSS_union_operation_destroy (eo); 443 union_operation_destroy (eo);
439} 444}
440 445
441 446
@@ -467,13 +472,13 @@ get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
467 * @param eo operation with the other peer 472 * @param eo operation with the other peer
468 */ 473 */
469static void 474static void
470send_operation_request (struct UnionEvaluateOperation *eo) 475send_operation_request (struct OperationState *eo)
471{ 476{
472 struct GNUNET_MQ_Envelope *ev; 477 struct GNUNET_MQ_Envelope *ev;
473 struct OperationRequestMessage *msg; 478 struct OperationRequestMessage *msg;
474 479
475 ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 480 ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
476 eo->spec->context_msg); 481 eo->spec->context_msg);
477 482
478 if (NULL == ev) 483 if (NULL == ev)
479 { 484 {
@@ -484,6 +489,7 @@ send_operation_request (struct UnionEvaluateOperation *eo)
484 } 489 }
485 msg->operation = htons (GNUNET_SET_OPERATION_UNION); 490 msg->operation = htons (GNUNET_SET_OPERATION_UNION);
486 msg->app_id = eo->spec->app_id; 491 msg->app_id = eo->spec->app_id;
492 msg->salt = htonl (eo->spec->salt);
487 GNUNET_MQ_send (eo->mq, ev); 493 GNUNET_MQ_send (eo->mq, ev);
488 494
489 if (NULL != eo->spec->context_msg) 495 if (NULL != eo->spec->context_msg)
@@ -492,7 +498,7 @@ send_operation_request (struct UnionEvaluateOperation *eo)
492 eo->spec->context_msg = NULL; 498 eo->spec->context_msg = NULL;
493 } 499 }
494 500
495 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n"); 501 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request\n");
496} 502}
497 503
498 504
@@ -532,13 +538,13 @@ insert_element_iterator (void *cls,
532 538
533/** 539/**
534 * Insert an element into the union operation's 540 * Insert an element into the union operation's
535 * key-to-element mapping 541 * key-to-element mapping. Takes ownership of 'ee'.
536 * 542 *
537 * @param eo the union operation 543 * @param eo the union operation
538 * @param ee the element entry 544 * @param ee the element entry
539 */ 545 */
540static void 546static void
541insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee) 547insert_element (struct OperationState *eo, struct ElementEntry *ee)
542{ 548{
543 int ret; 549 int ret;
544 struct IBF_Key ibf_key; 550 struct IBF_Key ibf_key;
@@ -595,7 +601,7 @@ init_key_to_element_iterator (void *cls,
595 const struct GNUNET_HashCode *key, 601 const struct GNUNET_HashCode *key,
596 void *value) 602 void *value)
597{ 603{
598 struct UnionEvaluateOperation *eo = cls; 604 struct OperationState *eo = cls;
599 struct ElementEntry *e = value; 605 struct ElementEntry *e = value;
600 606
601 /* make sure that the element belongs to the set at the time 607 /* make sure that the element belongs to the set at the time
@@ -605,6 +611,8 @@ init_key_to_element_iterator (void *cls,
605 (e->generation_removed < eo->generation_created))) 611 (e->generation_removed < eo->generation_created)))
606 return GNUNET_YES; 612 return GNUNET_YES;
607 613
614 e->remote = GNUNET_NO;
615
608 insert_element (eo, e); 616 insert_element (eo, e);
609 return GNUNET_YES; 617 return GNUNET_YES;
610} 618}
@@ -618,15 +626,15 @@ init_key_to_element_iterator (void *cls,
618 * @param size size of the ibf to create 626 * @param size size of the ibf to create
619 */ 627 */
620static void 628static void
621prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size) 629prepare_ibf (struct OperationState *eo, uint16_t size)
622{ 630{
623 if (NULL == eo->key_to_element) 631 if (NULL == eo->key_to_element)
624 { 632 {
625 unsigned int len; 633 unsigned int len;
626 len = GNUNET_CONTAINER_multihashmap_size (eo->spec->set->state.u->elements); 634 len = GNUNET_CONTAINER_multihashmap_size (eo->set_state->elements);
627 eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); 635 eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
628 GNUNET_CONTAINER_multihashmap_iterate (eo->spec->set->state.u->elements, 636 GNUNET_CONTAINER_multihashmap_iterate (eo->set_state->elements,
629 init_key_to_element_iterator, eo); 637 init_key_to_element_iterator, eo);
630 } 638 }
631 if (NULL != eo->local_ibf) 639 if (NULL != eo->local_ibf)
632 ibf_destroy (eo->local_ibf); 640 ibf_destroy (eo->local_ibf);
@@ -643,14 +651,14 @@ prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size)
643 * @param ibf_order order of the ibf to send, size=2^order 651 * @param ibf_order order of the ibf to send, size=2^order
644 */ 652 */
645static void 653static void
646send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) 654send_ibf (struct OperationState *eo, uint16_t ibf_order)
647{ 655{
648 unsigned int buckets_sent = 0; 656 unsigned int buckets_sent = 0;
649 struct InvertibleBloomFilter *ibf; 657 struct InvertibleBloomFilter *ibf;
650 658
651 prepare_ibf (eo, 1<<ibf_order); 659 prepare_ibf (eo, 1<<ibf_order);
652 660
653 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending ibf of size %u\n", 1<<ibf_order); 661 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order);
654 662
655 ibf = eo->local_ibf; 663 ibf = eo->local_ibf;
656 664
@@ -667,11 +675,14 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
667 675
668 ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, 676 ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
669 GNUNET_MESSAGE_TYPE_SET_P2P_IBF); 677 GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
678 msg->reserved = 0;
670 msg->order = ibf_order; 679 msg->order = ibf_order;
671 msg->offset = htons (buckets_sent); 680 msg->offset = htons (buckets_sent);
672 ibf_write_slice (ibf, buckets_sent, 681 ibf_write_slice (ibf, buckets_sent,
673 buckets_in_message, &msg[1]); 682 buckets_in_message, &msg[1]);
674 buckets_sent += buckets_in_message; 683 buckets_sent += buckets_in_message;
684 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
685 buckets_in_message, buckets_sent, 1<<ibf_order);
675 GNUNET_MQ_send (eo->mq, ev); 686 GNUNET_MQ_send (eo->mq, ev);
676 } 687 }
677 688
@@ -685,18 +696,18 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
685 * @param eo the union operation with the remote peer 696 * @param eo the union operation with the remote peer
686 */ 697 */
687static void 698static void
688send_strata_estimator (struct UnionEvaluateOperation *eo) 699send_strata_estimator (struct OperationState *eo)
689{ 700{
690 struct GNUNET_MQ_Envelope *ev; 701 struct GNUNET_MQ_Envelope *ev;
691 struct GNUNET_MessageHeader *strata_msg; 702 struct GNUNET_MessageHeader *strata_msg;
692 struct UnionState *st = eo->spec->set->state.u;
693 703
694 ev = GNUNET_MQ_msg_header_extra (strata_msg, 704 ev = GNUNET_MQ_msg_header_extra (strata_msg,
695 SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, 705 SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
696 GNUNET_MESSAGE_TYPE_SET_P2P_SE); 706 GNUNET_MESSAGE_TYPE_SET_P2P_SE);
697 strata_estimator_write (st->se, &strata_msg[1]); 707 strata_estimator_write (eo->set_state->se, &strata_msg[1]);
698 GNUNET_MQ_send (eo->mq, ev); 708 GNUNET_MQ_send (eo->mq, ev);
699 eo->phase = PHASE_EXPECT_IBF; 709 eo->phase = PHASE_EXPECT_IBF;
710 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n");
700} 711}
701 712
702 713
@@ -730,7 +741,7 @@ get_order_from_difference (unsigned int diff)
730static void 741static void
731handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) 742handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
732{ 743{
733 struct UnionEvaluateOperation *eo = cls; 744 struct OperationState *eo = cls;
734 struct StrataEstimator *remote_se; 745 struct StrataEstimator *remote_se;
735 int diff; 746 int diff;
736 747
@@ -746,7 +757,7 @@ handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
746 strata_estimator_read (&mh[1], remote_se); 757 strata_estimator_read (&mh[1], remote_se);
747 GNUNET_assert (NULL != eo->se); 758 GNUNET_assert (NULL != eo->se);
748 diff = strata_estimator_difference (remote_se, eo->se); 759 diff = strata_estimator_difference (remote_se, eo->se);
749 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se, diff=%d\n", diff); 760 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se, diff=%d\n", diff);
750 strata_estimator_destroy (remote_se); 761 strata_estimator_destroy (remote_se);
751 strata_estimator_destroy (eo->se); 762 strata_estimator_destroy (eo->se);
752 eo->se = NULL; 763 eo->se = NULL;
@@ -769,7 +780,7 @@ send_element_iterator (void *cls,
769{ 780{
770 struct SendElementClosure *sec = cls; 781 struct SendElementClosure *sec = cls;
771 struct IBF_Key ibf_key = sec->ibf_key; 782 struct IBF_Key ibf_key = sec->ibf_key;
772 struct UnionEvaluateOperation *eo = sec->eo; 783 struct OperationState *eo = sec->eo;
773 struct KeyEntry *ke = value; 784 struct KeyEntry *ke = value;
774 785
775 if (ke->ibf_key.key_val != ibf_key.key_val) 786 if (ke->ibf_key.key_val != ibf_key.key_val)
@@ -789,7 +800,7 @@ send_element_iterator (void *cls,
789 continue; 800 continue;
790 } 801 }
791 memcpy (&mh[1], element->data, element->size); 802 memcpy (&mh[1], element->data, element->size);
792 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n"); 803 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element to client\n");
793 GNUNET_MQ_send (eo->mq, ev); 804 GNUNET_MQ_send (eo->mq, ev);
794 ke = ke->next_colliding; 805 ke = ke->next_colliding;
795 } 806 }
@@ -804,7 +815,7 @@ send_element_iterator (void *cls,
804 * @param ibf_key IBF key of interest 815 * @param ibf_key IBF key of interest
805 */ 816 */
806static void 817static void
807send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key) 818send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key)
808{ 819{
809 struct SendElementClosure send_cls; 820 struct SendElementClosure send_cls;
810 821
@@ -822,7 +833,7 @@ send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key
822 * @param eo union operation 833 * @param eo union operation
823 */ 834 */
824static void 835static void
825decode_and_send (struct UnionEvaluateOperation *eo) 836decode_and_send (struct OperationState *eo)
826{ 837{
827 struct IBF_Key key; 838 struct IBF_Key key;
828 int side; 839 int side;
@@ -833,6 +844,9 @@ decode_and_send (struct UnionEvaluateOperation *eo)
833 prepare_ibf (eo, eo->remote_ibf->size); 844 prepare_ibf (eo, eo->remote_ibf->size);
834 diff_ibf = ibf_dup (eo->local_ibf); 845 diff_ibf = ibf_dup (eo->local_ibf);
835 ibf_subtract (diff_ibf, eo->remote_ibf); 846 ibf_subtract (diff_ibf, eo->remote_ibf);
847
848 ibf_destroy (eo->remote_ibf);
849 eo->remote_ibf = NULL;
836 850
837 while (1) 851 while (1)
838 { 852 {
@@ -864,7 +878,7 @@ decode_and_send (struct UnionEvaluateOperation *eo)
864 { 878 {
865 struct GNUNET_MQ_Envelope *ev; 879 struct GNUNET_MQ_Envelope *ev;
866 880
867 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n"); 881 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n");
868 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); 882 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
869 GNUNET_MQ_send (eo->mq, ev); 883 GNUNET_MQ_send (eo->mq, ev);
870 break; 884 break;
@@ -899,7 +913,7 @@ decode_and_send (struct UnionEvaluateOperation *eo)
899static void 913static void
900handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) 914handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
901{ 915{
902 struct UnionEvaluateOperation *eo = cls; 916 struct OperationState *eo = cls;
903 struct IBFMessage *msg = (struct IBFMessage *) mh; 917 struct IBFMessage *msg = (struct IBFMessage *) mh;
904 unsigned int buckets_in_message; 918 unsigned int buckets_in_message;
905 919
@@ -908,8 +922,9 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
908 { 922 {
909 eo->phase = PHASE_EXPECT_IBF_CONT; 923 eo->phase = PHASE_EXPECT_IBF_CONT;
910 GNUNET_assert (NULL == eo->remote_ibf); 924 GNUNET_assert (NULL == eo->remote_ibf);
911 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "creating new ibf of order %u\n", 1<<msg->order); 925 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order);
912 eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); 926 eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
927 eo->ibf_buckets_received = 0;
913 if (0 != ntohs (msg->offset)) 928 if (0 != ntohs (msg->offset))
914 { 929 {
915 GNUNET_break (0); 930 GNUNET_break (0);
@@ -929,6 +944,13 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
929 944
930 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; 945 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
931 946
947 if (0 == buckets_in_message)
948 {
949 GNUNET_break_op (0);
950 fail_union_operation (eo);
951 return;
952 }
953
932 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE) 954 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
933 { 955 {
934 GNUNET_break (0); 956 GNUNET_break (0);
@@ -942,7 +964,7 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
942 if (eo->ibf_buckets_received == eo->remote_ibf->size) 964 if (eo->ibf_buckets_received == eo->remote_ibf->size)
943 { 965 {
944 966
945 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full strata estimator\n"); 967 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n");
946 eo->phase = PHASE_EXPECT_ELEMENTS; 968 eo->phase = PHASE_EXPECT_ELEMENTS;
947 decode_and_send (eo); 969 decode_and_send (eo);
948 } 970 }
@@ -957,12 +979,13 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
957 * @param element element to send 979 * @param element element to send
958 */ 980 */
959static void 981static void
960send_client_element (struct UnionEvaluateOperation *eo, 982send_client_element (struct OperationState *eo,
961 struct GNUNET_SET_Element *element) 983 struct GNUNET_SET_Element *element)
962{ 984{
963 struct GNUNET_MQ_Envelope *ev; 985 struct GNUNET_MQ_Envelope *ev;
964 struct GNUNET_SET_ResultMessage *rm; 986 struct GNUNET_SET_ResultMessage *rm;
965 987
988 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending el of size %u\n", element->size);
966 GNUNET_assert (0 != eo->spec->client_request_id); 989 GNUNET_assert (0 != eo->spec->client_request_id);
967 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); 990 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
968 if (NULL == ev) 991 if (NULL == ev)
@@ -973,6 +996,7 @@ send_client_element (struct UnionEvaluateOperation *eo,
973 } 996 }
974 rm->result_status = htons (GNUNET_SET_STATUS_OK); 997 rm->result_status = htons (GNUNET_SET_STATUS_OK);
975 rm->request_id = htonl (eo->spec->client_request_id); 998 rm->request_id = htonl (eo->spec->client_request_id);
999 rm->element_type = element->type;
976 memcpy (&rm[1], element->data, element->size); 1000 memcpy (&rm[1], element->data, element->size);
977 GNUNET_MQ_send (eo->spec->set->client_mq, ev); 1001 GNUNET_MQ_send (eo->spec->set->client_mq, ev);
978} 1002}
@@ -987,7 +1011,7 @@ send_client_element (struct UnionEvaluateOperation *eo,
987 * @param eo union operation 1011 * @param eo union operation
988 */ 1012 */
989static void 1013static void
990send_client_done_and_destroy (struct UnionEvaluateOperation *eo) 1014send_client_done_and_destroy (struct OperationState *eo)
991{ 1015{
992 struct GNUNET_MQ_Envelope *ev; 1016 struct GNUNET_MQ_Envelope *ev;
993 struct GNUNET_SET_ResultMessage *rm; 1017 struct GNUNET_SET_ResultMessage *rm;
@@ -995,6 +1019,7 @@ send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
995 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); 1019 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
996 rm->request_id = htonl (eo->spec->client_request_id); 1020 rm->request_id = htonl (eo->spec->client_request_id);
997 rm->result_status = htons (GNUNET_SET_STATUS_DONE); 1021 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1022 rm->element_type = htons (0);
998 GNUNET_MQ_send (eo->spec->set->client_mq, ev); 1023 GNUNET_MQ_send (eo->spec->set->client_mq, ev);
999 1024
1000} 1025}
@@ -1009,11 +1034,11 @@ send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
1009static void 1034static void
1010handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) 1035handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1011{ 1036{
1012 struct UnionEvaluateOperation *eo = cls; 1037 struct OperationState *eo = cls;
1013 struct ElementEntry *ee; 1038 struct ElementEntry *ee;
1014 uint16_t element_size; 1039 uint16_t element_size;
1015 1040
1016 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got element from peer\n"); 1041 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n");
1017 1042
1018 if ( (eo->phase != PHASE_EXPECT_ELEMENTS) && 1043 if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
1019 (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ) 1044 (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
@@ -1025,13 +1050,12 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1025 element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader); 1050 element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1026 ee = GNUNET_malloc (sizeof *eo + element_size); 1051 ee = GNUNET_malloc (sizeof *eo + element_size);
1027 memcpy (&ee[1], &mh[1], element_size); 1052 memcpy (&ee[1], &mh[1], element_size);
1053 ee->element.size = element_size;
1028 ee->element.data = &ee[1]; 1054 ee->element.data = &ee[1];
1029 ee->remote = GNUNET_YES; 1055 ee->remote = GNUNET_YES;
1030 1056
1031 insert_element (eo, ee); 1057 insert_element (eo, ee);
1032 send_client_element (eo, &ee->element); 1058 send_client_element (eo, &ee->element);
1033
1034 GNUNET_free (ee);
1035} 1059}
1036 1060
1037 1061
@@ -1044,7 +1068,7 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1044static void 1068static void
1045handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) 1069handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1046{ 1070{
1047 struct UnionEvaluateOperation *eo = cls; 1071 struct OperationState *eo = cls;
1048 struct IBF_Key *ibf_key; 1072 struct IBF_Key *ibf_key;
1049 unsigned int num_keys; 1073 unsigned int num_keys;
1050 1074
@@ -1082,7 +1106,7 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1082static void 1106static void
1083peer_done_sent_cb (void *cls) 1107peer_done_sent_cb (void *cls)
1084{ 1108{
1085 struct UnionEvaluateOperation *eo = cls; 1109 struct OperationState *eo = cls;
1086 1110
1087 send_client_done_and_destroy (eo); 1111 send_client_done_and_destroy (eo);
1088} 1112}
@@ -1097,14 +1121,14 @@ peer_done_sent_cb (void *cls)
1097static void 1121static void
1098handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) 1122handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1099{ 1123{
1100 struct UnionEvaluateOperation *eo = cls; 1124 struct OperationState *eo = cls;
1101 1125
1102 if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) 1126 if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1103 { 1127 {
1104 /* we got all requests, but still have to send our elements as response */ 1128 /* we got all requests, but still have to send our elements as response */
1105 struct GNUNET_MQ_Envelope *ev; 1129 struct GNUNET_MQ_Envelope *ev;
1106 1130
1107 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n"); 1131 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n");
1108 eo->phase = PHASE_FINISHED; 1132 eo->phase = PHASE_FINISHED;
1109 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); 1133 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1110 GNUNET_MQ_notify_sent (ev, peer_done_sent_cb, eo); 1134 GNUNET_MQ_notify_sent (ev, peer_done_sent_cb, eo);
@@ -1113,7 +1137,7 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1113 } 1137 }
1114 if (eo->phase == PHASE_EXPECT_ELEMENTS) 1138 if (eo->phase == PHASE_EXPECT_ELEMENTS)
1115 { 1139 {
1116 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got final DONE\n"); 1140 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
1117 eo->phase = PHASE_FINISHED; 1141 eo->phase = PHASE_FINISHED;
1118 send_client_done_and_destroy (eo); 1142 send_client_done_and_destroy (eo);
1119 return; 1143 return;
@@ -1129,34 +1153,38 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1129 * 1153 *
1130 * @param spec specification of the operation the evaluate 1154 * @param spec specification of the operation the evaluate
1131 * @param tunnel tunnel already connected to the partner peer 1155 * @param tunnel tunnel already connected to the partner peer
1156 * @param tc tunnel context, passed here so all new incoming
1157 * messages are directly going to the union operations
1132 * @return a handle to the operation 1158 * @return a handle to the operation
1133 */ 1159 */
1134struct UnionEvaluateOperation * 1160static void
1135_GSS_union_evaluate (struct OperationSpecification *spec, 1161union_evaluate (struct OperationSpecification *spec,
1136 struct GNUNET_MESH_Tunnel *tunnel) 1162 struct GNUNET_MESH_Tunnel *tunnel,
1163 struct TunnelContext *tc)
1137{ 1164{
1138 struct UnionEvaluateOperation *eo; 1165 struct OperationState *eo;
1139 struct UnionState *st = spec->set->state.u;
1140 1166
1141 eo = GNUNET_new (struct UnionEvaluateOperation); 1167 eo = GNUNET_new (struct OperationState);
1142 eo->se = strata_estimator_dup (spec->set->state.u->se); 1168 tc->vt = _GSS_union_vt ();
1169 tc->op = eo;
1170 eo->se = strata_estimator_dup (spec->set->state->se);
1171 eo->set_state = spec->set->state;
1143 eo->spec = spec; 1172 eo->spec = spec;
1144 eo->tunnel = tunnel; 1173 eo->tunnel = tunnel;
1174 eo->mq = GNUNET_MESH_mq_create (tunnel);
1145 1175
1146 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1176 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1147 "evaluating union operation, (app %s)\n", 1177 "evaluating union operation, (app %s)\n",
1148 GNUNET_h2s (&eo->spec->app_id)); 1178 GNUNET_h2s (&eo->spec->app_id));
1149 1179
1150 /* we started the operation, thus we have to send the operation request */ 1180 /* we started the operation, thus we have to send the operation request */
1151 eo->phase = PHASE_EXPECT_SE; 1181 eo->phase = PHASE_EXPECT_SE;
1152 1182
1153 GNUNET_CONTAINER_DLL_insert (st->ops_head, 1183 GNUNET_CONTAINER_DLL_insert (eo->set_state->ops_head,
1154 st->ops_tail, 1184 eo->set_state->ops_tail,
1155 eo); 1185 eo);
1156 1186
1157 send_operation_request (eo); 1187 send_operation_request (eo);
1158
1159 return eo;
1160} 1188}
1161 1189
1162 1190
@@ -1165,30 +1193,34 @@ _GSS_union_evaluate (struct OperationSpecification *spec,
1165 * 1193 *
1166 * @param spec all necessary information about the operation 1194 * @param spec all necessary information about the operation
1167 * @param tunnel open tunnel to the partner's peer 1195 * @param tunnel open tunnel to the partner's peer
1196 * @param tc tunnel context, passed here so all new incoming
1197 * messages are directly going to the union operations
1168 * @return operation 1198 * @return operation
1169 */ 1199 */
1170struct UnionEvaluateOperation * 1200static void
1171_GSS_union_accept (struct OperationSpecification *spec, 1201union_accept (struct OperationSpecification *spec,
1172 struct GNUNET_MESH_Tunnel *tunnel) 1202 struct GNUNET_MESH_Tunnel *tunnel,
1203 struct TunnelContext *tc)
1173{ 1204{
1174 struct UnionEvaluateOperation *eo; 1205 struct OperationState *eo;
1175 struct UnionState *st = spec->set->state.u;
1176 1206
1177 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n"); 1207 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
1178 1208
1179 eo = GNUNET_new (struct UnionEvaluateOperation); 1209 eo = GNUNET_new (struct OperationState);
1180 eo->generation_created = st->current_generation++; 1210 tc->vt = _GSS_union_vt ();
1211 tc->op = eo;
1212 eo->set_state = spec->set->state;
1213 eo->generation_created = eo->set_state->current_generation++;
1181 eo->spec = spec; 1214 eo->spec = spec;
1182 eo->tunnel = tunnel; 1215 eo->tunnel = tunnel;
1183 eo->se = strata_estimator_dup (st->se); 1216 eo->mq = GNUNET_MESH_mq_create (tunnel);
1217 eo->se = strata_estimator_dup (eo->set_state->se);
1184 /* transfer ownership of mq and socket from incoming to eo */ 1218 /* transfer ownership of mq and socket from incoming to eo */
1185 GNUNET_CONTAINER_DLL_insert (st->ops_head, 1219 GNUNET_CONTAINER_DLL_insert (eo->set_state->ops_head,
1186 st->ops_tail, 1220 eo->set_state->ops_tail,
1187 eo); 1221 eo);
1188 /* kick off the operation */ 1222 /* kick off the operation */
1189 send_strata_estimator (eo); 1223 send_strata_estimator (eo);
1190
1191 return eo;
1192} 1224}
1193 1225
1194 1226
@@ -1197,111 +1229,101 @@ _GSS_union_accept (struct OperationSpecification *spec,
1197 * 1229 *
1198 * @return the newly created set 1230 * @return the newly created set
1199 */ 1231 */
1200struct Set * 1232static struct SetState *
1201_GSS_union_set_create (void) 1233union_set_create (void)
1202{ 1234{
1203 struct Set *set; 1235 struct SetState *set_state;
1204 1236
1205 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "union set created\n"); 1237 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "union set created\n");
1206 1238
1207 set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState)); 1239 set_state = GNUNET_new (struct SetState);
1208 set->state.u = (struct UnionState *) &set[1];
1209 set->operation = GNUNET_SET_OPERATION_UNION;
1210 /* keys of the hash map are stored in the element entrys, thus we do not 1240 /* keys of the hash map are stored in the element entrys, thus we do not
1211 * want the hash map to copy them */ 1241 * want the hash map to copy them */
1212 set->state.u->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 1242 set_state->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1213 set->state.u->se = strata_estimator_create (SE_STRATA_COUNT, 1243 set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1214 SE_IBF_SIZE, SE_IBF_HASH_NUM); 1244 SE_IBF_SIZE, SE_IBF_HASH_NUM);
1215 return set; 1245 return set_state;
1216} 1246}
1217 1247
1218 1248
1219/** 1249/**
1220 * Add the element from the given element message to the set. 1250 * Add the element from the given element message to the set.
1221 * 1251 *
1222 * @param m message with the element 1252 * @param set_state state of the set want to add to
1223 * @param set set to add the element to 1253 * @param element the element to add to the set
1224 */ 1254 */
1225void 1255static void
1226_GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set) 1256union_add (struct SetState *set_state, const struct GNUNET_SET_Element *element)
1227{ 1257{
1228 struct ElementEntry *ee; 1258 struct ElementEntry *ee;
1229 struct ElementEntry *ee_dup; 1259 struct ElementEntry *ee_dup;
1230 uint16_t element_size;
1231 1260
1232 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "adding element\n"); 1261 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "adding union element of size %u\n", element->size);
1233 1262
1234 GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation); 1263 ee = GNUNET_malloc (element->size + sizeof *ee);
1235 element_size = ntohs (m->header.size) - sizeof *m; 1264 ee->element.size = element->size;
1236 ee = GNUNET_malloc (element_size + sizeof *ee); 1265 memcpy (&ee[1], element->data, element->size);
1237 ee->element.size = element_size;
1238 memcpy (&ee[1], &m[1], element_size);
1239 ee->element.data = &ee[1]; 1266 ee->element.data = &ee[1];
1240 ee->generation_added = set->state.u->current_generation; 1267 ee->generation_added = set_state->current_generation;
1241 GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash); 1268 ee->remote = GNUNET_NO;
1242 ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &ee->element_hash); 1269 GNUNET_CRYPTO_hash (ee->element.data, element->size, &ee->element_hash);
1270 ee_dup = GNUNET_CONTAINER_multihashmap_get (set_state->elements,
1271 &ee->element_hash);
1243 if (NULL != ee_dup) 1272 if (NULL != ee_dup)
1244 { 1273 {
1245 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n"); 1274 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n");
1246 GNUNET_free (ee); 1275 GNUNET_free (ee);
1247 return; 1276 return;
1248 } 1277 }
1249 GNUNET_CONTAINER_multihashmap_put (set->state.u->elements, &ee->element_hash, ee, 1278 GNUNET_CONTAINER_multihashmap_put (set_state->elements, &ee->element_hash, ee,
1250 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 1279 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1251 strata_estimator_insert (set->state.u->se, get_ibf_key (&ee->element_hash, 0)); 1280 strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash, 0));
1252} 1281}
1253 1282
1254 1283
1255/** 1284/**
1256 * Destroy a set that supports the union operation 1285 * Destroy a set that supports the union operation
1257 * 1286 *
1258 * @param set the set to destroy, must be of type GNUNET_SET_OPERATION_UNION 1287 * @param set_state the set to destroy
1259 */ 1288 */
1260void 1289static void
1261_GSS_union_set_destroy (struct Set *set) 1290union_set_destroy (struct SetState *set_state)
1262{ 1291{
1263 GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation); 1292 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union set\n");
1264 if (NULL != set->client) 1293 /* important to destroy operations before the rest of the set */
1294 while (NULL != set_state->ops_head)
1295 union_operation_destroy (set_state->ops_head);
1296 if (NULL != set_state->se)
1265 { 1297 {
1266 GNUNET_SERVER_client_drop (set->client); 1298 strata_estimator_destroy (set_state->se);
1267 set->client = NULL; 1299 set_state->se = NULL;
1268 } 1300 }
1269 if (NULL != set->client_mq) 1301 if (NULL != set_state->elements)
1270 {
1271 GNUNET_MQ_destroy (set->client_mq);
1272 set->client_mq = NULL;
1273 }
1274
1275 if (NULL != set->state.u->se)
1276 { 1302 {
1277 strata_estimator_destroy (set->state.u->se); 1303 GNUNET_CONTAINER_multihashmap_iterate (set_state->elements,
1278 set->state.u->se = NULL; 1304 destroy_elements_iterator, NULL);
1305 GNUNET_CONTAINER_multihashmap_destroy (set_state->elements);
1306 set_state->elements = NULL;
1279 } 1307 }
1280 1308
1281 destroy_elements (set->state.u); 1309 GNUNET_free (set_state);
1282
1283 while (NULL != set->state.u->ops_head)
1284 {
1285 _GSS_union_operation_destroy (set->state.u->ops_head);
1286 }
1287} 1310}
1288 1311
1289/** 1312/**
1290 * Remove the element given in the element message from the set. 1313 * Remove the element given in the element message from the set.
1291 * Only marks the element as removed, so that older set operations can still exchange it. 1314 * Only marks the element as removed, so that older set operations can still exchange it.
1292 * 1315 *
1293 * @param m message with the element 1316 * @param set_state state of the set to remove from
1294 * @param set set to remove the element from 1317 * @param element set element to remove
1295 */ 1318 */
1296void 1319static void
1297_GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set) 1320union_remove (struct SetState *set_state, const struct GNUNET_SET_Element *element)
1298{ 1321{
1299 struct GNUNET_HashCode hash; 1322 struct GNUNET_HashCode hash;
1300 struct ElementEntry *ee; 1323 struct ElementEntry *ee;
1301 1324
1302 GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation); 1325 GNUNET_CRYPTO_hash (element->data, element->size, &hash);
1303 GNUNET_CRYPTO_hash (&m[1], ntohs (m->header.size), &hash); 1326 ee = GNUNET_CONTAINER_multihashmap_get (set_state->elements, &hash);
1304 ee = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &hash);
1305 if (NULL == ee) 1327 if (NULL == ee)
1306 { 1328 {
1307 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n"); 1329 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n");
@@ -1313,36 +1335,22 @@ _GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set)
1313 return; 1335 return;
1314 } 1336 }
1315 ee->removed = GNUNET_YES; 1337 ee->removed = GNUNET_YES;
1316 ee->generation_removed = set->state.u->current_generation; 1338 ee->generation_removed = set_state->current_generation;
1317} 1339}
1318 1340
1319 1341
1320/** 1342/**
1321 * Dispatch messages for a union operation. 1343 * Dispatch messages for a union operation.
1322 * 1344 *
1323 * @param cls closure 1345 * @param eo the state of the union evaluate operation
1324 * @param tunnel mesh tunnel 1346 * @param mh the received message
1325 * @param tunnel_ctx tunnel context
1326 * @param mh message to process
1327 * @return GNUNET_SYSERR if the tunnel should be disconnected, 1347 * @return GNUNET_SYSERR if the tunnel should be disconnected,
1328 * GNUNET_OK otherwise 1348 * GNUNET_OK otherwise
1329 */ 1349 */
1330int 1350int
1331_GSS_union_handle_p2p_message (void *cls, 1351union_handle_p2p_message (struct OperationState *eo,
1332 struct GNUNET_MESH_Tunnel *tunnel, 1352 const struct GNUNET_MessageHeader *mh)
1333 void **tunnel_ctx,
1334 const struct GNUNET_MessageHeader *mh)
1335{ 1353{
1336 struct TunnelContext *tc = *tunnel_ctx;
1337 struct UnionEvaluateOperation *eo;
1338
1339 if (CONTEXT_OPERATION_UNION != tc->type)
1340 {
1341 return GNUNET_SYSERR;
1342 }
1343
1344 eo = tc->data.union_op;
1345
1346 switch (ntohs (mh->type)) 1354 switch (ntohs (mh->type))
1347 { 1355 {
1348 case GNUNET_MESSAGE_TYPE_SET_P2P_IBF: 1356 case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
@@ -1366,3 +1374,60 @@ _GSS_union_handle_p2p_message (void *cls,
1366 } 1374 }
1367 return GNUNET_OK; 1375 return GNUNET_OK;
1368} 1376}
1377
1378
1379static void
1380union_peer_disconnect (struct OperationState *op)
1381{
1382 /* Are we already disconnected? */
1383 if (NULL == op->tunnel)
1384 return;
1385 op->tunnel = NULL;
1386 if (NULL != op->mq)
1387 {
1388 GNUNET_MQ_destroy (op->mq);
1389 op->mq = NULL;
1390 }
1391 if (PHASE_FINISHED != op->phase)
1392 {
1393 struct GNUNET_MQ_Envelope *ev;
1394 struct GNUNET_SET_ResultMessage *msg;
1395 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
1396 msg->request_id = htonl (op->spec->client_request_id);
1397 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1398 msg->element_type = htons (0);
1399 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1400 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
1401 }
1402 else
1403 {
1404 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
1405 }
1406 union_operation_destroy (op);
1407}
1408
1409
1410static void
1411union_op_cancel (struct SetState *set_state, uint32_t op_id)
1412{
1413 /* FIXME: implement */
1414}
1415
1416
1417const struct SetVT *
1418_GSS_union_vt (void)
1419{
1420 static const struct SetVT union_vt = {
1421 .create = union_set_create,
1422 .msg_handler = union_handle_p2p_message,
1423 .add = union_add,
1424 .remove = union_remove,
1425 .destroy_set = union_set_destroy,
1426 .evaluate = union_evaluate,
1427 .accept = union_accept,
1428 .peer_disconnect = union_peer_disconnect,
1429 .cancel = union_op_cancel
1430 };
1431
1432 return &union_vt;
1433}
diff --git a/src/set/gnunet-set-profiler.c b/src/set/gnunet-set-profiler.c
index 814c9e436..b0c81feb1 100644
--- a/src/set/gnunet-set-profiler.c
+++ b/src/set/gnunet-set-profiler.c
@@ -16,7 +16,7 @@
16 along with GNUnet; see the file COPYING. If not, write to the 16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330, 17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA. 18 Boston, MA 02111-1307, USA.
19 */ 19*/
20 20
21/** 21/**
22 * @file set/gnunet-set-profiler.c 22 * @file set/gnunet-set-profiler.c
@@ -42,25 +42,17 @@ static char *op_str = "union";
42 42
43const static struct GNUNET_CONFIGURATION_Handle *config; 43const static struct GNUNET_CONFIGURATION_Handle *config;
44 44
45struct GNUNET_CONTAINER_MultiHashMap *map_a; 45struct SetInfo
46struct GNUNET_CONTAINER_MultiHashMap *map_b; 46{
47struct GNUNET_CONTAINER_MultiHashMap *map_c; 47 char *id;
48 48 struct GNUNET_SET_Handle *set;
49 49 struct GNUNET_SET_OperationHandle *oh;
50/** 50 struct GNUNET_CONTAINER_MultiHashMap *sent;
51 * Elements that set a received, should match map_c 51 struct GNUNET_CONTAINER_MultiHashMap *received;
52 * in the end. 52 int done;
53 */ 53} info1, info2;
54struct GNUNET_CONTAINER_MultiHashMap *map_a_received;
55
56/**
57 * Elements that set b received, should match map_c
58 * in the end.
59 */
60struct GNUNET_CONTAINER_MultiHashMap *map_b_received;
61 54
62struct GNUNET_SET_Handle *set_a; 55struct GNUNET_CONTAINER_MultiHashMap *common_sent;
63struct GNUNET_SET_Handle *set_b;
64 56
65struct GNUNET_HashCode app_id; 57struct GNUNET_HashCode app_id;
66 58
@@ -68,14 +60,6 @@ struct GNUNET_PeerIdentity local_peer;
68 60
69struct GNUNET_SET_ListenHandle *set_listener; 61struct GNUNET_SET_ListenHandle *set_listener;
70 62
71struct GNUNET_SET_OperationHandle *set_oh1;
72struct GNUNET_SET_OperationHandle *set_oh2;
73
74
75int a_done;
76int b_done;
77
78
79 63
80static int 64static int
81map_remove_iterator (void *cls, 65map_remove_iterator (void *cls,
@@ -85,66 +69,69 @@ map_remove_iterator (void *cls,
85 struct GNUNET_CONTAINER_MultiHashMap *m = cls; 69 struct GNUNET_CONTAINER_MultiHashMap *m = cls;
86 int ret; 70 int ret;
87 71
72 GNUNET_assert (NULL != key);
73
88 ret = GNUNET_CONTAINER_multihashmap_remove (m, key, NULL); 74 ret = GNUNET_CONTAINER_multihashmap_remove (m, key, NULL);
89 GNUNET_assert (GNUNET_OK == ret); 75 if (GNUNET_OK != ret)
76 printf ("spurious element\n");
90 return GNUNET_YES; 77 return GNUNET_YES;
91 78
92} 79}
93 80
94
95static void 81static void
96set_result_cb_1 (void *cls, 82check_all_done (void)
97 const struct GNUNET_SET_Element *element,
98 enum GNUNET_SET_Status status)
99{ 83{
100 GNUNET_assert (GNUNET_NO == a_done); 84 if (info1.done == GNUNET_NO || info2.done == GNUNET_NO)
101 GNUNET_assert (element->size == sizeof (struct GNUNET_HashCode)); 85 return;
102 switch (status) 86
103 { 87 GNUNET_CONTAINER_multihashmap_iterate (info1.received, map_remove_iterator, info2.sent);
104 case GNUNET_SET_STATUS_DONE: 88 GNUNET_CONTAINER_multihashmap_iterate (info2.received, map_remove_iterator, info1.sent);
105 case GNUNET_SET_STATUS_HALF_DONE: 89
106 a_done = GNUNET_YES; 90 printf ("set a: %d missing elements\n", GNUNET_CONTAINER_multihashmap_size (info1.sent));
107 GNUNET_CONTAINER_multihashmap_iterate (map_c, map_remove_iterator, map_a_received); 91 printf ("set b: %d missing elements\n", GNUNET_CONTAINER_multihashmap_size (info2.sent));
108 GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (map_a_received)); 92
109 return; 93 GNUNET_SCHEDULER_shutdown ();
110 case GNUNET_SET_STATUS_FAILURE:
111 GNUNET_assert (0);
112 return;
113 case GNUNET_SET_STATUS_OK:
114 break;
115 default:
116 GNUNET_assert (0);
117 }
118 GNUNET_CONTAINER_multihashmap_put (map_a_received,
119 element->data, NULL,
120 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
121} 94}
122 95
123 96
124static void 97static void
125set_result_cb_2 (void *cls, 98set_result_cb (void *cls,
126 const struct GNUNET_SET_Element *element, 99 const struct GNUNET_SET_Element *element,
127 enum GNUNET_SET_Status status) 100 enum GNUNET_SET_Status status)
128{ 101{
129 GNUNET_assert (GNUNET_NO == b_done); 102 struct SetInfo *info = cls;
130 GNUNET_assert (element->size == sizeof (struct GNUNET_HashCode)); 103
104 GNUNET_assert (GNUNET_NO == info->done);
131 switch (status) 105 switch (status)
132 { 106 {
133 case GNUNET_SET_STATUS_DONE: 107 case GNUNET_SET_STATUS_DONE:
134 case GNUNET_SET_STATUS_HALF_DONE: 108 case GNUNET_SET_STATUS_HALF_DONE:
135 b_done = GNUNET_YES; 109 info->done = GNUNET_YES;
136 GNUNET_CONTAINER_multihashmap_iterate (map_c, map_remove_iterator, map_b_received); 110 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s done\n", info->id);
137 GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (map_b_received)); 111 check_all_done ();
112 info->oh = NULL;
138 return; 113 return;
139 case GNUNET_SET_STATUS_FAILURE: 114 case GNUNET_SET_STATUS_FAILURE:
140 GNUNET_assert (0); 115 info->oh = NULL;
116 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "failure\n");
117 GNUNET_SCHEDULER_shutdown ();
141 return; 118 return;
142 case GNUNET_SET_STATUS_OK: 119 case GNUNET_SET_STATUS_OK:
143 break; 120 break;
144 default: 121 default:
145 GNUNET_assert (0); 122 GNUNET_assert (0);
146 } 123 }
147 GNUNET_CONTAINER_multihashmap_put (map_b_received, 124
125 if (element->size != sizeof (struct GNUNET_HashCode))
126 {
127 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "wrong element size: %u\n", element->size);
128 GNUNET_assert (0);
129 }
130
131 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s: got element (%s)\n",
132 info->id, GNUNET_h2s (element->data));
133 GNUNET_assert (NULL != element->data);
134 GNUNET_CONTAINER_multihashmap_put (info->received,
148 element->data, NULL, 135 element->data, NULL,
149 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); 136 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
150} 137}
@@ -156,11 +143,16 @@ set_listen_cb (void *cls,
156 const struct GNUNET_MessageHeader *context_msg, 143 const struct GNUNET_MessageHeader *context_msg,
157 struct GNUNET_SET_Request *request) 144 struct GNUNET_SET_Request *request)
158{ 145{
159 GNUNET_assert (NULL == set_oh2); 146 if (NULL == request)
147 {
148 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "listener failed\n");
149 return;
150 }
151 GNUNET_assert (NULL == info2.oh);
160 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set listen cb called\n"); 152 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set listen cb called\n");
161 set_oh2 = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, 153 info2.oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
162 set_result_cb_2, NULL); 154 set_result_cb, &info2);
163 GNUNET_SET_commit (set_oh2, set_b); 155 GNUNET_SET_commit (info2.oh, info2.set);
164} 156}
165 157
166 158
@@ -185,6 +177,37 @@ set_insert_iterator (void *cls,
185 177
186 178
187static void 179static void
180handle_shutdown (void *cls,
181 const struct GNUNET_SCHEDULER_TaskContext *tc)
182{
183 if (NULL != set_listener)
184 {
185 GNUNET_SET_listen_cancel (set_listener);
186 set_listener = NULL;
187 }
188 if (NULL != info1.oh)
189 {
190 GNUNET_SET_operation_cancel (info1.oh);
191 info1.oh = NULL;
192 }
193 if (NULL != info2.oh)
194 {
195 GNUNET_SET_operation_cancel (info2.oh);
196 info2.oh = NULL;
197 }
198 if (NULL != info1.set)
199 {
200 GNUNET_SET_destroy (info1.set);
201 info1.set = NULL;
202 }
203 if (NULL != info2.set)
204 {
205 GNUNET_SET_destroy (info2.set);
206 info2.set = NULL;
207 }
208}
209
210static void
188run (void *cls, char *const *args, const char *cfgfile, 211run (void *cls, char *const *args, const char *cfgfile,
189 const struct GNUNET_CONFIGURATION_Handle *cfg) 212 const struct GNUNET_CONFIGURATION_Handle *cfg)
190{ 213{
@@ -195,63 +218,41 @@ run (void *cls, char *const *args, const char *cfgfile,
195 218
196 if (GNUNET_OK != GNUNET_CRYPTO_get_host_identity (cfg, &local_peer)) 219 if (GNUNET_OK != GNUNET_CRYPTO_get_host_identity (cfg, &local_peer))
197 { 220 {
198 GNUNET_assert (0); 221 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
222 ret = 0;
199 return; 223 return;
200 } 224 }
225
226 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, handle_shutdown, NULL);
227
228 info1.id = "a";
229 info2.id = "b";
201 230
202 map_a = GNUNET_CONTAINER_multihashmap_create (num_a, GNUNET_NO); 231 info1.sent = GNUNET_CONTAINER_multihashmap_create (num_a, GNUNET_NO);
203 map_b = GNUNET_CONTAINER_multihashmap_create (num_b, GNUNET_NO); 232 info2.sent = GNUNET_CONTAINER_multihashmap_create (num_b, GNUNET_NO);
204 map_c = GNUNET_CONTAINER_multihashmap_create (num_c, GNUNET_NO); 233 common_sent = GNUNET_CONTAINER_multihashmap_create (num_c, GNUNET_NO);
234
235 info1.received = GNUNET_CONTAINER_multihashmap_create (num_a, GNUNET_NO);
236 info2.received = GNUNET_CONTAINER_multihashmap_create (num_b, GNUNET_NO);
205 237
206 for (i = 0; i < num_a; i++) 238 for (i = 0; i < num_a; i++)
207 { 239 {
208 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); 240 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
209 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash)) 241 GNUNET_CONTAINER_multihashmap_put (info1.sent, &hash, NULL,
210 { 242 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
211 i--;
212 continue;
213 }
214 GNUNET_CONTAINER_multihashmap_put (map_a, &hash, &hash,
215 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
216 } 243 }
217 244
218 for (i = 0; i < num_b; i++) 245 for (i = 0; i < num_b; i++)
219 { 246 {
220 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); 247 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
221 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash)) 248 GNUNET_CONTAINER_multihashmap_put (info2.sent, &hash, NULL,
222 {
223 i--;
224 continue;
225 }
226 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_b, &hash))
227 {
228 i--;
229 continue;
230 }
231 GNUNET_CONTAINER_multihashmap_put (map_b, &hash, NULL,
232 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); 249 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
233 } 250 }
234 251
235 for (i = 0; i < num_c; i++) 252 for (i = 0; i < num_c; i++)
236 { 253 {
237 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); 254 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
238 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash)) 255 GNUNET_CONTAINER_multihashmap_put (common_sent, &hash, NULL,
239 {
240 i--;
241 continue;
242 }
243 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_b, &hash))
244 {
245 i--;
246 continue;
247 }
248 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_c, &hash))
249 {
250 i--;
251 continue;
252 }
253 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
254 GNUNET_CONTAINER_multihashmap_put (map_c, &hash, NULL,
255 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); 256 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
256 } 257 }
257 258
@@ -259,20 +260,22 @@ run (void *cls, char *const *args, const char *cfgfile,
259 app_id = hash; 260 app_id = hash;
260 261
261 /* FIXME: also implement intersection etc. */ 262 /* FIXME: also implement intersection etc. */
262 set_a = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION); 263 info1.set = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION);
263 set_b = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION); 264 info2.set = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION);
264 265
265 GNUNET_CONTAINER_multihashmap_iterate (map_a, set_insert_iterator, set_a); 266 GNUNET_CONTAINER_multihashmap_iterate (info1.sent, set_insert_iterator, info1.set);
266 GNUNET_CONTAINER_multihashmap_iterate (map_b, set_insert_iterator, set_b); 267 GNUNET_CONTAINER_multihashmap_iterate (info2.sent, set_insert_iterator, info2.set);
267 GNUNET_CONTAINER_multihashmap_iterate (map_c, set_insert_iterator, set_a); 268 GNUNET_CONTAINER_multihashmap_iterate (common_sent, set_insert_iterator, info1.set);
268 GNUNET_CONTAINER_multihashmap_iterate (map_c, set_insert_iterator, set_b); 269 GNUNET_CONTAINER_multihashmap_iterate (common_sent, set_insert_iterator, info2.set);
269 270
270 set_listener = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, 271 set_listener = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
271 &app_id, set_listen_cb, NULL); 272 &app_id, set_listen_cb, NULL);
272 273
273 set_oh1 = GNUNET_SET_prepare (&local_peer, &app_id, NULL, salt, GNUNET_SET_RESULT_ADDED, 274 info1.oh = GNUNET_SET_prepare (&local_peer, &app_id, NULL, salt, GNUNET_SET_RESULT_ADDED,
274 set_result_cb_1, NULL); 275 set_result_cb, &info1);
275 GNUNET_SET_commit (set_oh1, set_a); 276 GNUNET_SET_commit (info1.oh, info1.set);
277 GNUNET_SET_destroy (info1.set);
278 info1.set = NULL;
276} 279}
277 280
278 281
diff --git a/src/set/set.h b/src/set/set.h
index 7ec3e6cb2..fc29e6696 100644
--- a/src/set/set.h
+++ b/src/set/set.h
@@ -194,6 +194,24 @@ struct GNUNET_SET_ElementMessage
194}; 194};
195 195
196 196
197/**
198 * Sent to the service by the client
199 * in order to cancel a set operation.
200 */
201struct GNUNET_SET_CancelMessage
202{
203 /**
204 * Type: GNUNET_MESSAGE_TYPE_SET_CANCEL
205 */
206 struct GNUNET_MessageHeader header;
207
208 /**
209 * ID of the request we want to cancel.
210 */
211 uint32_t request_id GNUNET_PACKED;
212};
213
214
197GNUNET_NETWORK_STRUCT_END 215GNUNET_NETWORK_STRUCT_END
198 216
199#endif 217#endif
diff --git a/src/set/set_api.c b/src/set/set_api.c
index 370846bae..d3131648b 100644
--- a/src/set/set_api.c
+++ b/src/set/set_api.c
@@ -33,7 +33,6 @@
33 33
34#define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__) 34#define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__)
35 35
36
37/** 36/**
38 * Opaque handle to a set. 37 * Opaque handle to a set.
39 */ 38 */
@@ -42,20 +41,47 @@ struct GNUNET_SET_Handle
42 struct GNUNET_CLIENT_Connection *client; 41 struct GNUNET_CLIENT_Connection *client;
43 struct GNUNET_MQ_Handle *mq; 42 struct GNUNET_MQ_Handle *mq;
44 unsigned int messages_since_ack; 43 unsigned int messages_since_ack;
44 struct GNUNET_SET_OperationHandle *ops_head;
45 struct GNUNET_SET_OperationHandle *ops_tail;
46 int destroy_requested;
45}; 47};
46 48
49
47/** 50/**
48 * Opaque handle to a set operation request from another peer. 51 * Opaque handle to a set operation request from another peer.
49 */ 52 */
50struct GNUNET_SET_Request 53struct GNUNET_SET_Request
51{ 54{
55 /**
56 * Id of the request, used to identify the request when
57 * accepting/rejecting it.
58 */
52 uint32_t accept_id; 59 uint32_t accept_id;
60
61 /**
62 * Has the request been accepted already?
63 * GNUNET_YES/GNUNET_NO
64 */
53 int accepted; 65 int accepted;
54}; 66};
55 67
68
69/**
70 * Handle to an operation.
71 * Only known to the service after commiting
72 * the handle with a set.
73 */
56struct GNUNET_SET_OperationHandle 74struct GNUNET_SET_OperationHandle
57{ 75{
76 /**
77 * Function to be called when we have a result,
78 * or an error.
79 */
58 GNUNET_SET_ResultIterator result_cb; 80 GNUNET_SET_ResultIterator result_cb;
81
82 /**
83 * Closure for result_cb.
84 */
59 void *result_cls; 85 void *result_cls;
60 86
61 /** 87 /**
@@ -80,6 +106,17 @@ struct GNUNET_SET_OperationHandle
80 * used to patch the request id into the message when the set is known. 106 * used to patch the request id into the message when the set is known.
81 */ 107 */
82 uint32_t *request_id_addr; 108 uint32_t *request_id_addr;
109
110 /**
111 * Handles are kept in a linked list.
112 */
113 struct GNUNET_SET_OperationHandle *prev;
114
115 /**
116 * Handles are kept in a linked list.
117 */
118 struct GNUNET_SET_OperationHandle *next;
119
83}; 120};
84 121
85 122
@@ -88,9 +125,25 @@ struct GNUNET_SET_OperationHandle
88 */ 125 */
89struct GNUNET_SET_ListenHandle 126struct GNUNET_SET_ListenHandle
90{ 127{
128 /**
129 * Connection to the service.
130 */
91 struct GNUNET_CLIENT_Connection *client; 131 struct GNUNET_CLIENT_Connection *client;
132
133 /**
134 * Message queue for the client.
135 */
92 struct GNUNET_MQ_Handle* mq; 136 struct GNUNET_MQ_Handle* mq;
137
138 /**
139 * Function to call on a new incoming request,
140 * or on error.
141 */
93 GNUNET_SET_ListenCallback listen_cb; 142 GNUNET_SET_ListenCallback listen_cb;
143
144 /**
145 * Closure for listen_cb.
146 */
94 void *listen_cls; 147 void *listen_cls;
95}; 148};
96 149
@@ -108,11 +161,13 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh)
108 struct GNUNET_SET_Handle *set = cls; 161 struct GNUNET_SET_Handle *set = cls;
109 struct GNUNET_SET_OperationHandle *oh; 162 struct GNUNET_SET_OperationHandle *oh;
110 struct GNUNET_SET_Element e; 163 struct GNUNET_SET_Element e;
111 164 enum GNUNET_SET_Status result_status;
112 165
113 GNUNET_assert (NULL != set); 166 GNUNET_assert (NULL != set);
114 GNUNET_assert (NULL != set->mq); 167 GNUNET_assert (NULL != set->mq);
115 168
169 result_status = ntohs (msg->result_status);
170
116 if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2) 171 if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2)
117 { 172 {
118 struct GNUNET_MQ_Envelope *mqm; 173 struct GNUNET_MQ_Envelope *mqm;
@@ -123,11 +178,14 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh)
123 GNUNET_assert (NULL != oh); 178 GNUNET_assert (NULL != oh);
124 /* status is not STATUS_OK => there's no attached element, 179 /* status is not STATUS_OK => there's no attached element,
125 * and this is the last result message we get */ 180 * and this is the last result message we get */
126 if (htons (msg->result_status) != GNUNET_SET_STATUS_OK) 181 if (GNUNET_SET_STATUS_OK != result_status)
127 { 182 {
128 GNUNET_MQ_assoc_remove (set->mq, ntohl (msg->request_id)); 183 GNUNET_MQ_assoc_remove (set->mq, ntohl (msg->request_id));
184 GNUNET_CONTAINER_DLL_remove (oh->set->ops_head, oh->set->ops_tail, oh);
185 if (GNUNET_YES == oh->set->destroy_requested)
186 GNUNET_SET_destroy (oh->set);
129 if (NULL != oh->result_cb) 187 if (NULL != oh->result_cb)
130 oh->result_cb (oh->result_cls, NULL, htons (msg->result_status)); 188 oh->result_cb (oh->result_cls, NULL, result_status);
131 GNUNET_free (oh); 189 GNUNET_free (oh);
132 return; 190 return;
133 } 191 }
@@ -136,7 +194,7 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh)
136 e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage); 194 e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage);
137 e.type = msg->element_type; 195 e.type = msg->element_type;
138 if (NULL != oh->result_cb) 196 if (NULL != oh->result_cb)
139 oh->result_cb (oh->result_cls, &e, htons (msg->result_status)); 197 oh->result_cb (oh->result_cls, &e, result_status);
140} 198}
141 199
142/** 200/**
@@ -153,7 +211,7 @@ handle_request (void *cls, const struct GNUNET_MessageHeader *mh)
153 struct GNUNET_SET_Request *req; 211 struct GNUNET_SET_Request *req;
154 struct GNUNET_MessageHeader *context_msg; 212 struct GNUNET_MessageHeader *context_msg;
155 213
156 LOG (GNUNET_ERROR_TYPE_INFO, "processing request\n"); 214 LOG (GNUNET_ERROR_TYPE_DEBUG, "processing operation request\n");
157 req = GNUNET_new (struct GNUNET_SET_Request); 215 req = GNUNET_new (struct GNUNET_SET_Request);
158 req->accept_id = ntohl (msg->accept_id); 216 req->accept_id = ntohl (msg->accept_id);
159 context_msg = GNUNET_MQ_extract_nested_mh (msg); 217 context_msg = GNUNET_MQ_extract_nested_mh (msg);
@@ -171,16 +229,42 @@ handle_request (void *cls, const struct GNUNET_MessageHeader *mh)
171 amsg->accept_reject_id = msg->accept_id; 229 amsg->accept_reject_id = msg->accept_id;
172 GNUNET_MQ_send (lh->mq, mqm); 230 GNUNET_MQ_send (lh->mq, mqm);
173 GNUNET_free (req); 231 GNUNET_free (req);
174 LOG (GNUNET_ERROR_TYPE_INFO, "rejecting request\n"); 232 LOG (GNUNET_ERROR_TYPE_DEBUG, "rejecting request\n");
175 } 233 }
176 234
177 LOG (GNUNET_ERROR_TYPE_INFO, "processed op request from service\n"); 235 LOG (GNUNET_ERROR_TYPE_DEBUG, "processed op request from service\n");
178 236
179 /* the accept-case is handled in GNUNET_SET_accept, 237 /* the accept-case is handled in GNUNET_SET_accept,
180 * as we have the accept message available there */ 238 * as we have the accept message available there */
181} 239}
182 240
183 241
242static void
243handle_client_listener_error (void *cls, enum GNUNET_MQ_Error error)
244{
245 struct GNUNET_SET_ListenHandle *lh = cls;
246
247 lh->listen_cb (lh->listen_cls, NULL, NULL, NULL);
248}
249
250
251static void
252handle_client_set_error (void *cls, enum GNUNET_MQ_Error error)
253{
254 struct GNUNET_SET_Handle *set = cls;
255
256 while (NULL != set->ops_head)
257 {
258 if (NULL != set->ops_head->result_cb)
259 set->ops_head->result_cb (set->ops_head->result_cls, NULL,
260 GNUNET_SET_STATUS_FAILURE);
261 GNUNET_SET_operation_cancel (set->ops_head);
262 }
263
264 /* FIXME: there should be a set error handler */
265}
266
267
184/** 268/**
185 * Create an empty set, supporting the specified operation. 269 * Create an empty set, supporting the specified operation.
186 * 270 *
@@ -200,15 +284,16 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
200 struct GNUNET_MQ_Envelope *mqm; 284 struct GNUNET_MQ_Envelope *mqm;
201 struct GNUNET_SET_CreateMessage *msg; 285 struct GNUNET_SET_CreateMessage *msg;
202 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { 286 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
203 {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT}, 287 {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT, 0},
204 GNUNET_MQ_HANDLERS_END 288 GNUNET_MQ_HANDLERS_END
205 }; 289 };
206 290
207 set = GNUNET_new (struct GNUNET_SET_Handle); 291 set = GNUNET_new (struct GNUNET_SET_Handle);
208 set->client = GNUNET_CLIENT_connect ("set", cfg); 292 set->client = GNUNET_CLIENT_connect ("set", cfg);
209 LOG (GNUNET_ERROR_TYPE_INFO, "set client created\n"); 293 LOG (GNUNET_ERROR_TYPE_DEBUG, "set client created\n");
210 GNUNET_assert (NULL != set->client); 294 GNUNET_assert (NULL != set->client);
211 set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers, set); 295 set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers,
296 handle_client_set_error, set);
212 GNUNET_assert (NULL != set->mq); 297 GNUNET_assert (NULL != set->mq);
213 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE); 298 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE);
214 msg->operation = htons (op); 299 msg->operation = htons (op);
@@ -279,6 +364,11 @@ GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set,
279void 364void
280GNUNET_SET_destroy (struct GNUNET_SET_Handle *set) 365GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
281{ 366{
367 if (NULL != set->ops_head)
368 {
369 set->destroy_requested = GNUNET_YES;
370 return;
371 }
282 GNUNET_CLIENT_disconnect (set->client); 372 GNUNET_CLIENT_disconnect (set->client);
283 set->client = NULL; 373 set->client = NULL;
284 GNUNET_MQ_destroy (set->mq); 374 GNUNET_MQ_destroy (set->mq);
@@ -332,7 +422,6 @@ GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
332 return oh; 422 return oh;
333} 423}
334 424
335
336/** 425/**
337 * Wait for set operation requests for the given application id 426 * Wait for set operation requests for the given application id
338 * 427 *
@@ -365,7 +454,8 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
365 lh->listen_cb = listen_cb; 454 lh->listen_cb = listen_cb;
366 lh->listen_cls = listen_cls; 455 lh->listen_cls = listen_cls;
367 GNUNET_assert (NULL != lh->client); 456 GNUNET_assert (NULL != lh->client);
368 lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers, lh); 457 lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers,
458 handle_client_listener_error, lh);
369 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN); 459 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN);
370 msg->operation = htons (operation); 460 msg->operation = htons (operation);
371 msg->app_id = *app_id; 461 msg->app_id = *app_id;
@@ -413,6 +503,7 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request,
413 struct GNUNET_SET_OperationHandle *oh; 503 struct GNUNET_SET_OperationHandle *oh;
414 struct GNUNET_SET_AcceptRejectMessage *msg; 504 struct GNUNET_SET_AcceptRejectMessage *msg;
415 505
506 GNUNET_assert (NULL != request);
416 GNUNET_assert (GNUNET_NO == request->accepted); 507 GNUNET_assert (GNUNET_NO == request->accepted);
417 request->accepted = GNUNET_YES; 508 request->accepted = GNUNET_YES;
418 509
@@ -432,6 +523,9 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request,
432 523
433/** 524/**
434 * Cancel the given set operation. 525 * Cancel the given set operation.
526 * We need to send an explicit cancel message, as
527 * all operations communicate with the set's client
528 * handle.
435 * 529 *
436 * @param oh set operation to cancel 530 * @param oh set operation to cancel
437 */ 531 */
@@ -441,17 +535,20 @@ GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
441 struct GNUNET_MQ_Envelope *mqm; 535 struct GNUNET_MQ_Envelope *mqm;
442 struct GNUNET_SET_OperationHandle *h_assoc; 536 struct GNUNET_SET_OperationHandle *h_assoc;
443 537
444 if (NULL != oh->set) 538 GNUNET_assert (NULL != oh->set);
445 { 539
446 h_assoc = GNUNET_MQ_assoc_remove (oh->set->mq, oh->request_id); 540 GNUNET_CONTAINER_DLL_remove (oh->set->ops_head, oh->set->ops_tail, oh);
447 GNUNET_assert (h_assoc == oh); 541 h_assoc = GNUNET_MQ_assoc_remove (oh->set->mq, oh->request_id);
448 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_CANCEL); 542 GNUNET_assert (h_assoc == oh);
449 GNUNET_MQ_send (oh->set->mq, mqm); 543 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_CANCEL);
450 } 544 GNUNET_MQ_send (oh->set->mq, mqm);
451 545
452 if (NULL != oh->conclude_mqm) 546 if (NULL != oh->conclude_mqm)
453 GNUNET_MQ_discard (oh->conclude_mqm); 547 GNUNET_MQ_discard (oh->conclude_mqm);
454 548
549 if (GNUNET_YES == oh->set->destroy_requested)
550 GNUNET_SET_destroy (oh->set);
551
455 GNUNET_free (oh); 552 GNUNET_free (oh);
456} 553}
457 554
@@ -469,14 +566,15 @@ GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
469 */ 566 */
470void 567void
471GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh, 568GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
472 struct GNUNET_SET_Handle *set) 569 struct GNUNET_SET_Handle *set)
473{ 570{
474 GNUNET_assert (NULL == oh->set); 571 GNUNET_assert (NULL == oh->set);
475 GNUNET_assert (NULL != oh->conclude_mqm); 572 GNUNET_assert (NULL != oh->conclude_mqm);
476 oh->set = set; 573 oh->set = set;
477 oh->request_id = GNUNET_MQ_assoc_add (oh->set->mq, oh); 574 GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, oh);
575 oh->request_id = GNUNET_MQ_assoc_add (set->mq, oh);
478 *oh->request_id_addr = htonl (oh->request_id); 576 *oh->request_id_addr = htonl (oh->request_id);
479 GNUNET_MQ_send (oh->set->mq, oh->conclude_mqm); 577 GNUNET_MQ_send (set->mq, oh->conclude_mqm);
480 oh->conclude_mqm = NULL; 578 oh->conclude_mqm = NULL;
481 oh->request_id_addr = NULL; 579 oh->request_id_addr = NULL;
482} 580}
diff --git a/src/util/mq.c b/src/util/mq.c
index f318dd04a..04129a7b4 100644
--- a/src/util/mq.c
+++ b/src/util/mq.c
@@ -612,6 +612,7 @@ connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
612struct GNUNET_MQ_Handle * 612struct GNUNET_MQ_Handle *
613GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection, 613GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
614 const struct GNUNET_MQ_MessageHandler *handlers, 614 const struct GNUNET_MQ_MessageHandler *handlers,
615 GNUNET_MQ_ErrorHandler error_handler,
615 void *cls) 616 void *cls)
616{ 617{
617 struct GNUNET_MQ_Handle *mq; 618 struct GNUNET_MQ_Handle *mq;
@@ -621,6 +622,7 @@ GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connecti
621 622
622 mq = GNUNET_new (struct GNUNET_MQ_Handle); 623 mq = GNUNET_new (struct GNUNET_MQ_Handle);
623 mq->handlers = handlers; 624 mq->handlers = handlers;
625 mq->error_handler = error_handler;
624 mq->handlers_cls = cls; 626 mq->handlers_cls = cls;
625 state = GNUNET_new (struct ClientConnectionState); 627 state = GNUNET_new (struct ClientConnectionState);
626 state->connection = connection; 628 state->connection = connection;
@@ -708,18 +710,29 @@ GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *mqm,
708void 710void
709GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) 711GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
710{ 712{
711 /* FIXME: destroy all pending messages in the queue */
712
713 if (NULL != mq->destroy_impl) 713 if (NULL != mq->destroy_impl)
714 { 714 {
715 mq->destroy_impl (mq, mq->impl_state); 715 mq->destroy_impl (mq, mq->impl_state);
716 } 716 }
717 717
718 while (NULL != mq->envelope_head)
719 {
720 struct GNUNET_MQ_Envelope *ev;
721 ev = mq->envelope_head;
722 GNUNET_MQ_discard (ev);
723 GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
724 }
725
726 if (NULL != mq->current_envelope)
727 {
728 GNUNET_MQ_discard (mq->current_envelope);
729 mq->current_envelope = NULL;
730 }
731
718 GNUNET_free (mq); 732 GNUNET_free (mq);
719} 733}
720 734
721 735
722
723struct GNUNET_MessageHeader * 736struct GNUNET_MessageHeader *
724GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size) 737GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size)
725{ 738{