diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-07-10 01:31:13 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-07-10 01:31:13 +0000 |
commit | 6b8400966a5e6c2194785b3a33f91b748cfa7b7b (patch) | |
tree | 0dafa7ba24c7a6dbb852fdedfd1822cd1e4835c0 /src | |
parent | 084cb3e09007ef50a3d9bd29514c8ec455249633 (diff) | |
download | gnunet-6b8400966a5e6c2194785b3a33f91b748cfa7b7b.tar.gz gnunet-6b8400966a5e6c2194785b3a33f91b748cfa7b7b.zip |
- set service working
- set profiler
Diffstat (limited to 'src')
-rw-r--r-- | src/consensus/Makefile.am | 14 | ||||
-rw-r--r-- | src/consensus/consensus_api.c | 2 | ||||
-rw-r--r-- | src/consensus/gnunet-consensus-start-peers.c | 186 | ||||
-rw-r--r-- | src/consensus/gnunet-service-consensus.c | 2 | ||||
-rw-r--r-- | src/include/gnunet_mq_lib.h | 2 | ||||
-rw-r--r-- | src/include/gnunet_set_service.h | 20 | ||||
-rw-r--r-- | src/set/gnunet-service-set.c | 407 | ||||
-rw-r--r-- | src/set/gnunet-service-set.h | 342 | ||||
-rw-r--r-- | src/set/gnunet-service-set_union.c | 431 | ||||
-rw-r--r-- | src/set/gnunet-set-profiler.c | 229 | ||||
-rw-r--r-- | src/set/set.h | 18 | ||||
-rw-r--r-- | src/set/set_api.c | 144 | ||||
-rw-r--r-- | src/util/mq.c | 19 |
13 files changed, 893 insertions, 923 deletions
diff --git a/src/consensus/Makefile.am b/src/consensus/Makefile.am index 914fbdef8..4b81baa59 100644 --- a/src/consensus/Makefile.am +++ b/src/consensus/Makefile.am | |||
@@ -16,8 +16,7 @@ if USE_COVERAGE | |||
16 | endif | 16 | endif |
17 | 17 | ||
18 | bin_PROGRAMS = \ | 18 | bin_PROGRAMS = \ |
19 | gnunet-consensus \ | 19 | gnunet-consensus |
20 | gnunet-consensus-start-peers | ||
21 | 20 | ||
22 | libexec_PROGRAMS = \ | 21 | libexec_PROGRAMS = \ |
23 | gnunet-service-consensus | 22 | gnunet-service-consensus |
@@ -41,17 +40,6 @@ gnunet_consensus_LDADD = \ | |||
41 | gnunet_consensus_DEPENDENCIES = \ | 40 | gnunet_consensus_DEPENDENCIES = \ |
42 | libgnunetconsensus.la | 41 | libgnunetconsensus.la |
43 | 42 | ||
44 | gnunet_consensus_start_peers_SOURCES = \ | ||
45 | gnunet-consensus-start-peers.c | ||
46 | gnunet_consensus_start_peers_LDADD = \ | ||
47 | $(top_builddir)/src/util/libgnunetutil.la \ | ||
48 | $(top_builddir)/src/testbed/libgnunettestbed.la \ | ||
49 | $(top_builddir)/src/consensus/libgnunetconsensus.la \ | ||
50 | $(GN_LIBINTL) | ||
51 | gnunet_consensus_start_peers_DEPENDENCIES = \ | ||
52 | libgnunetconsensus.la | ||
53 | |||
54 | |||
55 | gnunet_service_consensus_SOURCES = \ | 43 | gnunet_service_consensus_SOURCES = \ |
56 | gnunet-service-consensus.c | 44 | gnunet-service-consensus.c |
57 | gnunet_service_consensus_LDADD = \ | 45 | gnunet_service_consensus_LDADD = \ |
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index a1dc40826..7690dc059 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c | |||
@@ -205,7 +205,7 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
205 | 205 | ||
206 | consensus->client = GNUNET_CLIENT_connect ("consensus", cfg); | 206 | consensus->client = GNUNET_CLIENT_connect ("consensus", cfg); |
207 | consensus->mq = GNUNET_MQ_queue_for_connection_client (consensus->client, | 207 | consensus->mq = GNUNET_MQ_queue_for_connection_client (consensus->client, |
208 | mq_handlers, consensus); | 208 | mq_handlers, NULL, consensus); |
209 | 209 | ||
210 | GNUNET_assert (consensus->client != NULL); | 210 | GNUNET_assert (consensus->client != NULL); |
211 | 211 | ||
diff --git a/src/consensus/gnunet-consensus-start-peers.c b/src/consensus/gnunet-consensus-start-peers.c deleted file mode 100644 index 05ba05691..000000000 --- a/src/consensus/gnunet-consensus-start-peers.c +++ /dev/null | |||
@@ -1,186 +0,0 @@ | |||
1 | |||
2 | /* | ||
3 | This file is part of GNUnet | ||
4 | (C) 2012 Christian Grothoff (and other contributing authors) | ||
5 | |||
6 | GNUnet is free software; you can redistribute it and/or modify | ||
7 | it under the terms of the GNU General Public License as published | ||
8 | by the Free Software Foundation; either version 2, or (at your | ||
9 | option) any later version. | ||
10 | |||
11 | GNUnet is distributed in the hope that it will be useful, but | ||
12 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
13 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
14 | General Public License for more details. | ||
15 | |||
16 | You should have received a copy of the GNU General Public License | ||
17 | along with GNUnet; see the file COPYING. If not, write to the | ||
18 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
19 | Boston, MA 02111-1307, USA. | ||
20 | */ | ||
21 | |||
22 | /** | ||
23 | * @file consensus/gnunet-consensus-start-peers.c | ||
24 | * @brief Starts peers with testebed on localhost, | ||
25 | * prints their configuration files and waits for ^C. | ||
26 | * @author Florian Dold | ||
27 | */ | ||
28 | #include "platform.h" | ||
29 | #include "gnunet_util_lib.h" | ||
30 | #include "gnunet_testbed_service.h" | ||
31 | |||
32 | |||
33 | static char *config_template_file; | ||
34 | static unsigned int num_peers_requested = 2; | ||
35 | static struct GNUNET_TESTBED_Peer **peers; | ||
36 | |||
37 | |||
38 | /** | ||
39 | * Callback to be called when the requested peer information is available | ||
40 | * | ||
41 | * @param cb_cls the closure from GNUNET_TETSBED_peer_get_information() | ||
42 | * @param op the operation this callback corresponds to | ||
43 | * @param pinfo the result; will be NULL if the operation has failed | ||
44 | * @param emsg error message if the operation has failed; will be NULL if the | ||
45 | * operation is successfull | ||
46 | */ | ||
47 | static void | ||
48 | peer_info_cb (void *cb_cls, | ||
49 | struct GNUNET_TESTBED_Operation | ||
50 | *op, | ||
51 | const struct | ||
52 | GNUNET_TESTBED_PeerInformation | ||
53 | *pinfo, | ||
54 | const char *emsg) | ||
55 | { | ||
56 | GNUNET_assert (NULL == emsg); | ||
57 | if (pinfo->pit == GNUNET_TESTBED_PIT_IDENTITY) | ||
58 | { | ||
59 | struct GNUNET_CRYPTO_HashAsciiEncoded enc; | ||
60 | GNUNET_CRYPTO_hash_to_enc (&pinfo->result.id->hashPubKey, &enc); | ||
61 | printf("peer %td identity:\n", ((struct GNUNET_TESTBED_Peer **) cb_cls) - &peers[0]); | ||
62 | printf("%s\n", (char *)&enc); | ||
63 | } | ||
64 | else if (pinfo->pit == GNUNET_TESTBED_PIT_CONFIGURATION) | ||
65 | { | ||
66 | char *tmpfilename; | ||
67 | if (NULL == (tmpfilename = GNUNET_DISK_mktemp ("gnunet-consensus"))) | ||
68 | { | ||
69 | GNUNET_break (0); | ||
70 | GNUNET_SCHEDULER_shutdown (); | ||
71 | return; | ||
72 | } | ||
73 | if (GNUNET_SYSERR == | ||
74 | GNUNET_CONFIGURATION_write (pinfo->result.cfg, | ||
75 | tmpfilename)) | ||
76 | { | ||
77 | GNUNET_break (0); | ||
78 | return; | ||
79 | } | ||
80 | printf("peer %td config file:\n", ((struct GNUNET_TESTBED_Peer **) cb_cls) - &peers[0]); | ||
81 | printf("%s\n", tmpfilename); | ||
82 | } | ||
83 | else | ||
84 | { | ||
85 | GNUNET_assert (0); | ||
86 | } | ||
87 | } | ||
88 | |||
89 | |||
90 | |||
91 | /** | ||
92 | * Signature of the event handler function called by the | ||
93 | * respective event controller. | ||
94 | * | ||
95 | * @param cls closure | ||
96 | * @param event information about the event | ||
97 | */ | ||
98 | static void | ||
99 | controller_cb(void *cls, | ||
100 | const struct GNUNET_TESTBED_EventInformation *event) | ||
101 | { | ||
102 | GNUNET_assert (0); | ||
103 | } | ||
104 | |||
105 | |||
106 | |||
107 | |||
108 | /** | ||
109 | * Signature of a main function for a testcase. | ||
110 | * | ||
111 | * @param cls closure | ||
112 | * @param num_peers number of peers in 'peers' | ||
113 | * @param started_peers handle to peers being run in the testbed. NULL upon | ||
114 | * timeout (see GNUNET_TESTBED_test_run()). | ||
115 | * @param links_succeeded the number of overlay link connection attempts that | ||
116 | * succeeded | ||
117 | * @param links_failed the number of overlay link connection attempts that | ||
118 | * failed | ||
119 | */ | ||
120 | static void | ||
121 | test_master (void *cls, | ||
122 | unsigned int num_peers, | ||
123 | struct GNUNET_TESTBED_Peer **started_peers, | ||
124 | unsigned int links_succeeded, | ||
125 | unsigned int links_failed) | ||
126 | { | ||
127 | int i; | ||
128 | |||
129 | printf("started %d peers\n", num_peers); | ||
130 | peers = started_peers; | ||
131 | |||
132 | for (i = 0; i < num_peers; i++) | ||
133 | { | ||
134 | GNUNET_TESTBED_peer_get_information (peers[i], | ||
135 | GNUNET_TESTBED_PIT_IDENTITY, | ||
136 | peer_info_cb, | ||
137 | &peers[i]); | ||
138 | GNUNET_TESTBED_peer_get_information (peers[i], | ||
139 | GNUNET_TESTBED_PIT_CONFIGURATION, | ||
140 | peer_info_cb, | ||
141 | &peers[i]); | ||
142 | } | ||
143 | } | ||
144 | |||
145 | |||
146 | static void | ||
147 | run (void *cls, char *const *args, const char *cfgfile, | ||
148 | const struct GNUNET_CONFIGURATION_Handle *config) | ||
149 | { | ||
150 | if (NULL == config_template_file) | ||
151 | { | ||
152 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "no template file specified\n"); | ||
153 | return; | ||
154 | } | ||
155 | |||
156 | (void) GNUNET_TESTBED_test_run ("gnunet-consensus-start-peers", | ||
157 | config_template_file, | ||
158 | num_peers_requested, | ||
159 | 0, | ||
160 | controller_cb, | ||
161 | NULL, | ||
162 | test_master, | ||
163 | NULL); | ||
164 | } | ||
165 | |||
166 | |||
167 | int | ||
168 | main (int argc, char **argv) | ||
169 | { | ||
170 | static const struct GNUNET_GETOPT_CommandLineOption options[] = { | ||
171 | { 't', "config-template", "TEMPLATE", | ||
172 | gettext_noop ("start peers with the given template configuration"), | ||
173 | GNUNET_YES, &GNUNET_GETOPT_set_string, &config_template_file }, | ||
174 | { 'n', "num-peers", "NUM", | ||
175 | gettext_noop ("number of peers to start"), | ||
176 | GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_peers_requested }, | ||
177 | GNUNET_GETOPT_OPTION_END | ||
178 | }; | ||
179 | |||
180 | /* run without scheduler, as test_run already does this */ | ||
181 | GNUNET_PROGRAM_run2 (argc, argv, "gnunet-consensus-start-peers", | ||
182 | "help", | ||
183 | options, &run, NULL, GNUNET_YES); | ||
184 | return 0; | ||
185 | } | ||
186 | |||
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index 9cde0e46b..1245dcca4 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c | |||
@@ -266,8 +266,6 @@ have_exp_subround_finished (const struct ConsensusSession *session) | |||
266 | } | 266 | } |
267 | 267 | ||
268 | 268 | ||
269 | |||
270 | |||
271 | /** | 269 | /** |
272 | * Destroy a session, free all resources associated with it. | 270 | * Destroy a session, free all resources associated with it. |
273 | * | 271 | * |
diff --git a/src/include/gnunet_mq_lib.h b/src/include/gnunet_mq_lib.h index 048ad39a0..a20588b4d 100644 --- a/src/include/gnunet_mq_lib.h +++ b/src/include/gnunet_mq_lib.h | |||
@@ -328,12 +328,14 @@ GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id); | |||
328 | * | 328 | * |
329 | * @param connection the client connection | 329 | * @param connection the client connection |
330 | * @param handlers handlers for receiving messages | 330 | * @param handlers handlers for receiving messages |
331 | * @param error_handler error handler | ||
331 | * @param cls closure for the handlers | 332 | * @param cls closure for the handlers |
332 | * @return the message queue | 333 | * @return the message queue |
333 | */ | 334 | */ |
334 | struct GNUNET_MQ_Handle * | 335 | struct GNUNET_MQ_Handle * |
335 | GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection, | 336 | GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection, |
336 | const struct GNUNET_MQ_MessageHandler *handlers, | 337 | const struct GNUNET_MQ_MessageHandler *handlers, |
338 | GNUNET_MQ_ErrorHandler error_handler, | ||
337 | void *cls); | 339 | void *cls); |
338 | 340 | ||
339 | 341 | ||
diff --git a/src/include/gnunet_set_service.h b/src/include/gnunet_set_service.h index ffc7f1c5b..b434e4474 100644 --- a/src/include/gnunet_set_service.h +++ b/src/include/gnunet_set_service.h | |||
@@ -74,9 +74,16 @@ struct GNUNET_SET_OperationHandle; | |||
74 | enum GNUNET_SET_OperationType | 74 | enum GNUNET_SET_OperationType |
75 | { | 75 | { |
76 | /** | 76 | /** |
77 | * A purely local set that does not support any | ||
78 | * operation. | ||
79 | */ | ||
80 | GNUNET_SET_OPERATION_NONE, | ||
81 | |||
82 | /** | ||
77 | * Set intersection, only return elements that are in both sets. | 83 | * Set intersection, only return elements that are in both sets. |
78 | */ | 84 | */ |
79 | GNUNET_SET_OPERATION_INTERSECTION, | 85 | GNUNET_SET_OPERATION_INTERSECTION, |
86 | |||
80 | /** | 87 | /** |
81 | * Set union, return all elements that are in at least one of the sets. | 88 | * Set union, return all elements that are in at least one of the sets. |
82 | */ | 89 | */ |
@@ -116,6 +123,7 @@ enum GNUNET_SET_Status | |||
116 | GNUNET_SET_STATUS_DONE | 123 | GNUNET_SET_STATUS_DONE |
117 | }; | 124 | }; |
118 | 125 | ||
126 | |||
119 | /** | 127 | /** |
120 | * The way results are given to the client. | 128 | * The way results are given to the client. |
121 | */ | 129 | */ |
@@ -137,6 +145,7 @@ enum GNUNET_SET_ResultMode | |||
137 | GNUNET_SET_RESULT_REMOVED | 145 | GNUNET_SET_RESULT_REMOVED |
138 | }; | 146 | }; |
139 | 147 | ||
148 | |||
140 | /** | 149 | /** |
141 | * Element stored in a set. | 150 | * Element stored in a set. |
142 | */ | 151 | */ |
@@ -182,18 +191,19 @@ typedef void (*GNUNET_SET_ResultIterator) (void *cls, | |||
182 | 191 | ||
183 | /** | 192 | /** |
184 | * Called when another peer wants to do a set operation with the | 193 | * Called when another peer wants to do a set operation with the |
185 | * local peer. | 194 | * local peer. If a listen error occurs, the 'request' is NULL. |
186 | * | 195 | * |
187 | * @param cls closure | 196 | * @param cls closure |
188 | * @param other_peer the other peer | 197 | * @param other_peer the other peer |
189 | * @param context_msg message with application specific information from | 198 | * @param context_msg message with application specific information from |
190 | * the other peer | 199 | * the other peer |
191 | * @param request request from the other peer, use GNUNET_SET_accept | 200 | * @param request request from the other peer, use GNUNET_SET_accept |
201 | * Will be NULL if the listener failed. | ||
192 | * to accept it, otherwise the request will be refused | 202 | * to accept it, otherwise the request will be refused |
193 | * Note that we don't use a return value here, as it is also | 203 | * Note that we can't just return value from the listen callback, |
194 | * necessary to specify the set we want to do the operation with, | 204 | * as it is also necessary to specify the set we want to do the |
195 | * whith sometimes can be derived from the context message. | 205 | * operation with, whith sometimes can be derived from the context |
196 | * Also necessary to specify the timeout. | 206 | * message. It's necessary to specify the timeout. |
197 | */ | 207 | */ |
198 | typedef void | 208 | typedef void |
199 | (*GNUNET_SET_ListenCallback) (void *cls, | 209 | (*GNUNET_SET_ListenCallback) (void *cls, |
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index 62d1ce21b..e5c42eca0 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c | |||
@@ -46,11 +46,14 @@ struct Incoming | |||
46 | 46 | ||
47 | /** | 47 | /** |
48 | * Detail information about the operation. | 48 | * Detail information about the operation. |
49 | * NULL as long as we did not receive the operation | ||
50 | * request from the remote peer. | ||
49 | */ | 51 | */ |
50 | struct OperationSpecification *spec; | 52 | struct OperationSpecification *spec; |
51 | 53 | ||
52 | /** | 54 | /** |
53 | * The identity of the requesting peer. | 55 | * The identity of the requesting peer. Needs to |
56 | * be stored here as the op spec might not have been created yet. | ||
54 | */ | 57 | */ |
55 | struct GNUNET_PeerIdentity peer; | 58 | struct GNUNET_PeerIdentity peer; |
56 | 59 | ||
@@ -83,17 +86,55 @@ struct Incoming | |||
83 | 86 | ||
84 | 87 | ||
85 | /** | 88 | /** |
89 | * A listener is inhabited by a client, and | ||
90 | * waits for evaluation requests from remote peers. | ||
91 | */ | ||
92 | struct Listener | ||
93 | { | ||
94 | /** | ||
95 | * Listeners are held in a doubly linked list. | ||
96 | */ | ||
97 | struct Listener *next; | ||
98 | |||
99 | /** | ||
100 | * Listeners are held in a doubly linked list. | ||
101 | */ | ||
102 | struct Listener *prev; | ||
103 | |||
104 | /** | ||
105 | * Client that owns the listener. | ||
106 | * Only one client may own a listener. | ||
107 | */ | ||
108 | struct GNUNET_SERVER_Client *client; | ||
109 | |||
110 | /** | ||
111 | * Message queue for the client | ||
112 | */ | ||
113 | struct GNUNET_MQ_Handle *client_mq; | ||
114 | |||
115 | /** | ||
116 | * The type of the operation. | ||
117 | */ | ||
118 | enum GNUNET_SET_OperationType operation; | ||
119 | |||
120 | /** | ||
121 | * Application ID for the operation, used to distinguish | ||
122 | * multiple operations of the same type with the same peer. | ||
123 | */ | ||
124 | struct GNUNET_HashCode app_id; | ||
125 | }; | ||
126 | |||
127 | |||
128 | /** | ||
86 | * Configuration of our local peer. | 129 | * Configuration of our local peer. |
87 | * (Not declared 'static' as also needed in gnunet-service-set_union.c) | ||
88 | */ | 130 | */ |
89 | const struct GNUNET_CONFIGURATION_Handle *configuration; | 131 | static const struct GNUNET_CONFIGURATION_Handle *configuration; |
90 | 132 | ||
91 | /** | 133 | /** |
92 | * Handle to the mesh service, used | 134 | * Handle to the mesh service, used |
93 | * to listen for and connect to remote peers. | 135 | * to listen for and connect to remote peers. |
94 | * (Not declared 'static' as also needed in gnunet-service-set_union.c) | ||
95 | */ | 136 | */ |
96 | struct GNUNET_MESH_Handle *mesh; | 137 | static struct GNUNET_MESH_Handle *mesh; |
97 | 138 | ||
98 | /** | 139 | /** |
99 | * Sets are held in a doubly linked list. | 140 | * Sets are held in a doubly linked list. |
@@ -204,8 +245,9 @@ listener_destroy (struct Listener *listener) | |||
204 | * The client's destroy callback will destroy the listener again. */ | 245 | * The client's destroy callback will destroy the listener again. */ |
205 | if (NULL != listener->client) | 246 | if (NULL != listener->client) |
206 | { | 247 | { |
207 | GNUNET_SERVER_client_disconnect (listener->client); | 248 | struct GNUNET_SERVER_Client *client = listener->client; |
208 | listener->client = NULL; | 249 | listener->client = NULL; |
250 | GNUNET_SERVER_client_disconnect (client); | ||
209 | return; | 251 | return; |
210 | } | 252 | } |
211 | if (NULL != listener->client_mq) | 253 | if (NULL != listener->client_mq) |
@@ -230,22 +272,19 @@ set_destroy (struct Set *set) | |||
230 | * The client's destroy callback will destroy the set again. */ | 272 | * The client's destroy callback will destroy the set again. */ |
231 | if (NULL != set->client) | 273 | if (NULL != set->client) |
232 | { | 274 | { |
233 | GNUNET_SERVER_client_disconnect (set->client); | 275 | struct GNUNET_SERVER_Client *client = set->client; |
234 | set->client = NULL; | 276 | set->client = NULL; |
277 | GNUNET_SERVER_client_disconnect (client); | ||
235 | return; | 278 | return; |
236 | } | 279 | } |
237 | switch (set->operation) | 280 | if (NULL != set->client_mq) |
238 | { | 281 | { |
239 | case GNUNET_SET_OPERATION_INTERSECTION: | 282 | GNUNET_MQ_destroy (set->client_mq); |
240 | GNUNET_assert (0); | 283 | set->client_mq = NULL; |
241 | break; | ||
242 | case GNUNET_SET_OPERATION_UNION: | ||
243 | _GSS_union_set_destroy (set); | ||
244 | break; | ||
245 | default: | ||
246 | GNUNET_assert (0); | ||
247 | break; | ||
248 | } | 284 | } |
285 | GNUNET_assert (NULL != set->state); | ||
286 | set->vt->destroy_set (set->state); | ||
287 | set->state = NULL; | ||
249 | GNUNET_CONTAINER_DLL_remove (sets_head, sets_tail, set); | 288 | GNUNET_CONTAINER_DLL_remove (sets_head, sets_tail, set); |
250 | GNUNET_free (set); | 289 | GNUNET_free (set); |
251 | } | 290 | } |
@@ -264,6 +303,8 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
264 | struct Set *set; | 303 | struct Set *set; |
265 | struct Listener *listener; | 304 | struct Listener *listener; |
266 | 305 | ||
306 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, cleaning up\n"); | ||
307 | |||
267 | set = set_get (client); | 308 | set = set_get (client); |
268 | if (NULL != set) | 309 | if (NULL != set) |
269 | { | 310 | { |
@@ -287,6 +328,7 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
287 | static void | 328 | static void |
288 | incoming_destroy (struct Incoming *incoming) | 329 | incoming_destroy (struct Incoming *incoming) |
289 | { | 330 | { |
331 | GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming); | ||
290 | if (NULL != incoming->tunnel) | 332 | if (NULL != incoming->tunnel) |
291 | { | 333 | { |
292 | struct GNUNET_MESH_Tunnel *t = incoming->tunnel; | 334 | struct GNUNET_MESH_Tunnel *t = incoming->tunnel; |
@@ -294,7 +336,6 @@ incoming_destroy (struct Incoming *incoming) | |||
294 | GNUNET_MESH_tunnel_destroy (t); | 336 | GNUNET_MESH_tunnel_destroy (t); |
295 | return; | 337 | return; |
296 | } | 338 | } |
297 | GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming); | ||
298 | GNUNET_free (incoming); | 339 | GNUNET_free (incoming); |
299 | } | 340 | } |
300 | 341 | ||
@@ -338,7 +379,7 @@ incoming_suggest (struct Incoming *incoming, struct Listener *listener) | |||
338 | mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, | 379 | mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, |
339 | incoming->spec->context_msg); | 380 | incoming->spec->context_msg); |
340 | GNUNET_assert (NULL != mqm); | 381 | GNUNET_assert (NULL != mqm); |
341 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "suggesting request with accept id %u\n", incoming->suggest_id); | 382 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "suggesting request with accept id %u\n", incoming->suggest_id); |
342 | cmsg->accept_id = htonl (incoming->suggest_id); | 383 | cmsg->accept_id = htonl (incoming->suggest_id); |
343 | cmsg->peer_id = incoming->spec->peer; | 384 | cmsg->peer_id = incoming->spec->peer; |
344 | GNUNET_MQ_send (listener->client_mq, mqm); | 385 | GNUNET_MQ_send (listener->client_mq, mqm); |
@@ -350,33 +391,28 @@ incoming_suggest (struct Incoming *incoming, struct Listener *listener) | |||
350 | * Handle a request for a set operation from | 391 | * Handle a request for a set operation from |
351 | * another peer. | 392 | * another peer. |
352 | * | 393 | * |
353 | * @param cls the incoming socket | 394 | * @param op the operation state |
354 | * @param tunnel the tunnel that sent the message | 395 | * @param mh the received message |
355 | * @param tunnel_ctx the tunnel context | 396 | * @return GNUNET_OK if the tunnel should be kept alive, |
356 | * @param mh the message | 397 | * GNUNET_SYSERR to destroy the tunnel |
357 | */ | 398 | */ |
358 | static int | 399 | static int |
359 | handle_p2p_operation_request (void *cls, | 400 | handle_incoming_msg (struct OperationState *op, |
360 | struct GNUNET_MESH_Tunnel *tunnel, | 401 | const struct GNUNET_MessageHeader *mh) |
361 | void **tunnel_ctx, | ||
362 | const struct GNUNET_MessageHeader *mh) | ||
363 | { | 402 | { |
364 | struct TunnelContext *tc = *tunnel_ctx; | 403 | struct Incoming *incoming = (struct Incoming *) op; |
365 | struct Incoming *incoming; | ||
366 | const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh; | 404 | const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh; |
367 | struct Listener *listener; | 405 | struct Listener *listener; |
368 | struct OperationSpecification *spec; | 406 | struct OperationSpecification *spec; |
369 | 407 | ||
370 | if (CONTEXT_INCOMING != tc->type) | 408 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got op request\n"); |
409 | |||
410 | if (GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST != ntohs (mh->type)) | ||
371 | { | 411 | { |
372 | /* unexpected request */ | ||
373 | GNUNET_break_op (0); | 412 | GNUNET_break_op (0); |
374 | /* kill the tunnel, cleaner will be called */ | ||
375 | return GNUNET_SYSERR; | 413 | return GNUNET_SYSERR; |
376 | } | 414 | } |
377 | 415 | ||
378 | incoming = tc->data.incoming; | ||
379 | |||
380 | if (NULL != incoming->spec) | 416 | if (NULL != incoming->spec) |
381 | { | 417 | { |
382 | /* double operation request */ | 418 | /* double operation request */ |
@@ -385,8 +421,9 @@ handle_p2p_operation_request (void *cls, | |||
385 | } | 421 | } |
386 | 422 | ||
387 | spec = GNUNET_new (struct OperationSpecification); | 423 | spec = GNUNET_new (struct OperationSpecification); |
388 | spec->context_msg = | 424 | spec->context_msg = GNUNET_MQ_extract_nested_mh (msg); |
389 | GNUNET_copy_message (GNUNET_MQ_extract_nested_mh (msg)); | 425 | if (NULL != spec->context_msg) |
426 | spec->context_msg = GNUNET_copy_message (spec->context_msg); | ||
390 | spec->operation = ntohl (msg->operation); | 427 | spec->operation = ntohl (msg->operation); |
391 | spec->app_id = msg->app_id; | 428 | spec->app_id = msg->app_id; |
392 | spec->salt = ntohl (msg->salt); | 429 | spec->salt = ntohl (msg->salt); |
@@ -401,12 +438,12 @@ handle_p2p_operation_request (void *cls, | |||
401 | return GNUNET_SYSERR; | 438 | return GNUNET_SYSERR; |
402 | } | 439 | } |
403 | 440 | ||
404 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, app %s)\n", | 441 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received P2P operation request (op %u, app %s)\n", |
405 | ntohs (msg->operation), GNUNET_h2s (&msg->app_id)); | 442 | ntohs (msg->operation), GNUNET_h2s (&msg->app_id)); |
406 | listener = listener_get_by_target (ntohs (msg->operation), &msg->app_id); | 443 | listener = listener_get_by_target (ntohs (msg->operation), &msg->app_id); |
407 | if (NULL == listener) | 444 | if (NULL == listener) |
408 | { | 445 | { |
409 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | 446 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
410 | "no listener matches incoming request, waiting with timeout\n"); | 447 | "no listener matches incoming request, waiting with timeout\n"); |
411 | return GNUNET_OK; | 448 | return GNUNET_OK; |
412 | } | 449 | } |
@@ -430,7 +467,7 @@ handle_client_create (void *cls, | |||
430 | struct GNUNET_SET_CreateMessage *msg = (struct GNUNET_SET_CreateMessage *) m; | 467 | struct GNUNET_SET_CreateMessage *msg = (struct GNUNET_SET_CreateMessage *) m; |
431 | struct Set *set; | 468 | struct Set *set; |
432 | 469 | ||
433 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client created new set (operation %u)\n", | 470 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client created new set (operation %u)\n", |
434 | ntohs (msg->operation)); | 471 | ntohs (msg->operation)); |
435 | 472 | ||
436 | if (NULL != set_get (client)) | 473 | if (NULL != set_get (client)) |
@@ -441,14 +478,15 @@ handle_client_create (void *cls, | |||
441 | } | 478 | } |
442 | 479 | ||
443 | set = NULL; | 480 | set = NULL; |
481 | set = GNUNET_new (struct Set); | ||
444 | 482 | ||
445 | switch (ntohs (msg->operation)) | 483 | switch (ntohs (msg->operation)) |
446 | { | 484 | { |
447 | case GNUNET_SET_OPERATION_INTERSECTION: | 485 | case GNUNET_SET_OPERATION_INTERSECTION: |
448 | //set = _GSS_intersection_set_create (); | 486 | // FIXME |
449 | break; | 487 | break; |
450 | case GNUNET_SET_OPERATION_UNION: | 488 | case GNUNET_SET_OPERATION_UNION: |
451 | set = _GSS_union_set_create (); | 489 | set->vt = _GSS_union_vt (); |
452 | break; | 490 | break; |
453 | default: | 491 | default: |
454 | GNUNET_break (0); | 492 | GNUNET_break (0); |
@@ -456,8 +494,7 @@ handle_client_create (void *cls, | |||
456 | return; | 494 | return; |
457 | } | 495 | } |
458 | 496 | ||
459 | GNUNET_assert (NULL != set); | 497 | set->state = set->vt->create (); |
460 | |||
461 | set->client = client; | 498 | set->client = client; |
462 | set->client_mq = GNUNET_MQ_queue_for_server_client (client); | 499 | set->client_mq = GNUNET_MQ_queue_for_server_client (client); |
463 | GNUNET_CONTAINER_DLL_insert (sets_head, sets_tail, set); | 500 | GNUNET_CONTAINER_DLL_insert (sets_head, sets_tail, set); |
@@ -493,7 +530,7 @@ handle_client_listen (void *cls, | |||
493 | listener->app_id = msg->app_id; | 530 | listener->app_id = msg->app_id; |
494 | listener->operation = ntohs (msg->operation); | 531 | listener->operation = ntohs (msg->operation); |
495 | GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener); | 532 | GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener); |
496 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "new listener created (op %u, app %s)\n", | 533 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new listener created (op %u, app %s)\n", |
497 | listener->operation, GNUNET_h2s (&listener->app_id)); | 534 | listener->operation, GNUNET_h2s (&listener->app_id)); |
498 | for (incoming = incoming_head; NULL != incoming; incoming = incoming->next) | 535 | for (incoming = incoming_head; NULL != incoming; incoming = incoming->next) |
499 | { | 536 | { |
@@ -511,46 +548,6 @@ handle_client_listen (void *cls, | |||
511 | 548 | ||
512 | 549 | ||
513 | /** | 550 | /** |
514 | * Called when a client wants to remove an element | ||
515 | * from the set it inhabits. | ||
516 | * | ||
517 | * @param cls unused | ||
518 | * @param client client that sent the message | ||
519 | * @param m message sent by the client | ||
520 | */ | ||
521 | static void | ||
522 | handle_client_remove (void *cls, | ||
523 | struct GNUNET_SERVER_Client *client, | ||
524 | const struct GNUNET_MessageHeader *m) | ||
525 | { | ||
526 | struct Set *set; | ||
527 | |||
528 | set = set_get (client); | ||
529 | if (NULL == set) | ||
530 | { | ||
531 | GNUNET_break (0); | ||
532 | GNUNET_SERVER_client_disconnect (client); | ||
533 | return; | ||
534 | } | ||
535 | switch (set->operation) | ||
536 | { | ||
537 | case GNUNET_SET_OPERATION_UNION: | ||
538 | _GSS_union_remove ((struct GNUNET_SET_ElementMessage *) m, set); | ||
539 | break; | ||
540 | case GNUNET_SET_OPERATION_INTERSECTION: | ||
541 | //_GSS_intersection_remove ((struct GNUNET_SET_ElementMessage *) m, set); | ||
542 | break; | ||
543 | default: | ||
544 | GNUNET_assert (0); | ||
545 | break; | ||
546 | } | ||
547 | |||
548 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
549 | } | ||
550 | |||
551 | |||
552 | |||
553 | /** | ||
554 | * Called when the client wants to reject an operation | 551 | * Called when the client wants to reject an operation |
555 | * request from another peer. | 552 | * request from another peer. |
556 | * | 553 | * |
@@ -574,13 +571,12 @@ handle_client_reject (void *cls, | |||
574 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | 571 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); |
575 | return; | 572 | return; |
576 | } | 573 | } |
577 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n"); | 574 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "peer request rejected by client\n"); |
578 | GNUNET_MESH_tunnel_destroy (incoming->tunnel); | 575 | GNUNET_MESH_tunnel_destroy (incoming->tunnel); |
579 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 576 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
580 | } | 577 | } |
581 | 578 | ||
582 | 579 | ||
583 | |||
584 | /** | 580 | /** |
585 | * Called when a client wants to add an element to a | 581 | * Called when a client wants to add an element to a |
586 | * set it inhabits. | 582 | * set it inhabits. |
@@ -590,11 +586,13 @@ handle_client_reject (void *cls, | |||
590 | * @param m message sent by the client | 586 | * @param m message sent by the client |
591 | */ | 587 | */ |
592 | static void | 588 | static void |
593 | handle_client_add (void *cls, | 589 | handle_client_add_remove (void *cls, |
594 | struct GNUNET_SERVER_Client *client, | 590 | struct GNUNET_SERVER_Client *client, |
595 | const struct GNUNET_MessageHeader *m) | 591 | const struct GNUNET_MessageHeader *m) |
596 | { | 592 | { |
597 | struct Set *set; | 593 | struct Set *set; |
594 | const struct GNUNET_SET_ElementMessage *msg; | ||
595 | struct GNUNET_SET_Element el; | ||
598 | 596 | ||
599 | set = set_get (client); | 597 | set = set_get (client); |
600 | if (NULL == set) | 598 | if (NULL == set) |
@@ -603,19 +601,14 @@ handle_client_add (void *cls, | |||
603 | GNUNET_SERVER_client_disconnect (client); | 601 | GNUNET_SERVER_client_disconnect (client); |
604 | return; | 602 | return; |
605 | } | 603 | } |
606 | switch (set->operation) | 604 | msg = (const struct GNUNET_SET_ElementMessage *) m; |
607 | { | 605 | el.size = ntohs (m->size) - sizeof *msg; |
608 | case GNUNET_SET_OPERATION_UNION: | 606 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client ins/rem element of size %u\n", el.size); |
609 | _GSS_union_add ((struct GNUNET_SET_ElementMessage *) m, set); | 607 | el.data = &msg[1]; |
610 | break; | 608 | if (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (m->type)) |
611 | case GNUNET_SET_OPERATION_INTERSECTION: | 609 | set->vt->remove (set->state, &el); |
612 | //_GSS_intersection_add ((struct GNUNET_SET_ElementMessage *) m, set); | 610 | else |
613 | break; | 611 | set->vt->add (set->state, &el); |
614 | default: | ||
615 | GNUNET_assert (0); | ||
616 | break; | ||
617 | } | ||
618 | |||
619 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 612 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
620 | } | 613 | } |
621 | 614 | ||
@@ -653,27 +646,15 @@ handle_client_evaluate (void *cls, | |||
653 | spec->app_id = msg->app_id; | 646 | spec->app_id = msg->app_id; |
654 | spec->salt = ntohl (msg->salt); | 647 | spec->salt = ntohl (msg->salt); |
655 | spec->peer = msg->target_peer; | 648 | spec->peer = msg->target_peer; |
649 | spec->set = set; | ||
650 | spec->client_request_id = ntohl (msg->request_id); | ||
656 | 651 | ||
657 | tunnel = GNUNET_MESH_tunnel_create (mesh, tc, &msg->target_peer, | 652 | tunnel = GNUNET_MESH_tunnel_create (mesh, tc, &msg->target_peer, |
658 | GNUNET_APPLICATION_TYPE_SET, | 653 | GNUNET_APPLICATION_TYPE_SET, |
659 | GNUNET_YES, | 654 | GNUNET_YES, |
660 | GNUNET_YES); | 655 | GNUNET_YES); |
661 | 656 | ||
662 | switch (set->operation) | 657 | set->vt->evaluate (spec, tunnel, tc); |
663 | { | ||
664 | case GNUNET_SET_OPERATION_INTERSECTION: | ||
665 | tc->type = CONTEXT_OPERATION_INTERSECTION; | ||
666 | //_GSS_intersection_evaluate ((struct GNUNET_SET_EvaluateMessage *) m, set); | ||
667 | break; | ||
668 | case GNUNET_SET_OPERATION_UNION: | ||
669 | tc->type = CONTEXT_OPERATION_UNION; | ||
670 | tc->data.union_op = | ||
671 | _GSS_union_evaluate (spec, tunnel); | ||
672 | break; | ||
673 | default: | ||
674 | GNUNET_assert (0); | ||
675 | break; | ||
676 | } | ||
677 | 658 | ||
678 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 659 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
679 | } | 660 | } |
@@ -705,6 +686,35 @@ handle_client_ack (void *cls, | |||
705 | * @param mh the message | 686 | * @param mh the message |
706 | */ | 687 | */ |
707 | static void | 688 | static void |
689 | handle_client_cancel (void *cls, | ||
690 | struct GNUNET_SERVER_Client *client, | ||
691 | const struct GNUNET_MessageHeader *mh) | ||
692 | { | ||
693 | const struct GNUNET_SET_CancelMessage *msg = | ||
694 | (const struct GNUNET_SET_CancelMessage *) mh; | ||
695 | struct Set *set; | ||
696 | |||
697 | set = set_get (client); | ||
698 | if (NULL == set) | ||
699 | { | ||
700 | GNUNET_break (0); | ||
701 | GNUNET_SERVER_client_disconnect (client); | ||
702 | return; | ||
703 | } | ||
704 | /* FIXME: maybe cancel should return success/error code? */ | ||
705 | set->vt->cancel (set->state, ntohl (msg->request_id)); | ||
706 | } | ||
707 | |||
708 | |||
709 | /** | ||
710 | * Handle a request from the client to accept | ||
711 | * a set operation that came from a remote peer. | ||
712 | * | ||
713 | * @param cls unused | ||
714 | * @param client the client | ||
715 | * @param mh the message | ||
716 | */ | ||
717 | static void | ||
708 | handle_client_accept (void *cls, | 718 | handle_client_accept (void *cls, |
709 | struct GNUNET_SERVER_Client *client, | 719 | struct GNUNET_SERVER_Client *client, |
710 | const struct GNUNET_MessageHeader *mh) | 720 | const struct GNUNET_MessageHeader *mh) |
@@ -712,12 +722,10 @@ handle_client_accept (void *cls, | |||
712 | struct Set *set; | 722 | struct Set *set; |
713 | struct Incoming *incoming; | 723 | struct Incoming *incoming; |
714 | struct GNUNET_SET_AcceptRejectMessage *msg = (struct GNUNET_SET_AcceptRejectMessage *) mh; | 724 | struct GNUNET_SET_AcceptRejectMessage *msg = (struct GNUNET_SET_AcceptRejectMessage *) mh; |
715 | struct OperationSpecification *spec; | ||
716 | struct TunnelContext *tc; | ||
717 | 725 | ||
718 | incoming = get_incoming (ntohl (msg->accept_reject_id)); | 726 | incoming = get_incoming (ntohl (msg->accept_reject_id)); |
719 | 727 | ||
720 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client accepting %u\n", ntohl (msg->accept_reject_id)); | 728 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client accepting %u\n", ntohl (msg->accept_reject_id)); |
721 | 729 | ||
722 | if (NULL == incoming) | 730 | if (NULL == incoming) |
723 | { | 731 | { |
@@ -736,24 +744,9 @@ handle_client_accept (void *cls, | |||
736 | return; | 744 | return; |
737 | } | 745 | } |
738 | 746 | ||
739 | spec = GNUNET_new (struct OperationSpecification); | 747 | incoming->spec->set = set; |
740 | tc = incoming->tc; | 748 | incoming->spec->client_request_id = ntohl (msg->request_id); |
741 | 749 | set->vt->accept (incoming->spec, incoming->tunnel, incoming->tc); | |
742 | switch (set->operation) | ||
743 | { | ||
744 | case GNUNET_SET_OPERATION_INTERSECTION: | ||
745 | tc->type = CONTEXT_OPERATION_INTERSECTION; | ||
746 | // _GSS_intersection_accept (msg, set, incoming); | ||
747 | break; | ||
748 | case GNUNET_SET_OPERATION_UNION: | ||
749 | tc->type = CONTEXT_OPERATION_UNION; | ||
750 | tc->data.union_op = _GSS_union_accept (spec, incoming->tunnel); | ||
751 | break; | ||
752 | default: | ||
753 | GNUNET_assert (0); | ||
754 | break; | ||
755 | } | ||
756 | |||
757 | /* tunnel ownership goes to operation */ | 750 | /* tunnel ownership goes to operation */ |
758 | incoming->tunnel = NULL; | 751 | incoming->tunnel = NULL; |
759 | incoming_destroy (incoming); | 752 | incoming_destroy (incoming); |
@@ -771,12 +764,6 @@ static void | |||
771 | shutdown_task (void *cls, | 764 | shutdown_task (void *cls, |
772 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 765 | const struct GNUNET_SCHEDULER_TaskContext *tc) |
773 | { | 766 | { |
774 | if (NULL != mesh) | ||
775 | { | ||
776 | GNUNET_MESH_disconnect (mesh); | ||
777 | mesh = NULL; | ||
778 | } | ||
779 | |||
780 | while (NULL != incoming_head) | 767 | while (NULL != incoming_head) |
781 | incoming_destroy (incoming_head); | 768 | incoming_destroy (incoming_head); |
782 | 769 | ||
@@ -786,9 +773,17 @@ shutdown_task (void *cls, | |||
786 | while (NULL != sets_head) | 773 | while (NULL != sets_head) |
787 | set_destroy (sets_head); | 774 | set_destroy (sets_head); |
788 | 775 | ||
789 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n"); | ||
790 | } | ||
791 | 776 | ||
777 | /* it's important to destroy mesh at the end, as tunnels | ||
778 | * must be destroyed first! */ | ||
779 | if (NULL != mesh) | ||
780 | { | ||
781 | GNUNET_MESH_disconnect (mesh); | ||
782 | mesh = NULL; | ||
783 | } | ||
784 | |||
785 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n"); | ||
786 | } | ||
792 | 787 | ||
793 | 788 | ||
794 | /** | 789 | /** |
@@ -803,7 +798,18 @@ incoming_timeout_cb (void *cls, | |||
803 | { | 798 | { |
804 | struct Incoming *incoming = cls; | 799 | struct Incoming *incoming = cls; |
805 | 800 | ||
806 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "remote peer timed out\n"); | 801 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "remote peer timed out\n"); |
802 | incoming_destroy (incoming); | ||
803 | } | ||
804 | |||
805 | |||
806 | static void | ||
807 | handle_incoming_disconnect (struct OperationState *op_state) | ||
808 | { | ||
809 | struct Incoming *incoming = (struct Incoming *) op_state; | ||
810 | if (NULL == incoming->tunnel) | ||
811 | return; | ||
812 | |||
807 | incoming_destroy (incoming); | 813 | incoming_destroy (incoming); |
808 | } | 814 | } |
809 | 815 | ||
@@ -830,23 +836,25 @@ tunnel_new_cb (void *cls, | |||
830 | uint32_t port) | 836 | uint32_t port) |
831 | { | 837 | { |
832 | struct Incoming *incoming; | 838 | struct Incoming *incoming; |
833 | struct TunnelContext *tc; | 839 | static const struct SetVT incoming_vt = { |
840 | .msg_handler = handle_incoming_msg, | ||
841 | .peer_disconnect = handle_incoming_disconnect | ||
842 | }; | ||
834 | 843 | ||
835 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "new incoming tunnel\n"); | 844 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new incoming tunnel\n"); |
836 | 845 | ||
837 | GNUNET_assert (port == GNUNET_APPLICATION_TYPE_SET); | 846 | GNUNET_assert (port == GNUNET_APPLICATION_TYPE_SET); |
838 | tc = GNUNET_new (struct TunnelContext); | ||
839 | incoming = GNUNET_new (struct Incoming); | 847 | incoming = GNUNET_new (struct Incoming); |
840 | incoming->peer = *initiator; | 848 | incoming->peer = *initiator; |
841 | incoming->tunnel = tunnel; | 849 | incoming->tunnel = tunnel; |
842 | incoming->tc = tc; | 850 | incoming->tc = GNUNET_new (struct TunnelContext);; |
843 | tc->data.incoming = incoming; | 851 | incoming->tc->vt = &incoming_vt; |
844 | tc->type = CONTEXT_INCOMING; | 852 | incoming->tc->op = (struct OperationState *) incoming; |
845 | incoming->timeout_task = | 853 | incoming->timeout_task = |
846 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, incoming_timeout_cb, incoming); | 854 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, incoming_timeout_cb, incoming); |
847 | GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming); | 855 | GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming); |
848 | 856 | ||
849 | return tc; | 857 | return incoming->tc; |
850 | } | 858 | } |
851 | 859 | ||
852 | 860 | ||
@@ -867,27 +875,46 @@ tunnel_end_cb (void *cls, | |||
867 | { | 875 | { |
868 | struct TunnelContext *ctx = tunnel_ctx; | 876 | struct TunnelContext *ctx = tunnel_ctx; |
869 | 877 | ||
870 | switch (ctx->type) | 878 | ctx->vt->peer_disconnect (ctx->op); |
871 | { | 879 | /* mesh will never call us with the context again! */ |
872 | case CONTEXT_INCOMING: | ||
873 | incoming_destroy (ctx->data.incoming); | ||
874 | break; | ||
875 | case CONTEXT_OPERATION_UNION: | ||
876 | _GSS_union_operation_destroy (ctx->data.union_op); | ||
877 | break; | ||
878 | case CONTEXT_OPERATION_INTERSECTION: | ||
879 | GNUNET_assert (0); | ||
880 | /* FIXME: cfuchs */ | ||
881 | break; | ||
882 | default: | ||
883 | GNUNET_assert (0); | ||
884 | } | ||
885 | |||
886 | GNUNET_free (tunnel_ctx); | 880 | GNUNET_free (tunnel_ctx); |
887 | } | 881 | } |
888 | 882 | ||
889 | 883 | ||
890 | /** | 884 | /** |
885 | * Functions with this signature are called whenever a message is | ||
886 | * received. | ||
887 | * | ||
888 | * Each time the function must call GNUNET_MESH_receive_done on the tunnel | ||
889 | * in order to receive the next message. This doesn't need to be immediate: | ||
890 | * can be delayed if some processing is done on the message. | ||
891 | * | ||
892 | * @param cls Closure (set from GNUNET_MESH_connect). | ||
893 | * @param tunnel Connection to the other end. | ||
894 | * @param tunnel_ctx Place to store local state associated with the tunnel. | ||
895 | * @param message The actual message. | ||
896 | * | ||
897 | * @return GNUNET_OK to keep the tunnel open, | ||
898 | * GNUNET_SYSERR to close it (signal serious error). | ||
899 | */ | ||
900 | static int | ||
901 | dispatch_p2p_message (void *cls, | ||
902 | struct GNUNET_MESH_Tunnel *tunnel, | ||
903 | void **tunnel_ctx, | ||
904 | const struct GNUNET_MessageHeader *message) | ||
905 | { | ||
906 | struct TunnelContext *tc = *tunnel_ctx; | ||
907 | int ret; | ||
908 | |||
909 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "dispatching mesh message\n"); | ||
910 | ret = tc->vt->msg_handler (tc->op, message); | ||
911 | GNUNET_MESH_receive_done (tunnel); | ||
912 | |||
913 | return ret; | ||
914 | } | ||
915 | |||
916 | |||
917 | /** | ||
891 | * Function called by the service's run | 918 | * Function called by the service's run |
892 | * method to run service-specific setup code. | 919 | * method to run service-specific setup code. |
893 | * | 920 | * |
@@ -900,31 +927,29 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, | |||
900 | const struct GNUNET_CONFIGURATION_Handle *cfg) | 927 | const struct GNUNET_CONFIGURATION_Handle *cfg) |
901 | { | 928 | { |
902 | static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { | 929 | static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { |
903 | {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT, 0}, | 930 | {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT, |
931 | sizeof (struct GNUNET_SET_AcceptRejectMessage)}, | ||
904 | {handle_client_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ACK, 0}, | 932 | {handle_client_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ACK, 0}, |
905 | {handle_client_add, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0}, | 933 | {handle_client_add_remove, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0}, |
906 | {handle_client_create, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE, 0}, | 934 | {handle_client_create, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE, |
935 | sizeof (struct GNUNET_SET_CreateMessage)}, | ||
907 | {handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0}, | 936 | {handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0}, |
908 | {handle_client_listen, NULL, GNUNET_MESSAGE_TYPE_SET_LISTEN, 0}, | 937 | {handle_client_listen, NULL, GNUNET_MESSAGE_TYPE_SET_LISTEN, |
909 | {handle_client_reject, NULL, GNUNET_MESSAGE_TYPE_SET_REJECT, 0}, | 938 | sizeof (struct GNUNET_SET_ListenMessage)}, |
910 | {handle_client_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0}, | 939 | {handle_client_reject, NULL, GNUNET_MESSAGE_TYPE_SET_REJECT, |
940 | sizeof (struct GNUNET_SET_AcceptRejectMessage)}, | ||
941 | {handle_client_add_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0}, | ||
942 | {handle_client_cancel, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, | ||
943 | sizeof (struct GNUNET_SET_CancelMessage)}, | ||
911 | {NULL, NULL, 0, 0} | 944 | {NULL, NULL, 0, 0} |
912 | }; | 945 | }; |
913 | static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = { | 946 | static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = { |
914 | {handle_p2p_operation_request, | 947 | {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0}, |
915 | GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0}, | 948 | {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_IBF, 0}, |
916 | /* messages for the union operation */ | 949 | {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0}, |
917 | {_GSS_union_handle_p2p_message, | 950 | {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_DONE, 0}, |
918 | GNUNET_MESSAGE_TYPE_SET_P2P_IBF, 0}, | 951 | {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0}, |
919 | {_GSS_union_handle_p2p_message, | 952 | {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_SE, 0}, |
920 | GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0}, | ||
921 | {_GSS_union_handle_p2p_message, | ||
922 | GNUNET_MESSAGE_TYPE_SET_P2P_DONE, 0}, | ||
923 | {_GSS_union_handle_p2p_message, | ||
924 | GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0}, | ||
925 | {_GSS_union_handle_p2p_message, | ||
926 | GNUNET_MESSAGE_TYPE_SET_P2P_SE, 0}, | ||
927 | /* FIXME: messages for intersection operation */ | ||
928 | {NULL, 0, 0} | 953 | {NULL, 0, 0} |
929 | }; | 954 | }; |
930 | static const uint32_t mesh_ports[] = {GNUNET_APPLICATION_TYPE_SET, 0}; | 955 | static const uint32_t mesh_ports[] = {GNUNET_APPLICATION_TYPE_SET, 0}; |
@@ -943,7 +968,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, | |||
943 | return; | 968 | return; |
944 | } | 969 | } |
945 | 970 | ||
946 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "service started\n"); | 971 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "started\n"); |
947 | } | 972 | } |
948 | 973 | ||
949 | 974 | ||
diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h index 74cbf584e..6ababe92f 100644 --- a/src/set/gnunet-service-set.h +++ b/src/set/gnunet-service-set.h | |||
@@ -38,67 +38,25 @@ | |||
38 | #include "set.h" | 38 | #include "set.h" |
39 | 39 | ||
40 | 40 | ||
41 | /* FIXME: cfuchs */ | ||
42 | struct IntersectionState; | ||
43 | |||
44 | |||
45 | /* FIXME: cfuchs */ | ||
46 | struct IntersectionOperation; | ||
47 | |||
48 | |||
49 | /** | ||
50 | * Extra state required for set union. | ||
51 | */ | ||
52 | struct UnionState; | ||
53 | |||
54 | /** | 41 | /** |
55 | * State of a union operation being evaluated. | 42 | * Implementation-specific set state. |
43 | * Used as opaque pointer, and specified further | ||
44 | * in the respective implementation. | ||
56 | */ | 45 | */ |
57 | struct UnionEvaluateOperation; | 46 | struct SetState; |
58 | |||
59 | 47 | ||
60 | 48 | ||
61 | /** | 49 | /** |
62 | * A set that supports a specific operation | 50 | * Implementation-specific set operation. |
63 | * with other peers. | 51 | * Used as opaque pointer, and specified further |
52 | * in the respective implementation. | ||
64 | */ | 53 | */ |
65 | struct Set | 54 | struct OperationState; |
66 | { | ||
67 | /** | ||
68 | * Client that owns the set. | ||
69 | * Only one client may own a set. | ||
70 | */ | ||
71 | struct GNUNET_SERVER_Client *client; | ||
72 | |||
73 | /** | ||
74 | * Message queue for the client | ||
75 | */ | ||
76 | struct GNUNET_MQ_Handle *client_mq; | ||
77 | 55 | ||
78 | /** | ||
79 | * Type of operation supported for this set | ||
80 | */ | ||
81 | uint32_t operation; // use enum from API | ||
82 | 56 | ||
83 | /** | 57 | /* forward declarations */ |
84 | * Sets are held in a doubly linked list. | 58 | struct Set; |
85 | */ | 59 | struct TunnelContext; |
86 | struct Set *next; | ||
87 | |||
88 | /** | ||
89 | * Sets are held in a doubly linked list. | ||
90 | */ | ||
91 | struct Set *prev; | ||
92 | |||
93 | /** | ||
94 | * Appropriate state for each type of | ||
95 | * operation. | ||
96 | */ | ||
97 | union { | ||
98 | struct IntersectionState *i; | ||
99 | struct UnionState *u; | ||
100 | } state; | ||
101 | }; | ||
102 | 60 | ||
103 | 61 | ||
104 | /** | 62 | /** |
@@ -146,96 +104,169 @@ struct OperationSpecification | |||
146 | 104 | ||
147 | 105 | ||
148 | /** | 106 | /** |
149 | * A listener is inhabited by a client, and | 107 | * Signature of functions that create the implementation-specific |
150 | * waits for evaluation requests from remote peers. | 108 | * state for a set supporting a specific operation. |
109 | * | ||
110 | * @return a set state specific to the supported operation | ||
111 | */ | ||
112 | typedef struct SetState *(*CreateImpl) (void); | ||
113 | |||
114 | |||
115 | /** | ||
116 | * Signature of functions that implement the add/remove functionality | ||
117 | * for a set supporting a specific operation. | ||
118 | * | ||
119 | * @param set implementation-specific set state | ||
120 | * @param msg element message from the client | ||
121 | */ | ||
122 | typedef void (*AddRemoveImpl) (struct SetState *state, const struct GNUNET_SET_Element *element); | ||
123 | |||
124 | |||
125 | /** | ||
126 | * Signature of functions that handle disconnection | ||
127 | * of the remote peer. | ||
128 | * | ||
129 | * @param op the set operation, contains implementation-specific data | ||
130 | */ | ||
131 | typedef void (*PeerDisconnectImpl) (struct OperationState *op); | ||
132 | |||
133 | |||
134 | /** | ||
135 | * Signature of functions that implement the destruction of the | ||
136 | * implementation-specific set state. | ||
137 | * | ||
138 | * @param state the set state, contains implementation-specific data | ||
139 | */ | ||
140 | typedef void (*DestroySetImpl) (struct SetState *state); | ||
141 | |||
142 | |||
143 | /** | ||
144 | * Signature of functions that implement the creation of set operations | ||
145 | * (currently evaluate and accept). | ||
146 | * | ||
147 | * @param spec specification of the set operation to be created | ||
148 | * @param tunnel the tunnel with the other peer | ||
149 | * @param tc tunnel context | ||
150 | */ | ||
151 | typedef void (*OpCreateImpl) (struct OperationSpecification *spec, | ||
152 | struct GNUNET_MESH_Tunnel *tunnel, | ||
153 | struct TunnelContext *tc); | ||
154 | |||
155 | |||
156 | /** | ||
157 | * Signature of functions that implement the message handling for | ||
158 | * the different set operations. | ||
159 | * | ||
160 | * @param op operation state | ||
161 | * @param msg received message | ||
162 | * @return GNUNET_OK on success, GNUNET_SYSERR to | ||
163 | * destroy the operation and the tunnel | ||
151 | */ | 164 | */ |
152 | struct Listener | 165 | typedef int (*MsgHandlerImpl) (struct OperationState *op, |
166 | const struct GNUNET_MessageHeader *msg); | ||
167 | |||
168 | typedef void (*CancelImpl) (struct SetState *set, | ||
169 | uint32_t request_id); | ||
170 | |||
171 | |||
172 | /** | ||
173 | * Dispatch table for a specific set operation. | ||
174 | * Every set operation has to implement the callback | ||
175 | * in this struct. | ||
176 | */ | ||
177 | struct SetVT | ||
153 | { | 178 | { |
154 | /** | 179 | /** |
155 | * Listeners are held in a doubly linked list. | 180 | * Callback for the set creation. |
156 | */ | 181 | */ |
157 | struct Listener *next; | 182 | CreateImpl create; |
158 | 183 | ||
159 | /** | 184 | /** |
160 | * Listeners are held in a doubly linked list. | 185 | * Callback for element insertion |
161 | */ | 186 | */ |
162 | struct Listener *prev; | 187 | AddRemoveImpl add; |
163 | 188 | ||
164 | /** | 189 | /** |
165 | * Client that owns the listener. | 190 | * Callback for element removal. |
166 | * Only one client may own a listener. | ||
167 | */ | 191 | */ |
168 | struct GNUNET_SERVER_Client *client; | 192 | AddRemoveImpl remove; |
169 | 193 | ||
170 | /** | 194 | /** |
171 | * Message queue for the client | 195 | * Callback for accepting a set operation request |
172 | */ | 196 | */ |
173 | struct GNUNET_MQ_Handle *client_mq; | 197 | OpCreateImpl accept; |
174 | 198 | ||
175 | /** | 199 | /** |
176 | * The type of the operation. | 200 | * Callback for starting evaluation with a remote peer. |
177 | */ | 201 | */ |
178 | enum GNUNET_SET_OperationType operation; | 202 | OpCreateImpl evaluate; |
179 | 203 | ||
180 | /** | 204 | /** |
181 | * Application ID for the operation, used to distinguish | 205 | * Callback for destruction of the set state. |
182 | * multiple operations of the same type with the same peer. | ||
183 | */ | 206 | */ |
184 | struct GNUNET_HashCode app_id; | 207 | DestroySetImpl destroy_set; |
185 | }; | ||
186 | 208 | ||
209 | /** | ||
210 | * Callback for handling operation-specific messages. | ||
211 | */ | ||
212 | MsgHandlerImpl msg_handler; | ||
187 | 213 | ||
188 | /** | 214 | /** |
189 | * Peer that has connected to us, but is not yet evaluating a set operation. | 215 | * Callback for handling the remote peer's |
190 | * Once the peer has sent a request, and the client has | 216 | * disconnect. |
191 | * accepted or rejected it, this information will be deleted. | 217 | */ |
192 | */ | 218 | PeerDisconnectImpl peer_disconnect; |
193 | struct Incoming; | 219 | |
220 | /** | ||
221 | * Callback for canceling an operation by | ||
222 | * its ID. | ||
223 | */ | ||
224 | CancelImpl cancel; | ||
225 | }; | ||
194 | 226 | ||
195 | 227 | ||
196 | /** | 228 | /** |
197 | * Different types a tunnel can be. | 229 | * A set that supports a specific operation |
230 | * with other peers. | ||
198 | */ | 231 | */ |
199 | enum TunnelContextType | 232 | struct Set |
200 | { | 233 | { |
201 | /** | 234 | /** |
202 | * Tunnel is waiting for a set request from the tunnel, | 235 | * Client that owns the set. |
203 | * or for the ack/nack of the client for a received request. | 236 | * Only one client may own a set. |
204 | */ | 237 | */ |
205 | CONTEXT_INCOMING, | 238 | struct GNUNET_SERVER_Client *client; |
206 | 239 | ||
207 | /** | 240 | /** |
208 | * The tunnel performs a union operation. | 241 | * Message queue for the client |
209 | */ | 242 | */ |
210 | CONTEXT_OPERATION_UNION, | 243 | struct GNUNET_MQ_Handle *client_mq; |
211 | 244 | ||
212 | /** | 245 | /** |
213 | * The tunnel performs an intersection operation. | 246 | * Type of operation supported for this set |
214 | */ | 247 | */ |
215 | CONTEXT_OPERATION_INTERSECTION, | 248 | enum GNUNET_SET_OperationType operation; |
216 | }; | ||
217 | 249 | ||
250 | /** | ||
251 | * Virtual table for this set. | ||
252 | * Determined by the operation type of this set. | ||
253 | */ | ||
254 | const struct SetVT *vt; | ||
218 | 255 | ||
219 | /** | ||
220 | * State associated with the tunnel, dependent on | ||
221 | * tunnel type. | ||
222 | */ | ||
223 | union TunnelContextData | ||
224 | { | ||
225 | /** | 256 | /** |
226 | * Valid for tag 'CONTEXT_INCOMING' | 257 | * Sets are held in a doubly linked list. |
227 | */ | 258 | */ |
228 | struct Incoming *incoming; | 259 | struct Set *next; |
229 | 260 | ||
230 | /** | 261 | /** |
231 | * Valid for tag 'CONTEXT_OPERATION_UNION' | 262 | * Sets are held in a doubly linked list. |
232 | */ | 263 | */ |
233 | struct UnionEvaluateOperation *union_op; | 264 | struct Set *prev; |
234 | 265 | ||
235 | /** | 266 | /** |
236 | * Valid for tag 'CONTEXT_OPERATION_INTERSECTION' | 267 | * Implementation-specific state. |
237 | */ | 268 | */ |
238 | struct IntersectionEvaluateOperation *intersection_op; | 269 | struct SetState *state; |
239 | }; | 270 | }; |
240 | 271 | ||
241 | 272 | ||
@@ -246,119 +277,24 @@ union TunnelContextData | |||
246 | struct TunnelContext | 277 | struct TunnelContext |
247 | { | 278 | { |
248 | /** | 279 | /** |
249 | * Type of the tunnel. | 280 | * V-Table for the operation belonging |
281 | * to the tunnel contest. | ||
250 | */ | 282 | */ |
251 | enum TunnelContextType type; | 283 | const struct SetVT *vt; |
252 | 284 | ||
253 | /** | 285 | /** |
254 | * State associated with the tunnel, dependent on | 286 | * Implementation-specific operation state. |
255 | * tunnel type. | ||
256 | */ | 287 | */ |
257 | union TunnelContextData data; | 288 | struct OperationState *op; |
258 | }; | 289 | }; |
259 | 290 | ||
260 | 291 | ||
261 | |||
262 | /** | ||
263 | * Configuration of the local peer. | ||
264 | */ | ||
265 | extern const struct GNUNET_CONFIGURATION_Handle *configuration; | ||
266 | |||
267 | /** | ||
268 | * Handle to the mesh service. | ||
269 | */ | ||
270 | extern struct GNUNET_MESH_Handle *mesh; | ||
271 | |||
272 | |||
273 | /** | ||
274 | * Create a new set supporting the union operation | ||
275 | * | ||
276 | * @return the newly created set | ||
277 | */ | ||
278 | struct Set * | ||
279 | _GSS_union_set_create (void); | ||
280 | |||
281 | |||
282 | /** | ||
283 | * Evaluate a union operation with | ||
284 | * a remote peer. | ||
285 | * | ||
286 | * @param spec specification of the operation the evaluate | ||
287 | * @param tunnel tunnel already connected to the partner peer | ||
288 | * @return a handle to the operation | ||
289 | */ | ||
290 | struct UnionEvaluateOperation * | ||
291 | _GSS_union_evaluate (struct OperationSpecification *spec, | ||
292 | struct GNUNET_MESH_Tunnel *tunnel); | ||
293 | |||
294 | |||
295 | /** | ||
296 | * Add the element from the given element message to the set. | ||
297 | * | ||
298 | * @param m message with the element | ||
299 | * @param set set to add the element to | ||
300 | */ | ||
301 | void | ||
302 | _GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set); | ||
303 | |||
304 | |||
305 | /** | ||
306 | * Remove the element given in the element message from the set. | ||
307 | * Only marks the element as removed, so that older set operations can still exchange it. | ||
308 | * | ||
309 | * @param m message with the element | ||
310 | * @param set set to remove the element from | ||
311 | */ | ||
312 | void | ||
313 | _GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set); | ||
314 | |||
315 | |||
316 | /** | ||
317 | * Destroy a set that supports the union operation | ||
318 | * | ||
319 | * @param set the set to destroy, must be of type GNUNET_SET_OPERATION_UNION | ||
320 | */ | ||
321 | void | ||
322 | _GSS_union_set_destroy (struct Set *set); | ||
323 | |||
324 | |||
325 | /** | 292 | /** |
326 | * Accept an union operation request from a remote peer | 293 | * Get the table with implementing functions for |
327 | * | 294 | * set union. |
328 | * @param spec all necessary information about the operation | ||
329 | * @param tunnel open tunnel to the partner's peer | ||
330 | * @return operation | ||
331 | */ | ||
332 | struct UnionEvaluateOperation * | ||
333 | _GSS_union_accept (struct OperationSpecification *spec, | ||
334 | struct GNUNET_MESH_Tunnel *tunnel); | ||
335 | |||
336 | |||
337 | /** | ||
338 | * Destroy a union operation, and free all resources | ||
339 | * associated with it. | ||
340 | * | ||
341 | * @param eo the union operation to destroy | ||
342 | */ | ||
343 | void | ||
344 | _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo); | ||
345 | |||
346 | |||
347 | /** | ||
348 | * Dispatch messages for a union operation. | ||
349 | * | ||
350 | * @param cls closure | ||
351 | * @param tunnel mesh tunnel | ||
352 | * @param tunnel_ctx tunnel context | ||
353 | * @param mh message to process | ||
354 | * @return GNUNET_SYSERR if the tunnel should be disconnected, | ||
355 | * GNUNET_OK otherwise | ||
356 | */ | 295 | */ |
357 | int | 296 | const struct SetVT * |
358 | _GSS_union_handle_p2p_message (void *cls, | 297 | _GSS_union_vt (void); |
359 | struct GNUNET_MESH_Tunnel *tunnel, | ||
360 | void **tunnel_ctx, | ||
361 | const struct GNUNET_MessageHeader *mh); | ||
362 | 298 | ||
363 | 299 | ||
364 | #endif | 300 | #endif |
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index ed29d5d27..3725de807 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c | |||
@@ -61,7 +61,7 @@ | |||
61 | 61 | ||
62 | 62 | ||
63 | /** | 63 | /** |
64 | * Current phase we are in for a union operation | 64 | * Current phase we are in for a union operation. |
65 | */ | 65 | */ |
66 | enum UnionOperationPhase | 66 | enum UnionOperationPhase |
67 | { | 67 | { |
@@ -100,7 +100,7 @@ enum UnionOperationPhase | |||
100 | * State of an evaluate operation | 100 | * State of an evaluate operation |
101 | * with another peer. | 101 | * with another peer. |
102 | */ | 102 | */ |
103 | struct UnionEvaluateOperation | 103 | struct OperationState |
104 | { | 104 | { |
105 | /** | 105 | /** |
106 | * Tunnel to the remote peer. | 106 | * Tunnel to the remote peer. |
@@ -154,23 +154,29 @@ struct UnionEvaluateOperation | |||
154 | * was created. | 154 | * was created. |
155 | */ | 155 | */ |
156 | unsigned int generation_created; | 156 | unsigned int generation_created; |
157 | |||
158 | /** | ||
159 | * Set state of the set that this operation | ||
160 | * belongs to. | ||
161 | */ | ||
162 | struct SetState *set_state; | ||
157 | 163 | ||
158 | /** | 164 | /** |
159 | * Evaluate operations are held in | 165 | * Evaluate operations are held in |
160 | * a linked list. | 166 | * a linked list. |
161 | */ | 167 | */ |
162 | struct UnionEvaluateOperation *next; | 168 | struct OperationState *next; |
163 | 169 | ||
164 | /** | 170 | /** |
165 | * Evaluate operations are held in | 171 | * Evaluate operations are held in |
166 | * a linked list. | 172 | * a linked list. |
167 | */ | 173 | */ |
168 | struct UnionEvaluateOperation *prev; | 174 | struct OperationState *prev; |
169 | }; | 175 | }; |
170 | 176 | ||
171 | 177 | ||
172 | /** | 178 | /** |
173 | * Information about the element in a set. | 179 | * Information about an element element in the set. |
174 | * All elements are stored in a hash-table | 180 | * All elements are stored in a hash-table |
175 | * from their hash-code to their 'struct Element', | 181 | * from their hash-code to their 'struct Element', |
176 | * so that the remove and add operations are reasonably | 182 | * so that the remove and add operations are reasonably |
@@ -218,7 +224,8 @@ struct ElementEntry | |||
218 | 224 | ||
219 | 225 | ||
220 | /** | 226 | /** |
221 | * Entries in the key-to-element map of the union set. | 227 | * The key entry is used to associate an ibf key with |
228 | * an element. | ||
222 | */ | 229 | */ |
223 | struct KeyEntry | 230 | struct KeyEntry |
224 | { | 231 | { |
@@ -239,6 +246,7 @@ struct KeyEntry | |||
239 | struct KeyEntry *next_colliding; | 246 | struct KeyEntry *next_colliding; |
240 | }; | 247 | }; |
241 | 248 | ||
249 | |||
242 | /** | 250 | /** |
243 | * Used as a closure for sending elements | 251 | * Used as a closure for sending elements |
244 | * with a specific IBF key. | 252 | * with a specific IBF key. |
@@ -255,14 +263,14 @@ struct SendElementClosure | |||
255 | * Operation for which the elements | 263 | * Operation for which the elements |
256 | * should be sent. | 264 | * should be sent. |
257 | */ | 265 | */ |
258 | struct UnionEvaluateOperation *eo; | 266 | struct OperationState *eo; |
259 | }; | 267 | }; |
260 | 268 | ||
261 | 269 | ||
262 | /** | 270 | /** |
263 | * Extra state required for efficient set union. | 271 | * Extra state required for efficient set union. |
264 | */ | 272 | */ |
265 | struct UnionState | 273 | struct SetState |
266 | { | 274 | { |
267 | /** | 275 | /** |
268 | * The strata estimator is only generated once for | 276 | * The strata estimator is only generated once for |
@@ -281,13 +289,13 @@ struct UnionState | |||
281 | * Evaluate operations are held in | 289 | * Evaluate operations are held in |
282 | * a linked list. | 290 | * a linked list. |
283 | */ | 291 | */ |
284 | struct UnionEvaluateOperation *ops_head; | 292 | struct OperationState *ops_head; |
285 | 293 | ||
286 | /** | 294 | /** |
287 | * Evaluate operations are held in | 295 | * Evaluate operations are held in |
288 | * a linked list. | 296 | * a linked list. |
289 | */ | 297 | */ |
290 | struct UnionEvaluateOperation *ops_tail; | 298 | struct OperationState *ops_tail; |
291 | 299 | ||
292 | /** | 300 | /** |
293 | * Current generation, that is, number of | 301 | * Current generation, that is, number of |
@@ -321,23 +329,6 @@ destroy_elements_iterator (void *cls, | |||
321 | 329 | ||
322 | 330 | ||
323 | /** | 331 | /** |
324 | * Destroy the elements belonging to a union set. | ||
325 | * | ||
326 | * @param us union state that contains the elements | ||
327 | */ | ||
328 | static void | ||
329 | destroy_elements (struct UnionState *us) | ||
330 | { | ||
331 | if (NULL == us->elements) | ||
332 | return; | ||
333 | GNUNET_CONTAINER_multihashmap_iterate (us->elements, destroy_elements_iterator, NULL); | ||
334 | GNUNET_CONTAINER_multihashmap_destroy (us->elements); | ||
335 | us->elements = NULL; | ||
336 | } | ||
337 | |||
338 | |||
339 | |||
340 | /** | ||
341 | * Iterator over hash map entries. | 332 | * Iterator over hash map entries. |
342 | * | 333 | * |
343 | * @param cls closure | 334 | * @param cls closure |
@@ -358,6 +349,11 @@ destroy_key_to_element_iter (void *cls, | |||
358 | { | 349 | { |
359 | struct KeyEntry *k_tmp = k; | 350 | struct KeyEntry *k_tmp = k; |
360 | k = k->next_colliding; | 351 | k = k->next_colliding; |
352 | if (GNUNET_YES == k_tmp->element->remote) | ||
353 | { | ||
354 | GNUNET_free (k_tmp->element); | ||
355 | k_tmp->element = NULL; | ||
356 | } | ||
361 | GNUNET_free (k_tmp); | 357 | GNUNET_free (k_tmp); |
362 | } | 358 | } |
363 | return GNUNET_YES; | 359 | return GNUNET_YES; |
@@ -370,20 +366,24 @@ destroy_key_to_element_iter (void *cls, | |||
370 | * | 366 | * |
371 | * @param eo the union operation to destroy | 367 | * @param eo the union operation to destroy |
372 | */ | 368 | */ |
373 | void | 369 | static void |
374 | _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo) | 370 | union_operation_destroy (struct OperationState *eo) |
375 | { | 371 | { |
376 | struct UnionState *st = eo->spec->set->state.u; | 372 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n"); |
377 | 373 | GNUNET_CONTAINER_DLL_remove (eo->set_state->ops_head, | |
378 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n"); | 374 | eo->set_state->ops_tail, |
379 | 375 | eo); | |
376 | if (NULL != eo->mq) | ||
377 | { | ||
378 | GNUNET_MQ_destroy (eo->mq); | ||
379 | eo->mq = NULL; | ||
380 | } | ||
380 | if (NULL != eo->tunnel) | 381 | if (NULL != eo->tunnel) |
381 | { | 382 | { |
382 | struct GNUNET_MESH_Tunnel *t = eo->tunnel; | 383 | struct GNUNET_MESH_Tunnel *t = eo->tunnel; |
383 | eo->tunnel = NULL; | 384 | eo->tunnel = NULL; |
384 | GNUNET_MESH_tunnel_destroy (t); | 385 | GNUNET_MESH_tunnel_destroy (t); |
385 | } | 386 | } |
386 | |||
387 | if (NULL != eo->remote_ibf) | 387 | if (NULL != eo->remote_ibf) |
388 | { | 388 | { |
389 | ibf_destroy (eo->remote_ibf); | 389 | ibf_destroy (eo->remote_ibf); |
@@ -405,15 +405,19 @@ _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo) | |||
405 | GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element); | 405 | GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element); |
406 | eo->key_to_element = NULL; | 406 | eo->key_to_element = NULL; |
407 | } | 407 | } |
408 | 408 | if (NULL != eo->spec) | |
409 | GNUNET_CONTAINER_DLL_remove (st->ops_head, | 409 | { |
410 | st->ops_tail, | 410 | if (NULL != eo->spec->context_msg) |
411 | eo); | 411 | { |
412 | GNUNET_free (eo->spec->context_msg); | ||
413 | eo->spec->context_msg = NULL; | ||
414 | } | ||
415 | GNUNET_free (eo->spec); | ||
416 | eo->spec = NULL; | ||
417 | } | ||
412 | GNUNET_free (eo); | 418 | GNUNET_free (eo); |
413 | 419 | ||
414 | 420 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n"); | |
415 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n"); | ||
416 | |||
417 | 421 | ||
418 | /* FIXME: do a garbage collection of the set generations */ | 422 | /* FIXME: do a garbage collection of the set generations */ |
419 | } | 423 | } |
@@ -426,7 +430,7 @@ _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo) | |||
426 | * @param eo the union operation to fail | 430 | * @param eo the union operation to fail |
427 | */ | 431 | */ |
428 | static void | 432 | static void |
429 | fail_union_operation (struct UnionEvaluateOperation *eo) | 433 | fail_union_operation (struct OperationState *eo) |
430 | { | 434 | { |
431 | struct GNUNET_MQ_Envelope *ev; | 435 | struct GNUNET_MQ_Envelope *ev; |
432 | struct GNUNET_SET_ResultMessage *msg; | 436 | struct GNUNET_SET_ResultMessage *msg; |
@@ -434,8 +438,9 @@ fail_union_operation (struct UnionEvaluateOperation *eo) | |||
434 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); | 438 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); |
435 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); | 439 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); |
436 | msg->request_id = htonl (eo->spec->client_request_id); | 440 | msg->request_id = htonl (eo->spec->client_request_id); |
441 | msg->element_type = htons (0); | ||
437 | GNUNET_MQ_send (eo->spec->set->client_mq, ev); | 442 | GNUNET_MQ_send (eo->spec->set->client_mq, ev); |
438 | _GSS_union_operation_destroy (eo); | 443 | union_operation_destroy (eo); |
439 | } | 444 | } |
440 | 445 | ||
441 | 446 | ||
@@ -467,13 +472,13 @@ get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt) | |||
467 | * @param eo operation with the other peer | 472 | * @param eo operation with the other peer |
468 | */ | 473 | */ |
469 | static void | 474 | static void |
470 | send_operation_request (struct UnionEvaluateOperation *eo) | 475 | send_operation_request (struct OperationState *eo) |
471 | { | 476 | { |
472 | struct GNUNET_MQ_Envelope *ev; | 477 | struct GNUNET_MQ_Envelope *ev; |
473 | struct OperationRequestMessage *msg; | 478 | struct OperationRequestMessage *msg; |
474 | 479 | ||
475 | ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, | 480 | ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, |
476 | eo->spec->context_msg); | 481 | eo->spec->context_msg); |
477 | 482 | ||
478 | if (NULL == ev) | 483 | if (NULL == ev) |
479 | { | 484 | { |
@@ -484,6 +489,7 @@ send_operation_request (struct UnionEvaluateOperation *eo) | |||
484 | } | 489 | } |
485 | msg->operation = htons (GNUNET_SET_OPERATION_UNION); | 490 | msg->operation = htons (GNUNET_SET_OPERATION_UNION); |
486 | msg->app_id = eo->spec->app_id; | 491 | msg->app_id = eo->spec->app_id; |
492 | msg->salt = htonl (eo->spec->salt); | ||
487 | GNUNET_MQ_send (eo->mq, ev); | 493 | GNUNET_MQ_send (eo->mq, ev); |
488 | 494 | ||
489 | if (NULL != eo->spec->context_msg) | 495 | if (NULL != eo->spec->context_msg) |
@@ -492,7 +498,7 @@ send_operation_request (struct UnionEvaluateOperation *eo) | |||
492 | eo->spec->context_msg = NULL; | 498 | eo->spec->context_msg = NULL; |
493 | } | 499 | } |
494 | 500 | ||
495 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n"); | 501 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request\n"); |
496 | } | 502 | } |
497 | 503 | ||
498 | 504 | ||
@@ -532,13 +538,13 @@ insert_element_iterator (void *cls, | |||
532 | 538 | ||
533 | /** | 539 | /** |
534 | * Insert an element into the union operation's | 540 | * Insert an element into the union operation's |
535 | * key-to-element mapping | 541 | * key-to-element mapping. Takes ownership of 'ee'. |
536 | * | 542 | * |
537 | * @param eo the union operation | 543 | * @param eo the union operation |
538 | * @param ee the element entry | 544 | * @param ee the element entry |
539 | */ | 545 | */ |
540 | static void | 546 | static void |
541 | insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee) | 547 | insert_element (struct OperationState *eo, struct ElementEntry *ee) |
542 | { | 548 | { |
543 | int ret; | 549 | int ret; |
544 | struct IBF_Key ibf_key; | 550 | struct IBF_Key ibf_key; |
@@ -595,7 +601,7 @@ init_key_to_element_iterator (void *cls, | |||
595 | const struct GNUNET_HashCode *key, | 601 | const struct GNUNET_HashCode *key, |
596 | void *value) | 602 | void *value) |
597 | { | 603 | { |
598 | struct UnionEvaluateOperation *eo = cls; | 604 | struct OperationState *eo = cls; |
599 | struct ElementEntry *e = value; | 605 | struct ElementEntry *e = value; |
600 | 606 | ||
601 | /* make sure that the element belongs to the set at the time | 607 | /* make sure that the element belongs to the set at the time |
@@ -605,6 +611,8 @@ init_key_to_element_iterator (void *cls, | |||
605 | (e->generation_removed < eo->generation_created))) | 611 | (e->generation_removed < eo->generation_created))) |
606 | return GNUNET_YES; | 612 | return GNUNET_YES; |
607 | 613 | ||
614 | e->remote = GNUNET_NO; | ||
615 | |||
608 | insert_element (eo, e); | 616 | insert_element (eo, e); |
609 | return GNUNET_YES; | 617 | return GNUNET_YES; |
610 | } | 618 | } |
@@ -618,15 +626,15 @@ init_key_to_element_iterator (void *cls, | |||
618 | * @param size size of the ibf to create | 626 | * @param size size of the ibf to create |
619 | */ | 627 | */ |
620 | static void | 628 | static void |
621 | prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size) | 629 | prepare_ibf (struct OperationState *eo, uint16_t size) |
622 | { | 630 | { |
623 | if (NULL == eo->key_to_element) | 631 | if (NULL == eo->key_to_element) |
624 | { | 632 | { |
625 | unsigned int len; | 633 | unsigned int len; |
626 | len = GNUNET_CONTAINER_multihashmap_size (eo->spec->set->state.u->elements); | 634 | len = GNUNET_CONTAINER_multihashmap_size (eo->set_state->elements); |
627 | eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); | 635 | eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); |
628 | GNUNET_CONTAINER_multihashmap_iterate (eo->spec->set->state.u->elements, | 636 | GNUNET_CONTAINER_multihashmap_iterate (eo->set_state->elements, |
629 | init_key_to_element_iterator, eo); | 637 | init_key_to_element_iterator, eo); |
630 | } | 638 | } |
631 | if (NULL != eo->local_ibf) | 639 | if (NULL != eo->local_ibf) |
632 | ibf_destroy (eo->local_ibf); | 640 | ibf_destroy (eo->local_ibf); |
@@ -643,14 +651,14 @@ prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size) | |||
643 | * @param ibf_order order of the ibf to send, size=2^order | 651 | * @param ibf_order order of the ibf to send, size=2^order |
644 | */ | 652 | */ |
645 | static void | 653 | static void |
646 | send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) | 654 | send_ibf (struct OperationState *eo, uint16_t ibf_order) |
647 | { | 655 | { |
648 | unsigned int buckets_sent = 0; | 656 | unsigned int buckets_sent = 0; |
649 | struct InvertibleBloomFilter *ibf; | 657 | struct InvertibleBloomFilter *ibf; |
650 | 658 | ||
651 | prepare_ibf (eo, 1<<ibf_order); | 659 | prepare_ibf (eo, 1<<ibf_order); |
652 | 660 | ||
653 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending ibf of size %u\n", 1<<ibf_order); | 661 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order); |
654 | 662 | ||
655 | ibf = eo->local_ibf; | 663 | ibf = eo->local_ibf; |
656 | 664 | ||
@@ -667,11 +675,14 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) | |||
667 | 675 | ||
668 | ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, | 676 | ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, |
669 | GNUNET_MESSAGE_TYPE_SET_P2P_IBF); | 677 | GNUNET_MESSAGE_TYPE_SET_P2P_IBF); |
678 | msg->reserved = 0; | ||
670 | msg->order = ibf_order; | 679 | msg->order = ibf_order; |
671 | msg->offset = htons (buckets_sent); | 680 | msg->offset = htons (buckets_sent); |
672 | ibf_write_slice (ibf, buckets_sent, | 681 | ibf_write_slice (ibf, buckets_sent, |
673 | buckets_in_message, &msg[1]); | 682 | buckets_in_message, &msg[1]); |
674 | buckets_sent += buckets_in_message; | 683 | buckets_sent += buckets_in_message; |
684 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n", | ||
685 | buckets_in_message, buckets_sent, 1<<ibf_order); | ||
675 | GNUNET_MQ_send (eo->mq, ev); | 686 | GNUNET_MQ_send (eo->mq, ev); |
676 | } | 687 | } |
677 | 688 | ||
@@ -685,18 +696,18 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) | |||
685 | * @param eo the union operation with the remote peer | 696 | * @param eo the union operation with the remote peer |
686 | */ | 697 | */ |
687 | static void | 698 | static void |
688 | send_strata_estimator (struct UnionEvaluateOperation *eo) | 699 | send_strata_estimator (struct OperationState *eo) |
689 | { | 700 | { |
690 | struct GNUNET_MQ_Envelope *ev; | 701 | struct GNUNET_MQ_Envelope *ev; |
691 | struct GNUNET_MessageHeader *strata_msg; | 702 | struct GNUNET_MessageHeader *strata_msg; |
692 | struct UnionState *st = eo->spec->set->state.u; | ||
693 | 703 | ||
694 | ev = GNUNET_MQ_msg_header_extra (strata_msg, | 704 | ev = GNUNET_MQ_msg_header_extra (strata_msg, |
695 | SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, | 705 | SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, |
696 | GNUNET_MESSAGE_TYPE_SET_P2P_SE); | 706 | GNUNET_MESSAGE_TYPE_SET_P2P_SE); |
697 | strata_estimator_write (st->se, &strata_msg[1]); | 707 | strata_estimator_write (eo->set_state->se, &strata_msg[1]); |
698 | GNUNET_MQ_send (eo->mq, ev); | 708 | GNUNET_MQ_send (eo->mq, ev); |
699 | eo->phase = PHASE_EXPECT_IBF; | 709 | eo->phase = PHASE_EXPECT_IBF; |
710 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n"); | ||
700 | } | 711 | } |
701 | 712 | ||
702 | 713 | ||
@@ -730,7 +741,7 @@ get_order_from_difference (unsigned int diff) | |||
730 | static void | 741 | static void |
731 | handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) | 742 | handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) |
732 | { | 743 | { |
733 | struct UnionEvaluateOperation *eo = cls; | 744 | struct OperationState *eo = cls; |
734 | struct StrataEstimator *remote_se; | 745 | struct StrataEstimator *remote_se; |
735 | int diff; | 746 | int diff; |
736 | 747 | ||
@@ -746,7 +757,7 @@ handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) | |||
746 | strata_estimator_read (&mh[1], remote_se); | 757 | strata_estimator_read (&mh[1], remote_se); |
747 | GNUNET_assert (NULL != eo->se); | 758 | GNUNET_assert (NULL != eo->se); |
748 | diff = strata_estimator_difference (remote_se, eo->se); | 759 | diff = strata_estimator_difference (remote_se, eo->se); |
749 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se, diff=%d\n", diff); | 760 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se, diff=%d\n", diff); |
750 | strata_estimator_destroy (remote_se); | 761 | strata_estimator_destroy (remote_se); |
751 | strata_estimator_destroy (eo->se); | 762 | strata_estimator_destroy (eo->se); |
752 | eo->se = NULL; | 763 | eo->se = NULL; |
@@ -769,7 +780,7 @@ send_element_iterator (void *cls, | |||
769 | { | 780 | { |
770 | struct SendElementClosure *sec = cls; | 781 | struct SendElementClosure *sec = cls; |
771 | struct IBF_Key ibf_key = sec->ibf_key; | 782 | struct IBF_Key ibf_key = sec->ibf_key; |
772 | struct UnionEvaluateOperation *eo = sec->eo; | 783 | struct OperationState *eo = sec->eo; |
773 | struct KeyEntry *ke = value; | 784 | struct KeyEntry *ke = value; |
774 | 785 | ||
775 | if (ke->ibf_key.key_val != ibf_key.key_val) | 786 | if (ke->ibf_key.key_val != ibf_key.key_val) |
@@ -789,7 +800,7 @@ send_element_iterator (void *cls, | |||
789 | continue; | 800 | continue; |
790 | } | 801 | } |
791 | memcpy (&mh[1], element->data, element->size); | 802 | memcpy (&mh[1], element->data, element->size); |
792 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n"); | 803 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element to client\n"); |
793 | GNUNET_MQ_send (eo->mq, ev); | 804 | GNUNET_MQ_send (eo->mq, ev); |
794 | ke = ke->next_colliding; | 805 | ke = ke->next_colliding; |
795 | } | 806 | } |
@@ -804,7 +815,7 @@ send_element_iterator (void *cls, | |||
804 | * @param ibf_key IBF key of interest | 815 | * @param ibf_key IBF key of interest |
805 | */ | 816 | */ |
806 | static void | 817 | static void |
807 | send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key) | 818 | send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key) |
808 | { | 819 | { |
809 | struct SendElementClosure send_cls; | 820 | struct SendElementClosure send_cls; |
810 | 821 | ||
@@ -822,7 +833,7 @@ send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key | |||
822 | * @param eo union operation | 833 | * @param eo union operation |
823 | */ | 834 | */ |
824 | static void | 835 | static void |
825 | decode_and_send (struct UnionEvaluateOperation *eo) | 836 | decode_and_send (struct OperationState *eo) |
826 | { | 837 | { |
827 | struct IBF_Key key; | 838 | struct IBF_Key key; |
828 | int side; | 839 | int side; |
@@ -833,6 +844,9 @@ decode_and_send (struct UnionEvaluateOperation *eo) | |||
833 | prepare_ibf (eo, eo->remote_ibf->size); | 844 | prepare_ibf (eo, eo->remote_ibf->size); |
834 | diff_ibf = ibf_dup (eo->local_ibf); | 845 | diff_ibf = ibf_dup (eo->local_ibf); |
835 | ibf_subtract (diff_ibf, eo->remote_ibf); | 846 | ibf_subtract (diff_ibf, eo->remote_ibf); |
847 | |||
848 | ibf_destroy (eo->remote_ibf); | ||
849 | eo->remote_ibf = NULL; | ||
836 | 850 | ||
837 | while (1) | 851 | while (1) |
838 | { | 852 | { |
@@ -864,7 +878,7 @@ decode_and_send (struct UnionEvaluateOperation *eo) | |||
864 | { | 878 | { |
865 | struct GNUNET_MQ_Envelope *ev; | 879 | struct GNUNET_MQ_Envelope *ev; |
866 | 880 | ||
867 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n"); | 881 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n"); |
868 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); | 882 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); |
869 | GNUNET_MQ_send (eo->mq, ev); | 883 | GNUNET_MQ_send (eo->mq, ev); |
870 | break; | 884 | break; |
@@ -899,7 +913,7 @@ decode_and_send (struct UnionEvaluateOperation *eo) | |||
899 | static void | 913 | static void |
900 | handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) | 914 | handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) |
901 | { | 915 | { |
902 | struct UnionEvaluateOperation *eo = cls; | 916 | struct OperationState *eo = cls; |
903 | struct IBFMessage *msg = (struct IBFMessage *) mh; | 917 | struct IBFMessage *msg = (struct IBFMessage *) mh; |
904 | unsigned int buckets_in_message; | 918 | unsigned int buckets_in_message; |
905 | 919 | ||
@@ -908,8 +922,9 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) | |||
908 | { | 922 | { |
909 | eo->phase = PHASE_EXPECT_IBF_CONT; | 923 | eo->phase = PHASE_EXPECT_IBF_CONT; |
910 | GNUNET_assert (NULL == eo->remote_ibf); | 924 | GNUNET_assert (NULL == eo->remote_ibf); |
911 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "creating new ibf of order %u\n", 1<<msg->order); | 925 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order); |
912 | eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); | 926 | eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); |
927 | eo->ibf_buckets_received = 0; | ||
913 | if (0 != ntohs (msg->offset)) | 928 | if (0 != ntohs (msg->offset)) |
914 | { | 929 | { |
915 | GNUNET_break (0); | 930 | GNUNET_break (0); |
@@ -929,6 +944,13 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) | |||
929 | 944 | ||
930 | buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; | 945 | buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; |
931 | 946 | ||
947 | if (0 == buckets_in_message) | ||
948 | { | ||
949 | GNUNET_break_op (0); | ||
950 | fail_union_operation (eo); | ||
951 | return; | ||
952 | } | ||
953 | |||
932 | if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE) | 954 | if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE) |
933 | { | 955 | { |
934 | GNUNET_break (0); | 956 | GNUNET_break (0); |
@@ -942,7 +964,7 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) | |||
942 | if (eo->ibf_buckets_received == eo->remote_ibf->size) | 964 | if (eo->ibf_buckets_received == eo->remote_ibf->size) |
943 | { | 965 | { |
944 | 966 | ||
945 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full strata estimator\n"); | 967 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n"); |
946 | eo->phase = PHASE_EXPECT_ELEMENTS; | 968 | eo->phase = PHASE_EXPECT_ELEMENTS; |
947 | decode_and_send (eo); | 969 | decode_and_send (eo); |
948 | } | 970 | } |
@@ -957,12 +979,13 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) | |||
957 | * @param element element to send | 979 | * @param element element to send |
958 | */ | 980 | */ |
959 | static void | 981 | static void |
960 | send_client_element (struct UnionEvaluateOperation *eo, | 982 | send_client_element (struct OperationState *eo, |
961 | struct GNUNET_SET_Element *element) | 983 | struct GNUNET_SET_Element *element) |
962 | { | 984 | { |
963 | struct GNUNET_MQ_Envelope *ev; | 985 | struct GNUNET_MQ_Envelope *ev; |
964 | struct GNUNET_SET_ResultMessage *rm; | 986 | struct GNUNET_SET_ResultMessage *rm; |
965 | 987 | ||
988 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending el of size %u\n", element->size); | ||
966 | GNUNET_assert (0 != eo->spec->client_request_id); | 989 | GNUNET_assert (0 != eo->spec->client_request_id); |
967 | ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); | 990 | ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); |
968 | if (NULL == ev) | 991 | if (NULL == ev) |
@@ -973,6 +996,7 @@ send_client_element (struct UnionEvaluateOperation *eo, | |||
973 | } | 996 | } |
974 | rm->result_status = htons (GNUNET_SET_STATUS_OK); | 997 | rm->result_status = htons (GNUNET_SET_STATUS_OK); |
975 | rm->request_id = htonl (eo->spec->client_request_id); | 998 | rm->request_id = htonl (eo->spec->client_request_id); |
999 | rm->element_type = element->type; | ||
976 | memcpy (&rm[1], element->data, element->size); | 1000 | memcpy (&rm[1], element->data, element->size); |
977 | GNUNET_MQ_send (eo->spec->set->client_mq, ev); | 1001 | GNUNET_MQ_send (eo->spec->set->client_mq, ev); |
978 | } | 1002 | } |
@@ -987,7 +1011,7 @@ send_client_element (struct UnionEvaluateOperation *eo, | |||
987 | * @param eo union operation | 1011 | * @param eo union operation |
988 | */ | 1012 | */ |
989 | static void | 1013 | static void |
990 | send_client_done_and_destroy (struct UnionEvaluateOperation *eo) | 1014 | send_client_done_and_destroy (struct OperationState *eo) |
991 | { | 1015 | { |
992 | struct GNUNET_MQ_Envelope *ev; | 1016 | struct GNUNET_MQ_Envelope *ev; |
993 | struct GNUNET_SET_ResultMessage *rm; | 1017 | struct GNUNET_SET_ResultMessage *rm; |
@@ -995,6 +1019,7 @@ send_client_done_and_destroy (struct UnionEvaluateOperation *eo) | |||
995 | ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); | 1019 | ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); |
996 | rm->request_id = htonl (eo->spec->client_request_id); | 1020 | rm->request_id = htonl (eo->spec->client_request_id); |
997 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); | 1021 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); |
1022 | rm->element_type = htons (0); | ||
998 | GNUNET_MQ_send (eo->spec->set->client_mq, ev); | 1023 | GNUNET_MQ_send (eo->spec->set->client_mq, ev); |
999 | 1024 | ||
1000 | } | 1025 | } |
@@ -1009,11 +1034,11 @@ send_client_done_and_destroy (struct UnionEvaluateOperation *eo) | |||
1009 | static void | 1034 | static void |
1010 | handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) | 1035 | handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) |
1011 | { | 1036 | { |
1012 | struct UnionEvaluateOperation *eo = cls; | 1037 | struct OperationState *eo = cls; |
1013 | struct ElementEntry *ee; | 1038 | struct ElementEntry *ee; |
1014 | uint16_t element_size; | 1039 | uint16_t element_size; |
1015 | 1040 | ||
1016 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got element from peer\n"); | 1041 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n"); |
1017 | 1042 | ||
1018 | if ( (eo->phase != PHASE_EXPECT_ELEMENTS) && | 1043 | if ( (eo->phase != PHASE_EXPECT_ELEMENTS) && |
1019 | (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ) | 1044 | (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ) |
@@ -1025,13 +1050,12 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) | |||
1025 | element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader); | 1050 | element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader); |
1026 | ee = GNUNET_malloc (sizeof *eo + element_size); | 1051 | ee = GNUNET_malloc (sizeof *eo + element_size); |
1027 | memcpy (&ee[1], &mh[1], element_size); | 1052 | memcpy (&ee[1], &mh[1], element_size); |
1053 | ee->element.size = element_size; | ||
1028 | ee->element.data = &ee[1]; | 1054 | ee->element.data = &ee[1]; |
1029 | ee->remote = GNUNET_YES; | 1055 | ee->remote = GNUNET_YES; |
1030 | 1056 | ||
1031 | insert_element (eo, ee); | 1057 | insert_element (eo, ee); |
1032 | send_client_element (eo, &ee->element); | 1058 | send_client_element (eo, &ee->element); |
1033 | |||
1034 | GNUNET_free (ee); | ||
1035 | } | 1059 | } |
1036 | 1060 | ||
1037 | 1061 | ||
@@ -1044,7 +1068,7 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) | |||
1044 | static void | 1068 | static void |
1045 | handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) | 1069 | handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) |
1046 | { | 1070 | { |
1047 | struct UnionEvaluateOperation *eo = cls; | 1071 | struct OperationState *eo = cls; |
1048 | struct IBF_Key *ibf_key; | 1072 | struct IBF_Key *ibf_key; |
1049 | unsigned int num_keys; | 1073 | unsigned int num_keys; |
1050 | 1074 | ||
@@ -1082,7 +1106,7 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) | |||
1082 | static void | 1106 | static void |
1083 | peer_done_sent_cb (void *cls) | 1107 | peer_done_sent_cb (void *cls) |
1084 | { | 1108 | { |
1085 | struct UnionEvaluateOperation *eo = cls; | 1109 | struct OperationState *eo = cls; |
1086 | 1110 | ||
1087 | send_client_done_and_destroy (eo); | 1111 | send_client_done_and_destroy (eo); |
1088 | } | 1112 | } |
@@ -1097,14 +1121,14 @@ peer_done_sent_cb (void *cls) | |||
1097 | static void | 1121 | static void |
1098 | handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) | 1122 | handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) |
1099 | { | 1123 | { |
1100 | struct UnionEvaluateOperation *eo = cls; | 1124 | struct OperationState *eo = cls; |
1101 | 1125 | ||
1102 | if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) | 1126 | if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) |
1103 | { | 1127 | { |
1104 | /* we got all requests, but still have to send our elements as response */ | 1128 | /* we got all requests, but still have to send our elements as response */ |
1105 | struct GNUNET_MQ_Envelope *ev; | 1129 | struct GNUNET_MQ_Envelope *ev; |
1106 | 1130 | ||
1107 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n"); | 1131 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n"); |
1108 | eo->phase = PHASE_FINISHED; | 1132 | eo->phase = PHASE_FINISHED; |
1109 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); | 1133 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); |
1110 | GNUNET_MQ_notify_sent (ev, peer_done_sent_cb, eo); | 1134 | GNUNET_MQ_notify_sent (ev, peer_done_sent_cb, eo); |
@@ -1113,7 +1137,7 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) | |||
1113 | } | 1137 | } |
1114 | if (eo->phase == PHASE_EXPECT_ELEMENTS) | 1138 | if (eo->phase == PHASE_EXPECT_ELEMENTS) |
1115 | { | 1139 | { |
1116 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got final DONE\n"); | 1140 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n"); |
1117 | eo->phase = PHASE_FINISHED; | 1141 | eo->phase = PHASE_FINISHED; |
1118 | send_client_done_and_destroy (eo); | 1142 | send_client_done_and_destroy (eo); |
1119 | return; | 1143 | return; |
@@ -1129,34 +1153,38 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) | |||
1129 | * | 1153 | * |
1130 | * @param spec specification of the operation the evaluate | 1154 | * @param spec specification of the operation the evaluate |
1131 | * @param tunnel tunnel already connected to the partner peer | 1155 | * @param tunnel tunnel already connected to the partner peer |
1156 | * @param tc tunnel context, passed here so all new incoming | ||
1157 | * messages are directly going to the union operations | ||
1132 | * @return a handle to the operation | 1158 | * @return a handle to the operation |
1133 | */ | 1159 | */ |
1134 | struct UnionEvaluateOperation * | 1160 | static void |
1135 | _GSS_union_evaluate (struct OperationSpecification *spec, | 1161 | union_evaluate (struct OperationSpecification *spec, |
1136 | struct GNUNET_MESH_Tunnel *tunnel) | 1162 | struct GNUNET_MESH_Tunnel *tunnel, |
1163 | struct TunnelContext *tc) | ||
1137 | { | 1164 | { |
1138 | struct UnionEvaluateOperation *eo; | 1165 | struct OperationState *eo; |
1139 | struct UnionState *st = spec->set->state.u; | ||
1140 | 1166 | ||
1141 | eo = GNUNET_new (struct UnionEvaluateOperation); | 1167 | eo = GNUNET_new (struct OperationState); |
1142 | eo->se = strata_estimator_dup (spec->set->state.u->se); | 1168 | tc->vt = _GSS_union_vt (); |
1169 | tc->op = eo; | ||
1170 | eo->se = strata_estimator_dup (spec->set->state->se); | ||
1171 | eo->set_state = spec->set->state; | ||
1143 | eo->spec = spec; | 1172 | eo->spec = spec; |
1144 | eo->tunnel = tunnel; | 1173 | eo->tunnel = tunnel; |
1174 | eo->mq = GNUNET_MESH_mq_create (tunnel); | ||
1145 | 1175 | ||
1146 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | 1176 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1147 | "evaluating union operation, (app %s)\n", | 1177 | "evaluating union operation, (app %s)\n", |
1148 | GNUNET_h2s (&eo->spec->app_id)); | 1178 | GNUNET_h2s (&eo->spec->app_id)); |
1149 | 1179 | ||
1150 | /* we started the operation, thus we have to send the operation request */ | 1180 | /* we started the operation, thus we have to send the operation request */ |
1151 | eo->phase = PHASE_EXPECT_SE; | 1181 | eo->phase = PHASE_EXPECT_SE; |
1152 | 1182 | ||
1153 | GNUNET_CONTAINER_DLL_insert (st->ops_head, | 1183 | GNUNET_CONTAINER_DLL_insert (eo->set_state->ops_head, |
1154 | st->ops_tail, | 1184 | eo->set_state->ops_tail, |
1155 | eo); | 1185 | eo); |
1156 | 1186 | ||
1157 | send_operation_request (eo); | 1187 | send_operation_request (eo); |
1158 | |||
1159 | return eo; | ||
1160 | } | 1188 | } |
1161 | 1189 | ||
1162 | 1190 | ||
@@ -1165,30 +1193,34 @@ _GSS_union_evaluate (struct OperationSpecification *spec, | |||
1165 | * | 1193 | * |
1166 | * @param spec all necessary information about the operation | 1194 | * @param spec all necessary information about the operation |
1167 | * @param tunnel open tunnel to the partner's peer | 1195 | * @param tunnel open tunnel to the partner's peer |
1196 | * @param tc tunnel context, passed here so all new incoming | ||
1197 | * messages are directly going to the union operations | ||
1168 | * @return operation | 1198 | * @return operation |
1169 | */ | 1199 | */ |
1170 | struct UnionEvaluateOperation * | 1200 | static void |
1171 | _GSS_union_accept (struct OperationSpecification *spec, | 1201 | union_accept (struct OperationSpecification *spec, |
1172 | struct GNUNET_MESH_Tunnel *tunnel) | 1202 | struct GNUNET_MESH_Tunnel *tunnel, |
1203 | struct TunnelContext *tc) | ||
1173 | { | 1204 | { |
1174 | struct UnionEvaluateOperation *eo; | 1205 | struct OperationState *eo; |
1175 | struct UnionState *st = spec->set->state.u; | ||
1176 | 1206 | ||
1177 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n"); | 1207 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n"); |
1178 | 1208 | ||
1179 | eo = GNUNET_new (struct UnionEvaluateOperation); | 1209 | eo = GNUNET_new (struct OperationState); |
1180 | eo->generation_created = st->current_generation++; | 1210 | tc->vt = _GSS_union_vt (); |
1211 | tc->op = eo; | ||
1212 | eo->set_state = spec->set->state; | ||
1213 | eo->generation_created = eo->set_state->current_generation++; | ||
1181 | eo->spec = spec; | 1214 | eo->spec = spec; |
1182 | eo->tunnel = tunnel; | 1215 | eo->tunnel = tunnel; |
1183 | eo->se = strata_estimator_dup (st->se); | 1216 | eo->mq = GNUNET_MESH_mq_create (tunnel); |
1217 | eo->se = strata_estimator_dup (eo->set_state->se); | ||
1184 | /* transfer ownership of mq and socket from incoming to eo */ | 1218 | /* transfer ownership of mq and socket from incoming to eo */ |
1185 | GNUNET_CONTAINER_DLL_insert (st->ops_head, | 1219 | GNUNET_CONTAINER_DLL_insert (eo->set_state->ops_head, |
1186 | st->ops_tail, | 1220 | eo->set_state->ops_tail, |
1187 | eo); | 1221 | eo); |
1188 | /* kick off the operation */ | 1222 | /* kick off the operation */ |
1189 | send_strata_estimator (eo); | 1223 | send_strata_estimator (eo); |
1190 | |||
1191 | return eo; | ||
1192 | } | 1224 | } |
1193 | 1225 | ||
1194 | 1226 | ||
@@ -1197,111 +1229,101 @@ _GSS_union_accept (struct OperationSpecification *spec, | |||
1197 | * | 1229 | * |
1198 | * @return the newly created set | 1230 | * @return the newly created set |
1199 | */ | 1231 | */ |
1200 | struct Set * | 1232 | static struct SetState * |
1201 | _GSS_union_set_create (void) | 1233 | union_set_create (void) |
1202 | { | 1234 | { |
1203 | struct Set *set; | 1235 | struct SetState *set_state; |
1204 | 1236 | ||
1205 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "union set created\n"); | 1237 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "union set created\n"); |
1206 | 1238 | ||
1207 | set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState)); | 1239 | set_state = GNUNET_new (struct SetState); |
1208 | set->state.u = (struct UnionState *) &set[1]; | ||
1209 | set->operation = GNUNET_SET_OPERATION_UNION; | ||
1210 | /* keys of the hash map are stored in the element entrys, thus we do not | 1240 | /* keys of the hash map are stored in the element entrys, thus we do not |
1211 | * want the hash map to copy them */ | 1241 | * want the hash map to copy them */ |
1212 | set->state.u->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); | 1242 | set_state->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); |
1213 | set->state.u->se = strata_estimator_create (SE_STRATA_COUNT, | 1243 | set_state->se = strata_estimator_create (SE_STRATA_COUNT, |
1214 | SE_IBF_SIZE, SE_IBF_HASH_NUM); | 1244 | SE_IBF_SIZE, SE_IBF_HASH_NUM); |
1215 | return set; | 1245 | return set_state; |
1216 | } | 1246 | } |
1217 | 1247 | ||
1218 | 1248 | ||
1219 | /** | 1249 | /** |
1220 | * Add the element from the given element message to the set. | 1250 | * Add the element from the given element message to the set. |
1221 | * | 1251 | * |
1222 | * @param m message with the element | 1252 | * @param set_state state of the set want to add to |
1223 | * @param set set to add the element to | 1253 | * @param element the element to add to the set |
1224 | */ | 1254 | */ |
1225 | void | 1255 | static void |
1226 | _GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set) | 1256 | union_add (struct SetState *set_state, const struct GNUNET_SET_Element *element) |
1227 | { | 1257 | { |
1228 | struct ElementEntry *ee; | 1258 | struct ElementEntry *ee; |
1229 | struct ElementEntry *ee_dup; | 1259 | struct ElementEntry *ee_dup; |
1230 | uint16_t element_size; | ||
1231 | 1260 | ||
1232 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "adding element\n"); | 1261 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "adding union element of size %u\n", element->size); |
1233 | 1262 | ||
1234 | GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation); | 1263 | ee = GNUNET_malloc (element->size + sizeof *ee); |
1235 | element_size = ntohs (m->header.size) - sizeof *m; | 1264 | ee->element.size = element->size; |
1236 | ee = GNUNET_malloc (element_size + sizeof *ee); | 1265 | memcpy (&ee[1], element->data, element->size); |
1237 | ee->element.size = element_size; | ||
1238 | memcpy (&ee[1], &m[1], element_size); | ||
1239 | ee->element.data = &ee[1]; | 1266 | ee->element.data = &ee[1]; |
1240 | ee->generation_added = set->state.u->current_generation; | 1267 | ee->generation_added = set_state->current_generation; |
1241 | GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash); | 1268 | ee->remote = GNUNET_NO; |
1242 | ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &ee->element_hash); | 1269 | GNUNET_CRYPTO_hash (ee->element.data, element->size, &ee->element_hash); |
1270 | ee_dup = GNUNET_CONTAINER_multihashmap_get (set_state->elements, | ||
1271 | &ee->element_hash); | ||
1243 | if (NULL != ee_dup) | 1272 | if (NULL != ee_dup) |
1244 | { | 1273 | { |
1245 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n"); | 1274 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n"); |
1246 | GNUNET_free (ee); | 1275 | GNUNET_free (ee); |
1247 | return; | 1276 | return; |
1248 | } | 1277 | } |
1249 | GNUNET_CONTAINER_multihashmap_put (set->state.u->elements, &ee->element_hash, ee, | 1278 | GNUNET_CONTAINER_multihashmap_put (set_state->elements, &ee->element_hash, ee, |
1250 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | 1279 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); |
1251 | strata_estimator_insert (set->state.u->se, get_ibf_key (&ee->element_hash, 0)); | 1280 | strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash, 0)); |
1252 | } | 1281 | } |
1253 | 1282 | ||
1254 | 1283 | ||
1255 | /** | 1284 | /** |
1256 | * Destroy a set that supports the union operation | 1285 | * Destroy a set that supports the union operation |
1257 | * | 1286 | * |
1258 | * @param set the set to destroy, must be of type GNUNET_SET_OPERATION_UNION | 1287 | * @param set_state the set to destroy |
1259 | */ | 1288 | */ |
1260 | void | 1289 | static void |
1261 | _GSS_union_set_destroy (struct Set *set) | 1290 | union_set_destroy (struct SetState *set_state) |
1262 | { | 1291 | { |
1263 | GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation); | 1292 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union set\n"); |
1264 | if (NULL != set->client) | 1293 | /* important to destroy operations before the rest of the set */ |
1294 | while (NULL != set_state->ops_head) | ||
1295 | union_operation_destroy (set_state->ops_head); | ||
1296 | if (NULL != set_state->se) | ||
1265 | { | 1297 | { |
1266 | GNUNET_SERVER_client_drop (set->client); | 1298 | strata_estimator_destroy (set_state->se); |
1267 | set->client = NULL; | 1299 | set_state->se = NULL; |
1268 | } | 1300 | } |
1269 | if (NULL != set->client_mq) | 1301 | if (NULL != set_state->elements) |
1270 | { | ||
1271 | GNUNET_MQ_destroy (set->client_mq); | ||
1272 | set->client_mq = NULL; | ||
1273 | } | ||
1274 | |||
1275 | if (NULL != set->state.u->se) | ||
1276 | { | 1302 | { |
1277 | strata_estimator_destroy (set->state.u->se); | 1303 | GNUNET_CONTAINER_multihashmap_iterate (set_state->elements, |
1278 | set->state.u->se = NULL; | 1304 | destroy_elements_iterator, NULL); |
1305 | GNUNET_CONTAINER_multihashmap_destroy (set_state->elements); | ||
1306 | set_state->elements = NULL; | ||
1279 | } | 1307 | } |
1280 | 1308 | ||
1281 | destroy_elements (set->state.u); | 1309 | GNUNET_free (set_state); |
1282 | |||
1283 | while (NULL != set->state.u->ops_head) | ||
1284 | { | ||
1285 | _GSS_union_operation_destroy (set->state.u->ops_head); | ||
1286 | } | ||
1287 | } | 1310 | } |
1288 | 1311 | ||
1289 | /** | 1312 | /** |
1290 | * Remove the element given in the element message from the set. | 1313 | * Remove the element given in the element message from the set. |
1291 | * Only marks the element as removed, so that older set operations can still exchange it. | 1314 | * Only marks the element as removed, so that older set operations can still exchange it. |
1292 | * | 1315 | * |
1293 | * @param m message with the element | 1316 | * @param set_state state of the set to remove from |
1294 | * @param set set to remove the element from | 1317 | * @param element set element to remove |
1295 | */ | 1318 | */ |
1296 | void | 1319 | static void |
1297 | _GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set) | 1320 | union_remove (struct SetState *set_state, const struct GNUNET_SET_Element *element) |
1298 | { | 1321 | { |
1299 | struct GNUNET_HashCode hash; | 1322 | struct GNUNET_HashCode hash; |
1300 | struct ElementEntry *ee; | 1323 | struct ElementEntry *ee; |
1301 | 1324 | ||
1302 | GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation); | 1325 | GNUNET_CRYPTO_hash (element->data, element->size, &hash); |
1303 | GNUNET_CRYPTO_hash (&m[1], ntohs (m->header.size), &hash); | 1326 | ee = GNUNET_CONTAINER_multihashmap_get (set_state->elements, &hash); |
1304 | ee = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &hash); | ||
1305 | if (NULL == ee) | 1327 | if (NULL == ee) |
1306 | { | 1328 | { |
1307 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n"); | 1329 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n"); |
@@ -1313,36 +1335,22 @@ _GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set) | |||
1313 | return; | 1335 | return; |
1314 | } | 1336 | } |
1315 | ee->removed = GNUNET_YES; | 1337 | ee->removed = GNUNET_YES; |
1316 | ee->generation_removed = set->state.u->current_generation; | 1338 | ee->generation_removed = set_state->current_generation; |
1317 | } | 1339 | } |
1318 | 1340 | ||
1319 | 1341 | ||
1320 | /** | 1342 | /** |
1321 | * Dispatch messages for a union operation. | 1343 | * Dispatch messages for a union operation. |
1322 | * | 1344 | * |
1323 | * @param cls closure | 1345 | * @param eo the state of the union evaluate operation |
1324 | * @param tunnel mesh tunnel | 1346 | * @param mh the received message |
1325 | * @param tunnel_ctx tunnel context | ||
1326 | * @param mh message to process | ||
1327 | * @return GNUNET_SYSERR if the tunnel should be disconnected, | 1347 | * @return GNUNET_SYSERR if the tunnel should be disconnected, |
1328 | * GNUNET_OK otherwise | 1348 | * GNUNET_OK otherwise |
1329 | */ | 1349 | */ |
1330 | int | 1350 | int |
1331 | _GSS_union_handle_p2p_message (void *cls, | 1351 | union_handle_p2p_message (struct OperationState *eo, |
1332 | struct GNUNET_MESH_Tunnel *tunnel, | 1352 | const struct GNUNET_MessageHeader *mh) |
1333 | void **tunnel_ctx, | ||
1334 | const struct GNUNET_MessageHeader *mh) | ||
1335 | { | 1353 | { |
1336 | struct TunnelContext *tc = *tunnel_ctx; | ||
1337 | struct UnionEvaluateOperation *eo; | ||
1338 | |||
1339 | if (CONTEXT_OPERATION_UNION != tc->type) | ||
1340 | { | ||
1341 | return GNUNET_SYSERR; | ||
1342 | } | ||
1343 | |||
1344 | eo = tc->data.union_op; | ||
1345 | |||
1346 | switch (ntohs (mh->type)) | 1354 | switch (ntohs (mh->type)) |
1347 | { | 1355 | { |
1348 | case GNUNET_MESSAGE_TYPE_SET_P2P_IBF: | 1356 | case GNUNET_MESSAGE_TYPE_SET_P2P_IBF: |
@@ -1366,3 +1374,60 @@ _GSS_union_handle_p2p_message (void *cls, | |||
1366 | } | 1374 | } |
1367 | return GNUNET_OK; | 1375 | return GNUNET_OK; |
1368 | } | 1376 | } |
1377 | |||
1378 | |||
1379 | static void | ||
1380 | union_peer_disconnect (struct OperationState *op) | ||
1381 | { | ||
1382 | /* Are we already disconnected? */ | ||
1383 | if (NULL == op->tunnel) | ||
1384 | return; | ||
1385 | op->tunnel = NULL; | ||
1386 | if (NULL != op->mq) | ||
1387 | { | ||
1388 | GNUNET_MQ_destroy (op->mq); | ||
1389 | op->mq = NULL; | ||
1390 | } | ||
1391 | if (PHASE_FINISHED != op->phase) | ||
1392 | { | ||
1393 | struct GNUNET_MQ_Envelope *ev; | ||
1394 | struct GNUNET_SET_ResultMessage *msg; | ||
1395 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); | ||
1396 | msg->request_id = htonl (op->spec->client_request_id); | ||
1397 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); | ||
1398 | msg->element_type = htons (0); | ||
1399 | GNUNET_MQ_send (op->spec->set->client_mq, ev); | ||
1400 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n"); | ||
1401 | } | ||
1402 | else | ||
1403 | { | ||
1404 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n"); | ||
1405 | } | ||
1406 | union_operation_destroy (op); | ||
1407 | } | ||
1408 | |||
1409 | |||
1410 | static void | ||
1411 | union_op_cancel (struct SetState *set_state, uint32_t op_id) | ||
1412 | { | ||
1413 | /* FIXME: implement */ | ||
1414 | } | ||
1415 | |||
1416 | |||
1417 | const struct SetVT * | ||
1418 | _GSS_union_vt (void) | ||
1419 | { | ||
1420 | static const struct SetVT union_vt = { | ||
1421 | .create = union_set_create, | ||
1422 | .msg_handler = union_handle_p2p_message, | ||
1423 | .add = union_add, | ||
1424 | .remove = union_remove, | ||
1425 | .destroy_set = union_set_destroy, | ||
1426 | .evaluate = union_evaluate, | ||
1427 | .accept = union_accept, | ||
1428 | .peer_disconnect = union_peer_disconnect, | ||
1429 | .cancel = union_op_cancel | ||
1430 | }; | ||
1431 | |||
1432 | return &union_vt; | ||
1433 | } | ||
diff --git a/src/set/gnunet-set-profiler.c b/src/set/gnunet-set-profiler.c index 814c9e436..b0c81feb1 100644 --- a/src/set/gnunet-set-profiler.c +++ b/src/set/gnunet-set-profiler.c | |||
@@ -16,7 +16,7 @@ | |||
16 | along with GNUnet; see the file COPYING. If not, write to the | 16 | along with GNUnet; see the file COPYING. If not, write to the |
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | 17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, |
18 | Boston, MA 02111-1307, USA. | 18 | Boston, MA 02111-1307, USA. |
19 | */ | 19 | */ |
20 | 20 | ||
21 | /** | 21 | /** |
22 | * @file set/gnunet-set-profiler.c | 22 | * @file set/gnunet-set-profiler.c |
@@ -42,25 +42,17 @@ static char *op_str = "union"; | |||
42 | 42 | ||
43 | const static struct GNUNET_CONFIGURATION_Handle *config; | 43 | const static struct GNUNET_CONFIGURATION_Handle *config; |
44 | 44 | ||
45 | struct GNUNET_CONTAINER_MultiHashMap *map_a; | 45 | struct SetInfo |
46 | struct GNUNET_CONTAINER_MultiHashMap *map_b; | 46 | { |
47 | struct GNUNET_CONTAINER_MultiHashMap *map_c; | 47 | char *id; |
48 | 48 | struct GNUNET_SET_Handle *set; | |
49 | 49 | struct GNUNET_SET_OperationHandle *oh; | |
50 | /** | 50 | struct GNUNET_CONTAINER_MultiHashMap *sent; |
51 | * Elements that set a received, should match map_c | 51 | struct GNUNET_CONTAINER_MultiHashMap *received; |
52 | * in the end. | 52 | int done; |
53 | */ | 53 | } info1, info2; |
54 | struct GNUNET_CONTAINER_MultiHashMap *map_a_received; | ||
55 | |||
56 | /** | ||
57 | * Elements that set b received, should match map_c | ||
58 | * in the end. | ||
59 | */ | ||
60 | struct GNUNET_CONTAINER_MultiHashMap *map_b_received; | ||
61 | 54 | ||
62 | struct GNUNET_SET_Handle *set_a; | 55 | struct GNUNET_CONTAINER_MultiHashMap *common_sent; |
63 | struct GNUNET_SET_Handle *set_b; | ||
64 | 56 | ||
65 | struct GNUNET_HashCode app_id; | 57 | struct GNUNET_HashCode app_id; |
66 | 58 | ||
@@ -68,14 +60,6 @@ struct GNUNET_PeerIdentity local_peer; | |||
68 | 60 | ||
69 | struct GNUNET_SET_ListenHandle *set_listener; | 61 | struct GNUNET_SET_ListenHandle *set_listener; |
70 | 62 | ||
71 | struct GNUNET_SET_OperationHandle *set_oh1; | ||
72 | struct GNUNET_SET_OperationHandle *set_oh2; | ||
73 | |||
74 | |||
75 | int a_done; | ||
76 | int b_done; | ||
77 | |||
78 | |||
79 | 63 | ||
80 | static int | 64 | static int |
81 | map_remove_iterator (void *cls, | 65 | map_remove_iterator (void *cls, |
@@ -85,66 +69,69 @@ map_remove_iterator (void *cls, | |||
85 | struct GNUNET_CONTAINER_MultiHashMap *m = cls; | 69 | struct GNUNET_CONTAINER_MultiHashMap *m = cls; |
86 | int ret; | 70 | int ret; |
87 | 71 | ||
72 | GNUNET_assert (NULL != key); | ||
73 | |||
88 | ret = GNUNET_CONTAINER_multihashmap_remove (m, key, NULL); | 74 | ret = GNUNET_CONTAINER_multihashmap_remove (m, key, NULL); |
89 | GNUNET_assert (GNUNET_OK == ret); | 75 | if (GNUNET_OK != ret) |
76 | printf ("spurious element\n"); | ||
90 | return GNUNET_YES; | 77 | return GNUNET_YES; |
91 | 78 | ||
92 | } | 79 | } |
93 | 80 | ||
94 | |||
95 | static void | 81 | static void |
96 | set_result_cb_1 (void *cls, | 82 | check_all_done (void) |
97 | const struct GNUNET_SET_Element *element, | ||
98 | enum GNUNET_SET_Status status) | ||
99 | { | 83 | { |
100 | GNUNET_assert (GNUNET_NO == a_done); | 84 | if (info1.done == GNUNET_NO || info2.done == GNUNET_NO) |
101 | GNUNET_assert (element->size == sizeof (struct GNUNET_HashCode)); | 85 | return; |
102 | switch (status) | 86 | |
103 | { | 87 | GNUNET_CONTAINER_multihashmap_iterate (info1.received, map_remove_iterator, info2.sent); |
104 | case GNUNET_SET_STATUS_DONE: | 88 | GNUNET_CONTAINER_multihashmap_iterate (info2.received, map_remove_iterator, info1.sent); |
105 | case GNUNET_SET_STATUS_HALF_DONE: | 89 | |
106 | a_done = GNUNET_YES; | 90 | printf ("set a: %d missing elements\n", GNUNET_CONTAINER_multihashmap_size (info1.sent)); |
107 | GNUNET_CONTAINER_multihashmap_iterate (map_c, map_remove_iterator, map_a_received); | 91 | printf ("set b: %d missing elements\n", GNUNET_CONTAINER_multihashmap_size (info2.sent)); |
108 | GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (map_a_received)); | 92 | |
109 | return; | 93 | GNUNET_SCHEDULER_shutdown (); |
110 | case GNUNET_SET_STATUS_FAILURE: | ||
111 | GNUNET_assert (0); | ||
112 | return; | ||
113 | case GNUNET_SET_STATUS_OK: | ||
114 | break; | ||
115 | default: | ||
116 | GNUNET_assert (0); | ||
117 | } | ||
118 | GNUNET_CONTAINER_multihashmap_put (map_a_received, | ||
119 | element->data, NULL, | ||
120 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); | ||
121 | } | 94 | } |
122 | 95 | ||
123 | 96 | ||
124 | static void | 97 | static void |
125 | set_result_cb_2 (void *cls, | 98 | set_result_cb (void *cls, |
126 | const struct GNUNET_SET_Element *element, | 99 | const struct GNUNET_SET_Element *element, |
127 | enum GNUNET_SET_Status status) | 100 | enum GNUNET_SET_Status status) |
128 | { | 101 | { |
129 | GNUNET_assert (GNUNET_NO == b_done); | 102 | struct SetInfo *info = cls; |
130 | GNUNET_assert (element->size == sizeof (struct GNUNET_HashCode)); | 103 | |
104 | GNUNET_assert (GNUNET_NO == info->done); | ||
131 | switch (status) | 105 | switch (status) |
132 | { | 106 | { |
133 | case GNUNET_SET_STATUS_DONE: | 107 | case GNUNET_SET_STATUS_DONE: |
134 | case GNUNET_SET_STATUS_HALF_DONE: | 108 | case GNUNET_SET_STATUS_HALF_DONE: |
135 | b_done = GNUNET_YES; | 109 | info->done = GNUNET_YES; |
136 | GNUNET_CONTAINER_multihashmap_iterate (map_c, map_remove_iterator, map_b_received); | 110 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s done\n", info->id); |
137 | GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (map_b_received)); | 111 | check_all_done (); |
112 | info->oh = NULL; | ||
138 | return; | 113 | return; |
139 | case GNUNET_SET_STATUS_FAILURE: | 114 | case GNUNET_SET_STATUS_FAILURE: |
140 | GNUNET_assert (0); | 115 | info->oh = NULL; |
116 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "failure\n"); | ||
117 | GNUNET_SCHEDULER_shutdown (); | ||
141 | return; | 118 | return; |
142 | case GNUNET_SET_STATUS_OK: | 119 | case GNUNET_SET_STATUS_OK: |
143 | break; | 120 | break; |
144 | default: | 121 | default: |
145 | GNUNET_assert (0); | 122 | GNUNET_assert (0); |
146 | } | 123 | } |
147 | GNUNET_CONTAINER_multihashmap_put (map_b_received, | 124 | |
125 | if (element->size != sizeof (struct GNUNET_HashCode)) | ||
126 | { | ||
127 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "wrong element size: %u\n", element->size); | ||
128 | GNUNET_assert (0); | ||
129 | } | ||
130 | |||
131 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s: got element (%s)\n", | ||
132 | info->id, GNUNET_h2s (element->data)); | ||
133 | GNUNET_assert (NULL != element->data); | ||
134 | GNUNET_CONTAINER_multihashmap_put (info->received, | ||
148 | element->data, NULL, | 135 | element->data, NULL, |
149 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); | 136 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); |
150 | } | 137 | } |
@@ -156,11 +143,16 @@ set_listen_cb (void *cls, | |||
156 | const struct GNUNET_MessageHeader *context_msg, | 143 | const struct GNUNET_MessageHeader *context_msg, |
157 | struct GNUNET_SET_Request *request) | 144 | struct GNUNET_SET_Request *request) |
158 | { | 145 | { |
159 | GNUNET_assert (NULL == set_oh2); | 146 | if (NULL == request) |
147 | { | ||
148 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "listener failed\n"); | ||
149 | return; | ||
150 | } | ||
151 | GNUNET_assert (NULL == info2.oh); | ||
160 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set listen cb called\n"); | 152 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set listen cb called\n"); |
161 | set_oh2 = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, | 153 | info2.oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, |
162 | set_result_cb_2, NULL); | 154 | set_result_cb, &info2); |
163 | GNUNET_SET_commit (set_oh2, set_b); | 155 | GNUNET_SET_commit (info2.oh, info2.set); |
164 | } | 156 | } |
165 | 157 | ||
166 | 158 | ||
@@ -185,6 +177,37 @@ set_insert_iterator (void *cls, | |||
185 | 177 | ||
186 | 178 | ||
187 | static void | 179 | static void |
180 | handle_shutdown (void *cls, | ||
181 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
182 | { | ||
183 | if (NULL != set_listener) | ||
184 | { | ||
185 | GNUNET_SET_listen_cancel (set_listener); | ||
186 | set_listener = NULL; | ||
187 | } | ||
188 | if (NULL != info1.oh) | ||
189 | { | ||
190 | GNUNET_SET_operation_cancel (info1.oh); | ||
191 | info1.oh = NULL; | ||
192 | } | ||
193 | if (NULL != info2.oh) | ||
194 | { | ||
195 | GNUNET_SET_operation_cancel (info2.oh); | ||
196 | info2.oh = NULL; | ||
197 | } | ||
198 | if (NULL != info1.set) | ||
199 | { | ||
200 | GNUNET_SET_destroy (info1.set); | ||
201 | info1.set = NULL; | ||
202 | } | ||
203 | if (NULL != info2.set) | ||
204 | { | ||
205 | GNUNET_SET_destroy (info2.set); | ||
206 | info2.set = NULL; | ||
207 | } | ||
208 | } | ||
209 | |||
210 | static void | ||
188 | run (void *cls, char *const *args, const char *cfgfile, | 211 | run (void *cls, char *const *args, const char *cfgfile, |
189 | const struct GNUNET_CONFIGURATION_Handle *cfg) | 212 | const struct GNUNET_CONFIGURATION_Handle *cfg) |
190 | { | 213 | { |
@@ -195,63 +218,41 @@ run (void *cls, char *const *args, const char *cfgfile, | |||
195 | 218 | ||
196 | if (GNUNET_OK != GNUNET_CRYPTO_get_host_identity (cfg, &local_peer)) | 219 | if (GNUNET_OK != GNUNET_CRYPTO_get_host_identity (cfg, &local_peer)) |
197 | { | 220 | { |
198 | GNUNET_assert (0); | 221 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n"); |
222 | ret = 0; | ||
199 | return; | 223 | return; |
200 | } | 224 | } |
225 | |||
226 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, handle_shutdown, NULL); | ||
227 | |||
228 | info1.id = "a"; | ||
229 | info2.id = "b"; | ||
201 | 230 | ||
202 | map_a = GNUNET_CONTAINER_multihashmap_create (num_a, GNUNET_NO); | 231 | info1.sent = GNUNET_CONTAINER_multihashmap_create (num_a, GNUNET_NO); |
203 | map_b = GNUNET_CONTAINER_multihashmap_create (num_b, GNUNET_NO); | 232 | info2.sent = GNUNET_CONTAINER_multihashmap_create (num_b, GNUNET_NO); |
204 | map_c = GNUNET_CONTAINER_multihashmap_create (num_c, GNUNET_NO); | 233 | common_sent = GNUNET_CONTAINER_multihashmap_create (num_c, GNUNET_NO); |
234 | |||
235 | info1.received = GNUNET_CONTAINER_multihashmap_create (num_a, GNUNET_NO); | ||
236 | info2.received = GNUNET_CONTAINER_multihashmap_create (num_b, GNUNET_NO); | ||
205 | 237 | ||
206 | for (i = 0; i < num_a; i++) | 238 | for (i = 0; i < num_a; i++) |
207 | { | 239 | { |
208 | GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); | 240 | GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); |
209 | if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash)) | 241 | GNUNET_CONTAINER_multihashmap_put (info1.sent, &hash, NULL, |
210 | { | 242 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); |
211 | i--; | ||
212 | continue; | ||
213 | } | ||
214 | GNUNET_CONTAINER_multihashmap_put (map_a, &hash, &hash, | ||
215 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | ||
216 | } | 243 | } |
217 | 244 | ||
218 | for (i = 0; i < num_b; i++) | 245 | for (i = 0; i < num_b; i++) |
219 | { | 246 | { |
220 | GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); | 247 | GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); |
221 | if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash)) | 248 | GNUNET_CONTAINER_multihashmap_put (info2.sent, &hash, NULL, |
222 | { | ||
223 | i--; | ||
224 | continue; | ||
225 | } | ||
226 | if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_b, &hash)) | ||
227 | { | ||
228 | i--; | ||
229 | continue; | ||
230 | } | ||
231 | GNUNET_CONTAINER_multihashmap_put (map_b, &hash, NULL, | ||
232 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); | 249 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); |
233 | } | 250 | } |
234 | 251 | ||
235 | for (i = 0; i < num_c; i++) | 252 | for (i = 0; i < num_c; i++) |
236 | { | 253 | { |
237 | GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); | 254 | GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); |
238 | if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash)) | 255 | GNUNET_CONTAINER_multihashmap_put (common_sent, &hash, NULL, |
239 | { | ||
240 | i--; | ||
241 | continue; | ||
242 | } | ||
243 | if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_b, &hash)) | ||
244 | { | ||
245 | i--; | ||
246 | continue; | ||
247 | } | ||
248 | if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_c, &hash)) | ||
249 | { | ||
250 | i--; | ||
251 | continue; | ||
252 | } | ||
253 | GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); | ||
254 | GNUNET_CONTAINER_multihashmap_put (map_c, &hash, NULL, | ||
255 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); | 256 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); |
256 | } | 257 | } |
257 | 258 | ||
@@ -259,20 +260,22 @@ run (void *cls, char *const *args, const char *cfgfile, | |||
259 | app_id = hash; | 260 | app_id = hash; |
260 | 261 | ||
261 | /* FIXME: also implement intersection etc. */ | 262 | /* FIXME: also implement intersection etc. */ |
262 | set_a = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION); | 263 | info1.set = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION); |
263 | set_b = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION); | 264 | info2.set = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION); |
264 | 265 | ||
265 | GNUNET_CONTAINER_multihashmap_iterate (map_a, set_insert_iterator, set_a); | 266 | GNUNET_CONTAINER_multihashmap_iterate (info1.sent, set_insert_iterator, info1.set); |
266 | GNUNET_CONTAINER_multihashmap_iterate (map_b, set_insert_iterator, set_b); | 267 | GNUNET_CONTAINER_multihashmap_iterate (info2.sent, set_insert_iterator, info2.set); |
267 | GNUNET_CONTAINER_multihashmap_iterate (map_c, set_insert_iterator, set_a); | 268 | GNUNET_CONTAINER_multihashmap_iterate (common_sent, set_insert_iterator, info1.set); |
268 | GNUNET_CONTAINER_multihashmap_iterate (map_c, set_insert_iterator, set_b); | 269 | GNUNET_CONTAINER_multihashmap_iterate (common_sent, set_insert_iterator, info2.set); |
269 | 270 | ||
270 | set_listener = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, | 271 | set_listener = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, |
271 | &app_id, set_listen_cb, NULL); | 272 | &app_id, set_listen_cb, NULL); |
272 | 273 | ||
273 | set_oh1 = GNUNET_SET_prepare (&local_peer, &app_id, NULL, salt, GNUNET_SET_RESULT_ADDED, | 274 | info1.oh = GNUNET_SET_prepare (&local_peer, &app_id, NULL, salt, GNUNET_SET_RESULT_ADDED, |
274 | set_result_cb_1, NULL); | 275 | set_result_cb, &info1); |
275 | GNUNET_SET_commit (set_oh1, set_a); | 276 | GNUNET_SET_commit (info1.oh, info1.set); |
277 | GNUNET_SET_destroy (info1.set); | ||
278 | info1.set = NULL; | ||
276 | } | 279 | } |
277 | 280 | ||
278 | 281 | ||
diff --git a/src/set/set.h b/src/set/set.h index 7ec3e6cb2..fc29e6696 100644 --- a/src/set/set.h +++ b/src/set/set.h | |||
@@ -194,6 +194,24 @@ struct GNUNET_SET_ElementMessage | |||
194 | }; | 194 | }; |
195 | 195 | ||
196 | 196 | ||
197 | /** | ||
198 | * Sent to the service by the client | ||
199 | * in order to cancel a set operation. | ||
200 | */ | ||
201 | struct GNUNET_SET_CancelMessage | ||
202 | { | ||
203 | /** | ||
204 | * Type: GNUNET_MESSAGE_TYPE_SET_CANCEL | ||
205 | */ | ||
206 | struct GNUNET_MessageHeader header; | ||
207 | |||
208 | /** | ||
209 | * ID of the request we want to cancel. | ||
210 | */ | ||
211 | uint32_t request_id GNUNET_PACKED; | ||
212 | }; | ||
213 | |||
214 | |||
197 | GNUNET_NETWORK_STRUCT_END | 215 | GNUNET_NETWORK_STRUCT_END |
198 | 216 | ||
199 | #endif | 217 | #endif |
diff --git a/src/set/set_api.c b/src/set/set_api.c index 370846bae..d3131648b 100644 --- a/src/set/set_api.c +++ b/src/set/set_api.c | |||
@@ -33,7 +33,6 @@ | |||
33 | 33 | ||
34 | #define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__) | 34 | #define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__) |
35 | 35 | ||
36 | |||
37 | /** | 36 | /** |
38 | * Opaque handle to a set. | 37 | * Opaque handle to a set. |
39 | */ | 38 | */ |
@@ -42,20 +41,47 @@ struct GNUNET_SET_Handle | |||
42 | struct GNUNET_CLIENT_Connection *client; | 41 | struct GNUNET_CLIENT_Connection *client; |
43 | struct GNUNET_MQ_Handle *mq; | 42 | struct GNUNET_MQ_Handle *mq; |
44 | unsigned int messages_since_ack; | 43 | unsigned int messages_since_ack; |
44 | struct GNUNET_SET_OperationHandle *ops_head; | ||
45 | struct GNUNET_SET_OperationHandle *ops_tail; | ||
46 | int destroy_requested; | ||
45 | }; | 47 | }; |
46 | 48 | ||
49 | |||
47 | /** | 50 | /** |
48 | * Opaque handle to a set operation request from another peer. | 51 | * Opaque handle to a set operation request from another peer. |
49 | */ | 52 | */ |
50 | struct GNUNET_SET_Request | 53 | struct GNUNET_SET_Request |
51 | { | 54 | { |
55 | /** | ||
56 | * Id of the request, used to identify the request when | ||
57 | * accepting/rejecting it. | ||
58 | */ | ||
52 | uint32_t accept_id; | 59 | uint32_t accept_id; |
60 | |||
61 | /** | ||
62 | * Has the request been accepted already? | ||
63 | * GNUNET_YES/GNUNET_NO | ||
64 | */ | ||
53 | int accepted; | 65 | int accepted; |
54 | }; | 66 | }; |
55 | 67 | ||
68 | |||
69 | /** | ||
70 | * Handle to an operation. | ||
71 | * Only known to the service after commiting | ||
72 | * the handle with a set. | ||
73 | */ | ||
56 | struct GNUNET_SET_OperationHandle | 74 | struct GNUNET_SET_OperationHandle |
57 | { | 75 | { |
76 | /** | ||
77 | * Function to be called when we have a result, | ||
78 | * or an error. | ||
79 | */ | ||
58 | GNUNET_SET_ResultIterator result_cb; | 80 | GNUNET_SET_ResultIterator result_cb; |
81 | |||
82 | /** | ||
83 | * Closure for result_cb. | ||
84 | */ | ||
59 | void *result_cls; | 85 | void *result_cls; |
60 | 86 | ||
61 | /** | 87 | /** |
@@ -80,6 +106,17 @@ struct GNUNET_SET_OperationHandle | |||
80 | * used to patch the request id into the message when the set is known. | 106 | * used to patch the request id into the message when the set is known. |
81 | */ | 107 | */ |
82 | uint32_t *request_id_addr; | 108 | uint32_t *request_id_addr; |
109 | |||
110 | /** | ||
111 | * Handles are kept in a linked list. | ||
112 | */ | ||
113 | struct GNUNET_SET_OperationHandle *prev; | ||
114 | |||
115 | /** | ||
116 | * Handles are kept in a linked list. | ||
117 | */ | ||
118 | struct GNUNET_SET_OperationHandle *next; | ||
119 | |||
83 | }; | 120 | }; |
84 | 121 | ||
85 | 122 | ||
@@ -88,9 +125,25 @@ struct GNUNET_SET_OperationHandle | |||
88 | */ | 125 | */ |
89 | struct GNUNET_SET_ListenHandle | 126 | struct GNUNET_SET_ListenHandle |
90 | { | 127 | { |
128 | /** | ||
129 | * Connection to the service. | ||
130 | */ | ||
91 | struct GNUNET_CLIENT_Connection *client; | 131 | struct GNUNET_CLIENT_Connection *client; |
132 | |||
133 | /** | ||
134 | * Message queue for the client. | ||
135 | */ | ||
92 | struct GNUNET_MQ_Handle* mq; | 136 | struct GNUNET_MQ_Handle* mq; |
137 | |||
138 | /** | ||
139 | * Function to call on a new incoming request, | ||
140 | * or on error. | ||
141 | */ | ||
93 | GNUNET_SET_ListenCallback listen_cb; | 142 | GNUNET_SET_ListenCallback listen_cb; |
143 | |||
144 | /** | ||
145 | * Closure for listen_cb. | ||
146 | */ | ||
94 | void *listen_cls; | 147 | void *listen_cls; |
95 | }; | 148 | }; |
96 | 149 | ||
@@ -108,11 +161,13 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh) | |||
108 | struct GNUNET_SET_Handle *set = cls; | 161 | struct GNUNET_SET_Handle *set = cls; |
109 | struct GNUNET_SET_OperationHandle *oh; | 162 | struct GNUNET_SET_OperationHandle *oh; |
110 | struct GNUNET_SET_Element e; | 163 | struct GNUNET_SET_Element e; |
111 | 164 | enum GNUNET_SET_Status result_status; | |
112 | 165 | ||
113 | GNUNET_assert (NULL != set); | 166 | GNUNET_assert (NULL != set); |
114 | GNUNET_assert (NULL != set->mq); | 167 | GNUNET_assert (NULL != set->mq); |
115 | 168 | ||
169 | result_status = ntohs (msg->result_status); | ||
170 | |||
116 | if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2) | 171 | if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2) |
117 | { | 172 | { |
118 | struct GNUNET_MQ_Envelope *mqm; | 173 | struct GNUNET_MQ_Envelope *mqm; |
@@ -123,11 +178,14 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh) | |||
123 | GNUNET_assert (NULL != oh); | 178 | GNUNET_assert (NULL != oh); |
124 | /* status is not STATUS_OK => there's no attached element, | 179 | /* status is not STATUS_OK => there's no attached element, |
125 | * and this is the last result message we get */ | 180 | * and this is the last result message we get */ |
126 | if (htons (msg->result_status) != GNUNET_SET_STATUS_OK) | 181 | if (GNUNET_SET_STATUS_OK != result_status) |
127 | { | 182 | { |
128 | GNUNET_MQ_assoc_remove (set->mq, ntohl (msg->request_id)); | 183 | GNUNET_MQ_assoc_remove (set->mq, ntohl (msg->request_id)); |
184 | GNUNET_CONTAINER_DLL_remove (oh->set->ops_head, oh->set->ops_tail, oh); | ||
185 | if (GNUNET_YES == oh->set->destroy_requested) | ||
186 | GNUNET_SET_destroy (oh->set); | ||
129 | if (NULL != oh->result_cb) | 187 | if (NULL != oh->result_cb) |
130 | oh->result_cb (oh->result_cls, NULL, htons (msg->result_status)); | 188 | oh->result_cb (oh->result_cls, NULL, result_status); |
131 | GNUNET_free (oh); | 189 | GNUNET_free (oh); |
132 | return; | 190 | return; |
133 | } | 191 | } |
@@ -136,7 +194,7 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh) | |||
136 | e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage); | 194 | e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage); |
137 | e.type = msg->element_type; | 195 | e.type = msg->element_type; |
138 | if (NULL != oh->result_cb) | 196 | if (NULL != oh->result_cb) |
139 | oh->result_cb (oh->result_cls, &e, htons (msg->result_status)); | 197 | oh->result_cb (oh->result_cls, &e, result_status); |
140 | } | 198 | } |
141 | 199 | ||
142 | /** | 200 | /** |
@@ -153,7 +211,7 @@ handle_request (void *cls, const struct GNUNET_MessageHeader *mh) | |||
153 | struct GNUNET_SET_Request *req; | 211 | struct GNUNET_SET_Request *req; |
154 | struct GNUNET_MessageHeader *context_msg; | 212 | struct GNUNET_MessageHeader *context_msg; |
155 | 213 | ||
156 | LOG (GNUNET_ERROR_TYPE_INFO, "processing request\n"); | 214 | LOG (GNUNET_ERROR_TYPE_DEBUG, "processing operation request\n"); |
157 | req = GNUNET_new (struct GNUNET_SET_Request); | 215 | req = GNUNET_new (struct GNUNET_SET_Request); |
158 | req->accept_id = ntohl (msg->accept_id); | 216 | req->accept_id = ntohl (msg->accept_id); |
159 | context_msg = GNUNET_MQ_extract_nested_mh (msg); | 217 | context_msg = GNUNET_MQ_extract_nested_mh (msg); |
@@ -171,16 +229,42 @@ handle_request (void *cls, const struct GNUNET_MessageHeader *mh) | |||
171 | amsg->accept_reject_id = msg->accept_id; | 229 | amsg->accept_reject_id = msg->accept_id; |
172 | GNUNET_MQ_send (lh->mq, mqm); | 230 | GNUNET_MQ_send (lh->mq, mqm); |
173 | GNUNET_free (req); | 231 | GNUNET_free (req); |
174 | LOG (GNUNET_ERROR_TYPE_INFO, "rejecting request\n"); | 232 | LOG (GNUNET_ERROR_TYPE_DEBUG, "rejecting request\n"); |
175 | } | 233 | } |
176 | 234 | ||
177 | LOG (GNUNET_ERROR_TYPE_INFO, "processed op request from service\n"); | 235 | LOG (GNUNET_ERROR_TYPE_DEBUG, "processed op request from service\n"); |
178 | 236 | ||
179 | /* the accept-case is handled in GNUNET_SET_accept, | 237 | /* the accept-case is handled in GNUNET_SET_accept, |
180 | * as we have the accept message available there */ | 238 | * as we have the accept message available there */ |
181 | } | 239 | } |
182 | 240 | ||
183 | 241 | ||
242 | static void | ||
243 | handle_client_listener_error (void *cls, enum GNUNET_MQ_Error error) | ||
244 | { | ||
245 | struct GNUNET_SET_ListenHandle *lh = cls; | ||
246 | |||
247 | lh->listen_cb (lh->listen_cls, NULL, NULL, NULL); | ||
248 | } | ||
249 | |||
250 | |||
251 | static void | ||
252 | handle_client_set_error (void *cls, enum GNUNET_MQ_Error error) | ||
253 | { | ||
254 | struct GNUNET_SET_Handle *set = cls; | ||
255 | |||
256 | while (NULL != set->ops_head) | ||
257 | { | ||
258 | if (NULL != set->ops_head->result_cb) | ||
259 | set->ops_head->result_cb (set->ops_head->result_cls, NULL, | ||
260 | GNUNET_SET_STATUS_FAILURE); | ||
261 | GNUNET_SET_operation_cancel (set->ops_head); | ||
262 | } | ||
263 | |||
264 | /* FIXME: there should be a set error handler */ | ||
265 | } | ||
266 | |||
267 | |||
184 | /** | 268 | /** |
185 | * Create an empty set, supporting the specified operation. | 269 | * Create an empty set, supporting the specified operation. |
186 | * | 270 | * |
@@ -200,15 +284,16 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
200 | struct GNUNET_MQ_Envelope *mqm; | 284 | struct GNUNET_MQ_Envelope *mqm; |
201 | struct GNUNET_SET_CreateMessage *msg; | 285 | struct GNUNET_SET_CreateMessage *msg; |
202 | static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { | 286 | static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { |
203 | {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT}, | 287 | {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT, 0}, |
204 | GNUNET_MQ_HANDLERS_END | 288 | GNUNET_MQ_HANDLERS_END |
205 | }; | 289 | }; |
206 | 290 | ||
207 | set = GNUNET_new (struct GNUNET_SET_Handle); | 291 | set = GNUNET_new (struct GNUNET_SET_Handle); |
208 | set->client = GNUNET_CLIENT_connect ("set", cfg); | 292 | set->client = GNUNET_CLIENT_connect ("set", cfg); |
209 | LOG (GNUNET_ERROR_TYPE_INFO, "set client created\n"); | 293 | LOG (GNUNET_ERROR_TYPE_DEBUG, "set client created\n"); |
210 | GNUNET_assert (NULL != set->client); | 294 | GNUNET_assert (NULL != set->client); |
211 | set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers, set); | 295 | set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers, |
296 | handle_client_set_error, set); | ||
212 | GNUNET_assert (NULL != set->mq); | 297 | GNUNET_assert (NULL != set->mq); |
213 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE); | 298 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE); |
214 | msg->operation = htons (op); | 299 | msg->operation = htons (op); |
@@ -279,6 +364,11 @@ GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set, | |||
279 | void | 364 | void |
280 | GNUNET_SET_destroy (struct GNUNET_SET_Handle *set) | 365 | GNUNET_SET_destroy (struct GNUNET_SET_Handle *set) |
281 | { | 366 | { |
367 | if (NULL != set->ops_head) | ||
368 | { | ||
369 | set->destroy_requested = GNUNET_YES; | ||
370 | return; | ||
371 | } | ||
282 | GNUNET_CLIENT_disconnect (set->client); | 372 | GNUNET_CLIENT_disconnect (set->client); |
283 | set->client = NULL; | 373 | set->client = NULL; |
284 | GNUNET_MQ_destroy (set->mq); | 374 | GNUNET_MQ_destroy (set->mq); |
@@ -332,7 +422,6 @@ GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer, | |||
332 | return oh; | 422 | return oh; |
333 | } | 423 | } |
334 | 424 | ||
335 | |||
336 | /** | 425 | /** |
337 | * Wait for set operation requests for the given application id | 426 | * Wait for set operation requests for the given application id |
338 | * | 427 | * |
@@ -365,7 +454,8 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
365 | lh->listen_cb = listen_cb; | 454 | lh->listen_cb = listen_cb; |
366 | lh->listen_cls = listen_cls; | 455 | lh->listen_cls = listen_cls; |
367 | GNUNET_assert (NULL != lh->client); | 456 | GNUNET_assert (NULL != lh->client); |
368 | lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers, lh); | 457 | lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers, |
458 | handle_client_listener_error, lh); | ||
369 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN); | 459 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN); |
370 | msg->operation = htons (operation); | 460 | msg->operation = htons (operation); |
371 | msg->app_id = *app_id; | 461 | msg->app_id = *app_id; |
@@ -413,6 +503,7 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request, | |||
413 | struct GNUNET_SET_OperationHandle *oh; | 503 | struct GNUNET_SET_OperationHandle *oh; |
414 | struct GNUNET_SET_AcceptRejectMessage *msg; | 504 | struct GNUNET_SET_AcceptRejectMessage *msg; |
415 | 505 | ||
506 | GNUNET_assert (NULL != request); | ||
416 | GNUNET_assert (GNUNET_NO == request->accepted); | 507 | GNUNET_assert (GNUNET_NO == request->accepted); |
417 | request->accepted = GNUNET_YES; | 508 | request->accepted = GNUNET_YES; |
418 | 509 | ||
@@ -432,6 +523,9 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request, | |||
432 | 523 | ||
433 | /** | 524 | /** |
434 | * Cancel the given set operation. | 525 | * Cancel the given set operation. |
526 | * We need to send an explicit cancel message, as | ||
527 | * all operations communicate with the set's client | ||
528 | * handle. | ||
435 | * | 529 | * |
436 | * @param oh set operation to cancel | 530 | * @param oh set operation to cancel |
437 | */ | 531 | */ |
@@ -441,17 +535,20 @@ GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh) | |||
441 | struct GNUNET_MQ_Envelope *mqm; | 535 | struct GNUNET_MQ_Envelope *mqm; |
442 | struct GNUNET_SET_OperationHandle *h_assoc; | 536 | struct GNUNET_SET_OperationHandle *h_assoc; |
443 | 537 | ||
444 | if (NULL != oh->set) | 538 | GNUNET_assert (NULL != oh->set); |
445 | { | 539 | |
446 | h_assoc = GNUNET_MQ_assoc_remove (oh->set->mq, oh->request_id); | 540 | GNUNET_CONTAINER_DLL_remove (oh->set->ops_head, oh->set->ops_tail, oh); |
447 | GNUNET_assert (h_assoc == oh); | 541 | h_assoc = GNUNET_MQ_assoc_remove (oh->set->mq, oh->request_id); |
448 | mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_CANCEL); | 542 | GNUNET_assert (h_assoc == oh); |
449 | GNUNET_MQ_send (oh->set->mq, mqm); | 543 | mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_CANCEL); |
450 | } | 544 | GNUNET_MQ_send (oh->set->mq, mqm); |
451 | 545 | ||
452 | if (NULL != oh->conclude_mqm) | 546 | if (NULL != oh->conclude_mqm) |
453 | GNUNET_MQ_discard (oh->conclude_mqm); | 547 | GNUNET_MQ_discard (oh->conclude_mqm); |
454 | 548 | ||
549 | if (GNUNET_YES == oh->set->destroy_requested) | ||
550 | GNUNET_SET_destroy (oh->set); | ||
551 | |||
455 | GNUNET_free (oh); | 552 | GNUNET_free (oh); |
456 | } | 553 | } |
457 | 554 | ||
@@ -469,14 +566,15 @@ GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh) | |||
469 | */ | 566 | */ |
470 | void | 567 | void |
471 | GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh, | 568 | GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh, |
472 | struct GNUNET_SET_Handle *set) | 569 | struct GNUNET_SET_Handle *set) |
473 | { | 570 | { |
474 | GNUNET_assert (NULL == oh->set); | 571 | GNUNET_assert (NULL == oh->set); |
475 | GNUNET_assert (NULL != oh->conclude_mqm); | 572 | GNUNET_assert (NULL != oh->conclude_mqm); |
476 | oh->set = set; | 573 | oh->set = set; |
477 | oh->request_id = GNUNET_MQ_assoc_add (oh->set->mq, oh); | 574 | GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, oh); |
575 | oh->request_id = GNUNET_MQ_assoc_add (set->mq, oh); | ||
478 | *oh->request_id_addr = htonl (oh->request_id); | 576 | *oh->request_id_addr = htonl (oh->request_id); |
479 | GNUNET_MQ_send (oh->set->mq, oh->conclude_mqm); | 577 | GNUNET_MQ_send (set->mq, oh->conclude_mqm); |
480 | oh->conclude_mqm = NULL; | 578 | oh->conclude_mqm = NULL; |
481 | oh->request_id_addr = NULL; | 579 | oh->request_id_addr = NULL; |
482 | } | 580 | } |
diff --git a/src/util/mq.c b/src/util/mq.c index f318dd04a..04129a7b4 100644 --- a/src/util/mq.c +++ b/src/util/mq.c | |||
@@ -612,6 +612,7 @@ connection_client_send_impl (struct GNUNET_MQ_Handle *mq, | |||
612 | struct GNUNET_MQ_Handle * | 612 | struct GNUNET_MQ_Handle * |
613 | GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection, | 613 | GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection, |
614 | const struct GNUNET_MQ_MessageHandler *handlers, | 614 | const struct GNUNET_MQ_MessageHandler *handlers, |
615 | GNUNET_MQ_ErrorHandler error_handler, | ||
615 | void *cls) | 616 | void *cls) |
616 | { | 617 | { |
617 | struct GNUNET_MQ_Handle *mq; | 618 | struct GNUNET_MQ_Handle *mq; |
@@ -621,6 +622,7 @@ GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connecti | |||
621 | 622 | ||
622 | mq = GNUNET_new (struct GNUNET_MQ_Handle); | 623 | mq = GNUNET_new (struct GNUNET_MQ_Handle); |
623 | mq->handlers = handlers; | 624 | mq->handlers = handlers; |
625 | mq->error_handler = error_handler; | ||
624 | mq->handlers_cls = cls; | 626 | mq->handlers_cls = cls; |
625 | state = GNUNET_new (struct ClientConnectionState); | 627 | state = GNUNET_new (struct ClientConnectionState); |
626 | state->connection = connection; | 628 | state->connection = connection; |
@@ -708,18 +710,29 @@ GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *mqm, | |||
708 | void | 710 | void |
709 | GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) | 711 | GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) |
710 | { | 712 | { |
711 | /* FIXME: destroy all pending messages in the queue */ | ||
712 | |||
713 | if (NULL != mq->destroy_impl) | 713 | if (NULL != mq->destroy_impl) |
714 | { | 714 | { |
715 | mq->destroy_impl (mq, mq->impl_state); | 715 | mq->destroy_impl (mq, mq->impl_state); |
716 | } | 716 | } |
717 | 717 | ||
718 | while (NULL != mq->envelope_head) | ||
719 | { | ||
720 | struct GNUNET_MQ_Envelope *ev; | ||
721 | ev = mq->envelope_head; | ||
722 | GNUNET_MQ_discard (ev); | ||
723 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev); | ||
724 | } | ||
725 | |||
726 | if (NULL != mq->current_envelope) | ||
727 | { | ||
728 | GNUNET_MQ_discard (mq->current_envelope); | ||
729 | mq->current_envelope = NULL; | ||
730 | } | ||
731 | |||
718 | GNUNET_free (mq); | 732 | GNUNET_free (mq); |
719 | } | 733 | } |
720 | 734 | ||
721 | 735 | ||
722 | |||
723 | struct GNUNET_MessageHeader * | 736 | struct GNUNET_MessageHeader * |
724 | GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size) | 737 | GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size) |
725 | { | 738 | { |