diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-11-05 00:08:13 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-11-05 00:08:13 +0000 |
commit | ca2c7bdfa64a30c0013598f0718dcfe7e6d98b2a (patch) | |
tree | 3bedd0e18f88371c2e75bd1953e0bc321629c828 | |
parent | 6c3bf6b3486fd31402ab991f5ddef76bf9cd93c4 (diff) | |
download | gnunet-ca2c7bdfa64a30c0013598f0718dcfe7e6d98b2a.tar.gz gnunet-ca2c7bdfa64a30c0013598f0718dcfe7e6d98b2a.zip |
- implemented missing set functionality
- secretsharing api changes
-rw-r--r-- | src/include/gnunet_container_lib.h | 51 | ||||
-rw-r--r-- | src/include/gnunet_secretsharing_service.h | 98 | ||||
-rw-r--r-- | src/set/Makefile.am | 11 | ||||
-rw-r--r-- | src/set/gnunet-service-set.c | 387 | ||||
-rw-r--r-- | src/set/gnunet-service-set.h | 99 | ||||
-rw-r--r-- | src/set/gnunet-service-set_union.c | 690 | ||||
-rw-r--r-- | src/set/ibf.h | 14 | ||||
-rw-r--r-- | src/set/strata_estimator.h | 49 | ||||
-rw-r--r-- | src/set/test_set.conf | 1 | ||||
-rw-r--r-- | src/set/test_set_union_result_full.c | 255 | ||||
-rw-r--r-- | src/util/container_multihashmap32.c | 121 | ||||
-rw-r--r-- | src/util/mq.c | 1 |
12 files changed, 1258 insertions, 519 deletions
diff --git a/src/include/gnunet_container_lib.h b/src/include/gnunet_container_lib.h index 9b7f8ecd1..6e782871a 100644 --- a/src/include/gnunet_container_lib.h +++ b/src/include/gnunet_container_lib.h | |||
@@ -1082,6 +1082,14 @@ struct GNUNET_CONTAINER_MultiHashMap32; | |||
1082 | 1082 | ||
1083 | /** | 1083 | /** |
1084 | * @ingroup hashmap | 1084 | * @ingroup hashmap |
1085 | * Opaque handle to an iterator over | ||
1086 | * a 32-bit key multihashmap. | ||
1087 | */ | ||
1088 | struct GNUNET_CONTAINER_MultiHashMap32Iterator; | ||
1089 | |||
1090 | |||
1091 | /** | ||
1092 | * @ingroup hashmap | ||
1085 | * Iterator over hash map entries. | 1093 | * Iterator over hash map entries. |
1086 | * | 1094 | * |
1087 | * @param cls closure | 1095 | * @param cls closure |
@@ -1267,6 +1275,49 @@ GNUNET_CONTAINER_multihashmap32_get_multiple (const struct GNUNET_CONTAINER_Mult | |||
1267 | void *it_cls); | 1275 | void *it_cls); |
1268 | 1276 | ||
1269 | 1277 | ||
1278 | /** | ||
1279 | * Create an iterator for a 32-bit multihashmap. | ||
1280 | * The iterator can be used to retrieve all the elements in the multihashmap | ||
1281 | * one by one, without having to handle all elements at once (in contrast to | ||
1282 | * #GNUNET_CONTAINER_multihashmap32_iterate). Note that the iterator can not be | ||
1283 | * used anymore if elements have been removed from 'map' after the creation of | ||
1284 | * the iterator, or 'map' has been destroyed. Adding elements to 'map' may | ||
1285 | * result in skipped or repeated elements. | ||
1286 | * | ||
1287 | * @param map the map to create an iterator for | ||
1288 | * @return an iterator over the given multihashmap map | ||
1289 | */ | ||
1290 | struct GNUNET_CONTAINER_MultiHashMap32Iterator * | ||
1291 | GNUNET_CONTAINER_multihashmap32_iterator_create (const struct GNUNET_CONTAINER_MultiHashMap32 *map); | ||
1292 | |||
1293 | |||
1294 | /** | ||
1295 | * Retrieve the next element from the hash map at the iterator's position. | ||
1296 | * If there are no elements left, GNUNET_NO is returned, and 'key' and 'value' | ||
1297 | * are not modified. | ||
1298 | * This operation is only allowed if no elements have been removed from the | ||
1299 | * multihashmap since the creation of 'iter', and the map has not been destroyed. | ||
1300 | * Adding elements may result in repeating or skipping elements. | ||
1301 | * | ||
1302 | * @param iter the iterator to get the next element from | ||
1303 | * @param key pointer to store the key in, can be NULL | ||
1304 | * @param value pointer to store the value in, can be NULL | ||
1305 | * @return #GNUNET_YES we returned an element, | ||
1306 | * #GNUNET_NO if we are out of elements | ||
1307 | */ | ||
1308 | int | ||
1309 | GNUNET_CONTAINER_multihashmap32_iterator_next (struct GNUNET_CONTAINER_MultiHashMap32Iterator *iter, | ||
1310 | uint32_t *key, | ||
1311 | const void **value); | ||
1312 | |||
1313 | |||
1314 | /** | ||
1315 | * Destroy a 32-bit multihashmap iterator. | ||
1316 | * | ||
1317 | * @param iter the iterator to destroy | ||
1318 | */ | ||
1319 | void | ||
1320 | GNUNET_CONTAINER_multihashmap32_iterator_destroy (struct GNUNET_CONTAINER_MultiHashMapIterator *iter); | ||
1270 | 1321 | ||
1271 | 1322 | ||
1272 | /* ******************** doubly-linked list *************** */ | 1323 | /* ******************** doubly-linked list *************** */ |
diff --git a/src/include/gnunet_secretsharing_service.h b/src/include/gnunet_secretsharing_service.h index d7bc46135..9f0df55af 100644 --- a/src/include/gnunet_secretsharing_service.h +++ b/src/include/gnunet_secretsharing_service.h | |||
@@ -48,6 +48,11 @@ extern "C" | |||
48 | */ | 48 | */ |
49 | struct GNUNET_SECRETSHARING_Session; | 49 | struct GNUNET_SECRETSHARING_Session; |
50 | 50 | ||
51 | /** | ||
52 | * Share of a secret shared with a group of peers. | ||
53 | */ | ||
54 | struct GNUNET_SECRETSHARING_Share; | ||
55 | |||
51 | 56 | ||
52 | /** | 57 | /** |
53 | * Handle to cancel a cooperative decryption operation. | 58 | * Handle to cancel a cooperative decryption operation. |
@@ -56,22 +61,14 @@ struct GNUNET_SECRETSHARING_DecryptionHandle; | |||
56 | 61 | ||
57 | 62 | ||
58 | /** | 63 | /** |
59 | * Parameters of the crypto system. | 64 | * Public key of a group sharing a secret. |
60 | */ | 65 | */ |
61 | struct GNUNET_SECRETSHARING_Parameters | 66 | struct GNUNET_SECRETSHARING_PublicKey |
62 | { | 67 | { |
63 | /** | 68 | /** |
64 | * Prime with p = 2q+1. | 69 | * Value of the private key. |
65 | */ | ||
66 | gcry_mpi_t p; | ||
67 | /** | ||
68 | * Prime. | ||
69 | */ | 70 | */ |
70 | gcry_mpi_t q; | 71 | gcry_mpi_t value; |
71 | /** | ||
72 | * Generator of G_q. | ||
73 | */ | ||
74 | gcry_mpi_t g; | ||
75 | }; | 72 | }; |
76 | 73 | ||
77 | 74 | ||
@@ -92,20 +89,35 @@ struct GNUNET_SECRETSHARING_Ciphertext | |||
92 | 89 | ||
93 | 90 | ||
94 | /** | 91 | /** |
92 | * Plain, unencrypted message that can be encrypted with | ||
93 | * a group public key. | ||
94 | */ | ||
95 | struct GNUNET_SECRETSHARING_Message | ||
96 | { | ||
97 | /** | ||
98 | * Value of the message. | ||
99 | */ | ||
100 | gcry_mpi_t value; | ||
101 | }; | ||
102 | |||
103 | |||
104 | /** | ||
95 | * Called once the secret has been established with all peers, or the deadline is due. | 105 | * Called once the secret has been established with all peers, or the deadline is due. |
96 | * | 106 | * |
97 | * Note that the number of peers can be smaller that 'k' (this threshold parameter), which | 107 | * Note that the number of peers can be smaller that 'k' (this threshold parameter), which |
98 | * makes the threshold crypto system useledd. However, in this case one can still determine which peers | 108 | * makes the threshold crypto system useless. However, in this case one can still determine which peers |
99 | * were able to participate in the secret sharing successfully. | 109 | * were able to participate in the secret sharing successfully. |
100 | * | 110 | * |
101 | * @param cls closure | 111 | * @param cls closure |
112 | * @param my_share the share of this peer | ||
102 | * @param public_key public key of the session | 113 | * @param public_key public key of the session |
103 | * @param num_ready_peers number of peers in @ready_peers | 114 | * @param num_ready_peers number of peers in ready_peers |
104 | * @parem ready_peers peers that successfuly participated in establishing | 115 | * @param ready_peers peers that successfuly participated in establishing |
105 | * the shared secret | 116 | * the shared secret |
106 | */ | 117 | */ |
107 | typedef void (*GNUNET_SECRETSHARING_SecretReadyCallback) (void *cls, | 118 | typedef void (*GNUNET_SECRETSHARING_SecretReadyCallback) (void *cls, |
108 | gcry_mpi_t public_key, | 119 | const struct GNUNET_SECRETSHARING_Share *my_share, |
120 | const struct GNUNET_SECRETSHARING_PublicKey public_key, | ||
109 | unsigned int num_ready_peers, | 121 | unsigned int num_ready_peers, |
110 | const struct GNUNET_PeerIdentity *ready_peers); | 122 | const struct GNUNET_PeerIdentity *ready_peers); |
111 | 123 | ||
@@ -114,10 +126,10 @@ typedef void (*GNUNET_SECRETSHARING_SecretReadyCallback) (void *cls, | |||
114 | * Called when a decryption has succeeded. | 126 | * Called when a decryption has succeeded. |
115 | * | 127 | * |
116 | * @param cls closure | 128 | * @param cls closure |
117 | * @param result decrypted value | 129 | * @param result decrypted value, must be free'd by the callback eventually |
118 | */ | 130 | */ |
119 | typedef void (*GNUNET_SECRETSHARING_DecryptCallback) (void *cls, | 131 | typedef void (*GNUNET_SECRETSHARING_DecryptCallback) (void *cls, |
120 | gcry_mpi_t result); | 132 | struct GNUNET_SECRETSHARING_Message *result); |
121 | 133 | ||
122 | 134 | ||
123 | /** | 135 | /** |
@@ -125,11 +137,11 @@ typedef void (*GNUNET_SECRETSHARING_DecryptCallback) (void *cls, | |||
125 | * with the other peers. | 137 | * with the other peers. |
126 | * | 138 | * |
127 | * @param cfg configuration to use | 139 | * @param cfg configuration to use |
128 | * @param num_peers number of peers in @peers | 140 | * @param num_peers number of peers in 'peers' |
141 | * @param peers array of peers that we will share secrets with, can optionally contain the local peer | ||
129 | * @param session_id unique session id | 142 | * @param session_id unique session id |
130 | * @param deadline point in time where the session must be established; taken as hint | 143 | * @param deadline point in time where the session must be established; taken as hint |
131 | * by underlying consensus sessions | 144 | * by underlying consensus sessions |
132 | * @param parameters parameters for the crypto system | ||
133 | * @param threshold minimum number of peers that must cooperate to decrypt a value | 145 | * @param threshold minimum number of peers that must cooperate to decrypt a value |
134 | * @param cb called when the secret has been established | 146 | * @param cb called when the secret has been established |
135 | * @param cls closure for cb | 147 | * @param cls closure for cb |
@@ -140,13 +152,51 @@ GNUNET_SECRETSHARING_create_session (const struct GNUNET_CONFIGURATION_Handle *c | |||
140 | const struct GNUNET_PeerIdentity *peers, | 152 | const struct GNUNET_PeerIdentity *peers, |
141 | const struct GNUNET_HashCode *session_id, | 153 | const struct GNUNET_HashCode *session_id, |
142 | struct GNUNET_TIME_Absolute deadline, | 154 | struct GNUNET_TIME_Absolute deadline, |
143 | struct GNUNET_SECRETSHARING_Parameters *parameters, | ||
144 | unsigned int threshold, | 155 | unsigned int threshold, |
145 | GNUNET_SECRETSHARING_SecretReadyCallback *cb, | 156 | GNUNET_SECRETSHARING_SecretReadyCallback *cb, |
146 | void *cls); | 157 | void *cls); |
147 | 158 | ||
148 | 159 | ||
149 | /** | 160 | /** |
161 | * Load a session from an existing share. | ||
162 | * | ||
163 | * @param cfg configuration to use for connecting to the secretsharing service | ||
164 | * @param share share to load the session from | ||
165 | */ | ||
166 | struct GNUNET_SECRETSHARING_Session * | ||
167 | GNUNET_SECRETSHARING_load_session (const struct GNUNET_CONFIGURATION_Handle *cfg, | ||
168 | const struct GNUNET_SECRETSHARING_Share *share); | ||
169 | |||
170 | /** | ||
171 | * Convert a secret share to a string. | ||
172 | * | ||
173 | * @param share share to serialize | ||
174 | * @return the serialized secret share, to be freed by the caller | ||
175 | */ | ||
176 | char * | ||
177 | GNUNET_SECRETSHARING_share_to_string (const struct GNUNET_SECRETSHARING_Share *share); | ||
178 | |||
179 | |||
180 | /** | ||
181 | * Convert a secret share to a string. | ||
182 | * | ||
183 | * @param str string to deserialize | ||
184 | * @return the serialized secret share, to be freed by the caller | ||
185 | */ | ||
186 | const struct GNUNET_SECRETSHARING_Share * | ||
187 | GNUNET_SECRETSHARING_share_from_string (const char *str); | ||
188 | |||
189 | |||
190 | /** | ||
191 | * Destroy a secret share. | ||
192 | * | ||
193 | * @param share secret share to destroy | ||
194 | */ | ||
195 | void | ||
196 | GNUNET_SECRETSHARING_share_destroy (const struct GNUNET_SECRETSHARING_Share *share); | ||
197 | |||
198 | |||
199 | /** | ||
150 | * Destroy a secret sharing session. | 200 | * Destroy a secret sharing session. |
151 | * | 201 | * |
152 | * @param session session to destroy | 202 | * @param session session to destroy |
@@ -165,12 +215,12 @@ GNUNET_SECRETSHARING_destroy_session (struct GNUNET_SECRETSHARING_Session *sessi | |||
165 | * @param session session to take the key for encryption from, | 215 | * @param session session to take the key for encryption from, |
166 | * the session's ready callback must have been already called | 216 | * the session's ready callback must have been already called |
167 | * @param message message to encrypt | 217 | * @param message message to encrypt |
168 | * @param result_cyphertext pointer to store the resulting ciphertext | 218 | * @param result_ciphertext pointer to store the resulting ciphertext |
169 | * @return GNUNET_YES on succes, GNUNET_SYSERR if the message is invalid (invalid range) | 219 | * @return GNUNET_YES on succes, GNUNET_SYSERR if the message is invalid (invalid range) |
170 | */ | 220 | */ |
171 | int | 221 | int |
172 | GNUNET_SECRETSHARING_encrypt (const struct GNUNET_SECRETSHARING_Session *session, | 222 | GNUNET_SECRETSHARING_encrypt (const struct GNUNET_SECRETSHARING_Session *session, |
173 | gcry_mpi_t message, | 223 | const struct GNUNET_SECRETSHARING_Message *message, |
174 | struct GNUNET_SECRETSHARING_Ciphertext *result_ciphertext); | 224 | struct GNUNET_SECRETSHARING_Ciphertext *result_ciphertext); |
175 | 225 | ||
176 | 226 | ||
@@ -206,6 +256,8 @@ void | |||
206 | GNUNET_SECRETSHARING_cancel_decrypt (struct GNUNET_SECRETSHARING_DecryptionHandle *decryption_handle); | 256 | GNUNET_SECRETSHARING_cancel_decrypt (struct GNUNET_SECRETSHARING_DecryptionHandle *decryption_handle); |
207 | 257 | ||
208 | 258 | ||
259 | |||
260 | |||
209 | #if 0 /* keep Emacsens' auto-indent happy */ | 261 | #if 0 /* keep Emacsens' auto-indent happy */ |
210 | { | 262 | { |
211 | #endif | 263 | #endif |
diff --git a/src/set/Makefile.am b/src/set/Makefile.am index 878ff0cbd..72d3d82a0 100644 --- a/src/set/Makefile.am +++ b/src/set/Makefile.am | |||
@@ -63,7 +63,7 @@ libgnunetset_la_LDFLAGS = \ | |||
63 | 63 | ||
64 | if HAVE_TESTING | 64 | if HAVE_TESTING |
65 | check_PROGRAMS = \ | 65 | check_PROGRAMS = \ |
66 | test_set_api | 66 | test_set_api test_set_union_result_full |
67 | endif | 67 | endif |
68 | 68 | ||
69 | if ENABLE_TEST_RUN | 69 | if ENABLE_TEST_RUN |
@@ -79,6 +79,15 @@ test_set_api_LDADD = \ | |||
79 | test_set_api_DEPENDENCIES = \ | 79 | test_set_api_DEPENDENCIES = \ |
80 | libgnunetset.la | 80 | libgnunetset.la |
81 | 81 | ||
82 | test_set_union_result_full_SOURCES = \ | ||
83 | test_set_union_result_full.c | ||
84 | test_set_union_result_full_LDADD = \ | ||
85 | $(top_builddir)/src/util/libgnunetutil.la \ | ||
86 | $(top_builddir)/src/testing/libgnunettesting.la \ | ||
87 | $(top_builddir)/src/set/libgnunetset.la | ||
88 | test_set_union_result_full_DEPENDENCIES = \ | ||
89 | libgnunetset.la | ||
90 | |||
82 | EXTRA_DIST = \ | 91 | EXTRA_DIST = \ |
83 | test_set.conf | 92 | test_set.conf |
84 | 93 | ||
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index 2e951c3f2..7eb3fdb30 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c | |||
@@ -28,41 +28,20 @@ | |||
28 | 28 | ||
29 | 29 | ||
30 | /** | 30 | /** |
31 | * Peer that has connected to us, but is not yet evaluating a set operation. | 31 | * State of an operation where the peer has connected to us, but is not yet |
32 | * Once the peer has sent a request, and the client has | 32 | * evaluating a set operation. Once the peer has sent a concrete request, and |
33 | * accepted or rejected it, this information will be deleted. | 33 | * the client has accepted or rejected it, this information will be deleted |
34 | * and replaced by the real set operation state. | ||
34 | */ | 35 | */ |
35 | struct Incoming | 36 | struct OperationState |
36 | { | 37 | { |
37 | /** | 38 | /** |
38 | * Incoming peers are held in a linked list | ||
39 | */ | ||
40 | struct Incoming *next; | ||
41 | |||
42 | /** | ||
43 | * Incoming peers are held in a linked list | ||
44 | */ | ||
45 | struct Incoming *prev; | ||
46 | |||
47 | /** | ||
48 | * Detail information about the operation. | ||
49 | * NULL as long as we did not receive the operation | ||
50 | * request from the remote peer. | ||
51 | */ | ||
52 | struct OperationSpecification *spec; | ||
53 | |||
54 | /** | ||
55 | * The identity of the requesting peer. Needs to | 39 | * The identity of the requesting peer. Needs to |
56 | * be stored here as the op spec might not have been created yet. | 40 | * be stored here as the op spec might not have been created yet. |
57 | */ | 41 | */ |
58 | struct GNUNET_PeerIdentity peer; | 42 | struct GNUNET_PeerIdentity peer; |
59 | 43 | ||
60 | /** | 44 | /** |
61 | * Tunnel to the peer. | ||
62 | */ | ||
63 | struct GNUNET_MESH_Tunnel *tunnel; | ||
64 | |||
65 | /** | ||
66 | * Unique request id for the request from | 45 | * Unique request id for the request from |
67 | * a remote peer, sent to the client, which will | 46 | * a remote peer, sent to the client, which will |
68 | * accept or reject the request. | 47 | * accept or reject the request. |
@@ -76,12 +55,6 @@ struct Incoming | |||
76 | * after the timeout, it will be disconnected. | 55 | * after the timeout, it will be disconnected. |
77 | */ | 56 | */ |
78 | GNUNET_SCHEDULER_TaskIdentifier timeout_task; | 57 | GNUNET_SCHEDULER_TaskIdentifier timeout_task; |
79 | |||
80 | /** | ||
81 | * Tunnel context, needs to be stored here as a client's accept will change | ||
82 | * the tunnel context. | ||
83 | */ | ||
84 | struct TunnelContext *tc; | ||
85 | }; | 58 | }; |
86 | 59 | ||
87 | 60 | ||
@@ -160,13 +133,13 @@ static struct Listener *listeners_tail; | |||
160 | * Incoming sockets from remote peers are | 133 | * Incoming sockets from remote peers are |
161 | * held in a doubly linked list. | 134 | * held in a doubly linked list. |
162 | */ | 135 | */ |
163 | static struct Incoming *incoming_head; | 136 | static struct Operation *incoming_head; |
164 | 137 | ||
165 | /** | 138 | /** |
166 | * Incoming sockets from remote peers are | 139 | * Incoming sockets from remote peers are |
167 | * held in a doubly linked list. | 140 | * held in a doubly linked list. |
168 | */ | 141 | */ |
169 | static struct Incoming *incoming_tail; | 142 | static struct Operation *incoming_tail; |
170 | 143 | ||
171 | /** | 144 | /** |
172 | * Counter for allocating unique IDs for clients, | 145 | * Counter for allocating unique IDs for clients, |
@@ -221,14 +194,14 @@ listener_get (struct GNUNET_SERVER_Client *client) | |||
221 | * @return the incoming socket associated with the id, | 194 | * @return the incoming socket associated with the id, |
222 | * or NULL if there is none | 195 | * or NULL if there is none |
223 | */ | 196 | */ |
224 | static struct Incoming * | 197 | static struct Operation * |
225 | get_incoming (uint32_t id) | 198 | get_incoming (uint32_t id) |
226 | { | 199 | { |
227 | struct Incoming *incoming; | 200 | struct Operation *op; |
228 | 201 | ||
229 | for (incoming = incoming_head; NULL != incoming; incoming = incoming->next) | 202 | for (op = incoming_head; NULL != op; op = op) |
230 | if (incoming->suggest_id == id) | 203 | if (op->state->suggest_id == id) |
231 | return incoming; | 204 | return op; |
232 | return NULL; | 205 | return NULL; |
233 | } | 206 | } |
234 | 207 | ||
@@ -261,7 +234,8 @@ listener_destroy (struct Listener *listener) | |||
261 | 234 | ||
262 | 235 | ||
263 | /** | 236 | /** |
264 | * Iterator over hash map entries. | 237 | * Iterator over hash map entries to free |
238 | * element entries. | ||
265 | * | 239 | * |
266 | * @param cls closure | 240 | * @param cls closure |
267 | * @param key current key code | 241 | * @param key current key code |
@@ -283,6 +257,100 @@ destroy_elements_iterator (void *cls, | |||
283 | 257 | ||
284 | 258 | ||
285 | /** | 259 | /** |
260 | * Collect and destroy elements that are not needed anymore, because | ||
261 | * their lifetime (as determined by their generation) does not overlap with any active | ||
262 | * set operation. | ||
263 | */ | ||
264 | void | ||
265 | collect_generation_garbage (struct Set *set) | ||
266 | { | ||
267 | struct GNUNET_CONTAINER_MultiHashMapIterator *iter; | ||
268 | struct ElementEntry *ee; | ||
269 | struct GNUNET_CONTAINER_MultiHashMap *new_elements; | ||
270 | int res; | ||
271 | struct Operation *op; | ||
272 | |||
273 | new_elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); | ||
274 | iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->elements); | ||
275 | while (GNUNET_OK == | ||
276 | (res = GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ee))) | ||
277 | { | ||
278 | if (GNUNET_NO == ee->removed) | ||
279 | goto still_needed; | ||
280 | for (op = set->ops_head; NULL != op; op = op->next) | ||
281 | if ( (op->generation_created >= ee->generation_added) && | ||
282 | (op->generation_created < ee->generation_removed) ) | ||
283 | goto still_needed; | ||
284 | GNUNET_free (ee); | ||
285 | continue; | ||
286 | still_needed: | ||
287 | // we don't expect collisions, thus the replace option | ||
288 | GNUNET_CONTAINER_multihashmap_put (new_elements, &ee->element_hash, ee, | ||
289 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); | ||
290 | } | ||
291 | GNUNET_CONTAINER_multihashmap_iterator_destroy (iter); | ||
292 | GNUNET_CONTAINER_multihashmap_destroy (set->elements); | ||
293 | set->elements = new_elements; | ||
294 | } | ||
295 | |||
296 | |||
297 | /** | ||
298 | * Destroy the given operation. Call the implementation-specific cancel function | ||
299 | * of the operation. Disconnects from the remote peer. | ||
300 | * Does not disconnect the client, as there may be multiple operations per set. | ||
301 | * | ||
302 | * @param op operation to destroy | ||
303 | */ | ||
304 | void | ||
305 | _GSS_operation_destroy (struct Operation *op) | ||
306 | { | ||
307 | struct Set *set; | ||
308 | |||
309 | if (NULL == op->vt) | ||
310 | return; | ||
311 | |||
312 | set = op->spec->set; | ||
313 | |||
314 | GNUNET_assert (GNUNET_NO == op->is_incoming); | ||
315 | GNUNET_assert (NULL != op->spec); | ||
316 | GNUNET_CONTAINER_DLL_remove (op->spec->set->ops_head, | ||
317 | op->spec->set->ops_tail, | ||
318 | op); | ||
319 | |||
320 | op->vt->cancel (op); | ||
321 | op->vt = NULL; | ||
322 | |||
323 | if (NULL != op->spec) | ||
324 | { | ||
325 | if (NULL != op->spec->context_msg) | ||
326 | { | ||
327 | GNUNET_free (op->spec->context_msg); | ||
328 | op->spec->context_msg = NULL; | ||
329 | } | ||
330 | GNUNET_free (op->spec); | ||
331 | op->spec = NULL; | ||
332 | } | ||
333 | |||
334 | if (NULL != op->mq) | ||
335 | { | ||
336 | GNUNET_MQ_destroy (op->mq); | ||
337 | op->mq = NULL; | ||
338 | } | ||
339 | |||
340 | if (NULL != op->tunnel) | ||
341 | { | ||
342 | GNUNET_MESH_tunnel_destroy (op->tunnel); | ||
343 | op->tunnel = NULL; | ||
344 | } | ||
345 | |||
346 | collect_generation_garbage (set); | ||
347 | |||
348 | /* We rely on the tunnel end handler to free 'op'. When 'op->tunnel' was NULL, | ||
349 | * there was a tunnel end handler that will free 'op' on the call stack. */ | ||
350 | } | ||
351 | |||
352 | |||
353 | /** | ||
286 | * Destroy a set, and free all resources associated with it. | 354 | * Destroy a set, and free all resources associated with it. |
287 | * | 355 | * |
288 | * @param set the set to destroy | 356 | * @param set the set to destroy |
@@ -302,6 +370,8 @@ set_destroy (struct Set *set) | |||
302 | return; | 370 | return; |
303 | } | 371 | } |
304 | GNUNET_assert (NULL != set->state); | 372 | GNUNET_assert (NULL != set->state); |
373 | while (NULL != set->ops_head) | ||
374 | _GSS_operation_destroy (set->ops_head); | ||
305 | set->vt->destroy_set (set->state); | 375 | set->vt->destroy_set (set->state); |
306 | set->state = NULL; | 376 | set->state = NULL; |
307 | if (NULL != set->client_mq) | 377 | if (NULL != set->client_mq) |
@@ -364,25 +434,40 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
364 | * @param incoming remote request to destroy | 434 | * @param incoming remote request to destroy |
365 | */ | 435 | */ |
366 | static void | 436 | static void |
367 | incoming_destroy (struct Incoming *incoming) | 437 | incoming_destroy (struct Operation *incoming) |
368 | { | 438 | { |
439 | GNUNET_assert (GNUNET_YES == incoming->is_incoming); | ||
369 | GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming); | 440 | GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming); |
370 | if (GNUNET_SCHEDULER_NO_TASK != incoming->timeout_task) | 441 | if (GNUNET_SCHEDULER_NO_TASK != incoming->state->timeout_task) |
371 | { | 442 | { |
372 | GNUNET_SCHEDULER_cancel (incoming->timeout_task); | 443 | GNUNET_SCHEDULER_cancel (incoming->state->timeout_task); |
373 | incoming->timeout_task = GNUNET_SCHEDULER_NO_TASK; | 444 | incoming->state->timeout_task = GNUNET_SCHEDULER_NO_TASK; |
374 | } | 445 | } |
375 | if (NULL != incoming->tunnel) | 446 | GNUNET_free (incoming->state); |
376 | { | ||
377 | struct GNUNET_MESH_Tunnel *t = incoming->tunnel; | ||
378 | incoming->tunnel = NULL; | ||
379 | GNUNET_MESH_tunnel_destroy (t); | ||
380 | return; | ||
381 | } | ||
382 | GNUNET_free (incoming); | ||
383 | } | 447 | } |
384 | 448 | ||
385 | 449 | ||
450 | static void | ||
451 | incoming_retire (struct Operation *incoming) | ||
452 | { | ||
453 | GNUNET_assert (NULL != incoming->spec); | ||
454 | GNUNET_assert (GNUNET_YES == incoming->is_incoming); | ||
455 | incoming->is_incoming = GNUNET_NO; | ||
456 | GNUNET_free (incoming->state); | ||
457 | incoming->state = NULL; | ||
458 | GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming); | ||
459 | } | ||
460 | |||
461 | |||
462 | /** | ||
463 | * Find a listener that is interested in the given operation type | ||
464 | * and application id. | ||
465 | * | ||
466 | * @param op operation type to look for | ||
467 | * @param app_id application id to look for | ||
468 | * @return a matching listener, or NULL if no listener matches the | ||
469 | * given operation and application id | ||
470 | */ | ||
386 | static struct Listener * | 471 | static struct Listener * |
387 | listener_get_by_target (enum GNUNET_SET_OperationType op, | 472 | listener_get_by_target (enum GNUNET_SET_OperationType op, |
388 | const struct GNUNET_HashCode *app_id) | 473 | const struct GNUNET_HashCode *app_id) |
@@ -409,23 +494,24 @@ listener_get_by_target (enum GNUNET_SET_OperationType op, | |||
409 | * @param listener the listener to suggest the request to | 494 | * @param listener the listener to suggest the request to |
410 | */ | 495 | */ |
411 | static void | 496 | static void |
412 | incoming_suggest (struct Incoming *incoming, struct Listener *listener) | 497 | incoming_suggest (struct Operation *incoming, struct Listener *listener) |
413 | { | 498 | { |
414 | struct GNUNET_MQ_Envelope *mqm; | 499 | struct GNUNET_MQ_Envelope *mqm; |
415 | struct GNUNET_SET_RequestMessage *cmsg; | 500 | struct GNUNET_SET_RequestMessage *cmsg; |
416 | 501 | ||
417 | GNUNET_assert (0 == incoming->suggest_id); | ||
418 | GNUNET_assert (NULL != incoming->spec); | 502 | GNUNET_assert (NULL != incoming->spec); |
419 | incoming->suggest_id = suggest_id++; | 503 | GNUNET_assert (0 == incoming->state->suggest_id); |
504 | incoming->state->suggest_id = suggest_id++; | ||
420 | 505 | ||
421 | GNUNET_SCHEDULER_cancel (incoming->timeout_task); | 506 | GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != incoming->state->timeout_task); |
422 | incoming->timeout_task = GNUNET_SCHEDULER_NO_TASK; | 507 | GNUNET_SCHEDULER_cancel (incoming->state->timeout_task); |
508 | incoming->state->timeout_task = GNUNET_SCHEDULER_NO_TASK; | ||
423 | mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, | 509 | mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, |
424 | incoming->spec->context_msg); | 510 | incoming->spec->context_msg); |
425 | GNUNET_assert (NULL != mqm); | 511 | GNUNET_assert (NULL != mqm); |
426 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "suggesting request with accept id %u\n", | 512 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "suggesting request with accept id %u\n", |
427 | incoming->suggest_id); | 513 | incoming->state->suggest_id); |
428 | cmsg->accept_id = htonl (incoming->suggest_id); | 514 | cmsg->accept_id = htonl (incoming->state->suggest_id); |
429 | cmsg->peer_id = incoming->spec->peer; | 515 | cmsg->peer_id = incoming->spec->peer; |
430 | GNUNET_MQ_send (listener->client_mq, mqm); | 516 | GNUNET_MQ_send (listener->client_mq, mqm); |
431 | } | 517 | } |
@@ -441,10 +527,9 @@ incoming_suggest (struct Incoming *incoming, struct Listener *listener) | |||
441 | * GNUNET_SYSERR to destroy the tunnel | 527 | * GNUNET_SYSERR to destroy the tunnel |
442 | */ | 528 | */ |
443 | static int | 529 | static int |
444 | handle_incoming_msg (struct OperationState *op, | 530 | handle_incoming_msg (struct Operation *op, |
445 | const struct GNUNET_MessageHeader *mh) | 531 | const struct GNUNET_MessageHeader *mh) |
446 | { | 532 | { |
447 | struct Incoming *incoming = (struct Incoming *) op; | ||
448 | const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh; | 533 | const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh; |
449 | struct Listener *listener; | 534 | struct Listener *listener; |
450 | struct OperationSpecification *spec; | 535 | struct OperationSpecification *spec; |
@@ -457,7 +542,7 @@ handle_incoming_msg (struct OperationState *op, | |||
457 | return GNUNET_SYSERR; | 542 | return GNUNET_SYSERR; |
458 | } | 543 | } |
459 | 544 | ||
460 | if (NULL != incoming->spec) | 545 | if (NULL != op->spec) |
461 | { | 546 | { |
462 | /* double operation request */ | 547 | /* double operation request */ |
463 | GNUNET_break_op (0); | 548 | GNUNET_break_op (0); |
@@ -471,9 +556,9 @@ handle_incoming_msg (struct OperationState *op, | |||
471 | spec->operation = ntohl (msg->operation); | 556 | spec->operation = ntohl (msg->operation); |
472 | spec->app_id = msg->app_id; | 557 | spec->app_id = msg->app_id; |
473 | spec->salt = ntohl (msg->salt); | 558 | spec->salt = ntohl (msg->salt); |
474 | spec->peer = incoming->peer; | 559 | spec->peer = op->state->peer; |
475 | 560 | ||
476 | incoming->spec = spec; | 561 | op->spec = spec; |
477 | 562 | ||
478 | if ( (NULL != spec->context_msg) && | 563 | if ( (NULL != spec->context_msg) && |
479 | (ntohs (spec->context_msg->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) ) | 564 | (ntohs (spec->context_msg->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) ) |
@@ -491,11 +576,19 @@ handle_incoming_msg (struct OperationState *op, | |||
491 | "no listener matches incoming request, waiting with timeout\n"); | 576 | "no listener matches incoming request, waiting with timeout\n"); |
492 | return GNUNET_OK; | 577 | return GNUNET_OK; |
493 | } | 578 | } |
494 | incoming_suggest (incoming, listener); | 579 | incoming_suggest (op, listener); |
495 | return GNUNET_OK; | 580 | return GNUNET_OK; |
496 | } | 581 | } |
497 | 582 | ||
498 | 583 | ||
584 | /** | ||
585 | * Send the next element of a set to the set's client. The next element is given by | ||
586 | * the set's current hashmap iterator. The set's iterator will be set to NULL if there | ||
587 | * are no more elements in the set. The caller must ensure that the set's iterator is | ||
588 | * valid. | ||
589 | * | ||
590 | * @param set set that should send its next element to its client | ||
591 | */ | ||
499 | static void | 592 | static void |
500 | send_client_element (struct Set *set) | 593 | send_client_element (struct Set *set) |
501 | { | 594 | { |
@@ -508,6 +601,8 @@ send_client_element (struct Set *set) | |||
508 | if (GNUNET_NO == ret) | 601 | if (GNUNET_NO == ret) |
509 | { | 602 | { |
510 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE); | 603 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE); |
604 | GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter); | ||
605 | set->iter = NULL; | ||
511 | } | 606 | } |
512 | else | 607 | else |
513 | { | 608 | { |
@@ -588,7 +683,8 @@ handle_client_create (void *cls, | |||
588 | switch (ntohs (msg->operation)) | 683 | switch (ntohs (msg->operation)) |
589 | { | 684 | { |
590 | case GNUNET_SET_OPERATION_INTERSECTION: | 685 | case GNUNET_SET_OPERATION_INTERSECTION: |
591 | // set->vt = _GSS_intersection_vt (); | 686 | // FIXME: implement intersection vt |
687 | // set->vt = _GSS_intersection_vt (); | ||
592 | break; | 688 | break; |
593 | case GNUNET_SET_OPERATION_UNION: | 689 | case GNUNET_SET_OPERATION_UNION: |
594 | set->vt = _GSS_union_vt (); | 690 | set->vt = _GSS_union_vt (); |
@@ -623,7 +719,7 @@ handle_client_listen (void *cls, | |||
623 | { | 719 | { |
624 | struct GNUNET_SET_ListenMessage *msg = (struct GNUNET_SET_ListenMessage *) m; | 720 | struct GNUNET_SET_ListenMessage *msg = (struct GNUNET_SET_ListenMessage *) m; |
625 | struct Listener *listener; | 721 | struct Listener *listener; |
626 | struct Incoming *incoming; | 722 | struct Operation *op; |
627 | 723 | ||
628 | if (NULL != listener_get (client)) | 724 | if (NULL != listener_get (client)) |
629 | { | 725 | { |
@@ -639,24 +735,26 @@ handle_client_listen (void *cls, | |||
639 | GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener); | 735 | GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener); |
640 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new listener created (op %u, app %s)\n", | 736 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new listener created (op %u, app %s)\n", |
641 | listener->operation, GNUNET_h2s (&listener->app_id)); | 737 | listener->operation, GNUNET_h2s (&listener->app_id)); |
642 | for (incoming = incoming_head; NULL != incoming; incoming = incoming->next) | 738 | /* check for incoming requests the listener is interested in */ |
739 | for (op = incoming_head; NULL != op; op = op->next) | ||
643 | { | 740 | { |
644 | if (NULL == incoming->spec) | 741 | if (NULL == op->spec) |
645 | { | 742 | { |
646 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "request has no spec yet\n"); | 743 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "request has no spec yet\n"); |
647 | continue; | 744 | continue; |
648 | } | 745 | } |
649 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "considering (op: %u, app: %s, suggest: %u)\n", | 746 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "considering (op: %u, app: %s, suggest: %u)\n", |
650 | incoming->spec->operation, GNUNET_h2s (&incoming->spec->app_id), incoming->suggest_id); | 747 | op->spec->operation, GNUNET_h2s (&op->spec->app_id), op->state->suggest_id); |
651 | 748 | ||
652 | if (0 != incoming->suggest_id) | 749 | /* don't consider the incoming request if it has been already suggested to a listener */ |
750 | if (0 != op->state->suggest_id) | ||
653 | continue; | 751 | continue; |
654 | if (listener->operation != incoming->spec->operation) | 752 | if (listener->operation != op->spec->operation) |
655 | continue; | 753 | continue; |
656 | if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id, &incoming->spec->app_id)) | 754 | if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id, &op->spec->app_id)) |
657 | continue; | 755 | continue; |
658 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "request suggested\n"); | 756 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "request suggested\n"); |
659 | incoming_suggest (incoming, listener); | 757 | incoming_suggest (op, listener); |
660 | } | 758 | } |
661 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "considered all incoming requests\n"); | 759 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "considered all incoming requests\n"); |
662 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 760 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
@@ -676,8 +774,9 @@ handle_client_reject (void *cls, | |||
676 | struct GNUNET_SERVER_Client *client, | 774 | struct GNUNET_SERVER_Client *client, |
677 | const struct GNUNET_MessageHeader *m) | 775 | const struct GNUNET_MessageHeader *m) |
678 | { | 776 | { |
679 | struct Incoming *incoming; | 777 | struct Operation *incoming; |
680 | const struct GNUNET_SET_AcceptRejectMessage *msg; | 778 | const struct GNUNET_SET_AcceptRejectMessage *msg; |
779 | struct GNUNET_MESH_Tunnel *tunnel; | ||
681 | 780 | ||
682 | msg = (const struct GNUNET_SET_AcceptRejectMessage *) m; | 781 | msg = (const struct GNUNET_SET_AcceptRejectMessage *) m; |
683 | GNUNET_break (0 == ntohl (msg->request_id)); | 782 | GNUNET_break (0 == ntohl (msg->request_id)); |
@@ -689,13 +788,17 @@ handle_client_reject (void *cls, | |||
689 | return; | 788 | return; |
690 | } | 789 | } |
691 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "peer request rejected by client\n"); | 790 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "peer request rejected by client\n"); |
692 | GNUNET_MESH_tunnel_destroy (incoming->tunnel); | 791 | /* set the incoming's tunnel to NULL so that we don't accidentally destroy |
792 | * the tunnel again. */ | ||
793 | tunnel = incoming->tunnel; | ||
794 | incoming->tunnel = NULL; | ||
795 | GNUNET_MESH_tunnel_destroy (tunnel); | ||
693 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 796 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
694 | } | 797 | } |
695 | 798 | ||
696 | 799 | ||
697 | /** | 800 | /** |
698 | * Called when a client wants to add an element to a | 801 | * Called when a client wants to add/remove an element to/from a |
699 | * set it inhabits. | 802 | * set it inhabits. |
700 | * | 803 | * |
701 | * @param cls unused | 804 | * @param cls unused |
@@ -784,10 +887,9 @@ handle_client_evaluate (void *cls, | |||
784 | const struct GNUNET_MessageHeader *m) | 887 | const struct GNUNET_MessageHeader *m) |
785 | { | 888 | { |
786 | struct Set *set; | 889 | struct Set *set; |
787 | struct TunnelContext *tc; | ||
788 | struct GNUNET_MESH_Tunnel *tunnel; | ||
789 | struct GNUNET_SET_EvaluateMessage *msg; | 890 | struct GNUNET_SET_EvaluateMessage *msg; |
790 | struct OperationSpecification *spec; | 891 | struct OperationSpecification *spec; |
892 | struct Operation *op; | ||
791 | 893 | ||
792 | set = set_get (client); | 894 | set = set_get (client); |
793 | if (NULL == set) | 895 | if (NULL == set) |
@@ -798,7 +900,6 @@ handle_client_evaluate (void *cls, | |||
798 | } | 900 | } |
799 | 901 | ||
800 | msg = (struct GNUNET_SET_EvaluateMessage *) m; | 902 | msg = (struct GNUNET_SET_EvaluateMessage *) m; |
801 | tc = GNUNET_new (struct TunnelContext); | ||
802 | spec = GNUNET_new (struct OperationSpecification); | 903 | spec = GNUNET_new (struct OperationSpecification); |
803 | spec->operation = set->operation; | 904 | spec->operation = set->operation; |
804 | spec->app_id = msg->app_id; | 905 | spec->app_id = msg->app_id; |
@@ -811,13 +912,20 @@ handle_client_evaluate (void *cls, | |||
811 | if (NULL != spec->context_msg) | 912 | if (NULL != spec->context_msg) |
812 | spec->context_msg = GNUNET_copy_message (spec->context_msg); | 913 | spec->context_msg = GNUNET_copy_message (spec->context_msg); |
813 | 914 | ||
814 | tunnel = GNUNET_MESH_tunnel_create (mesh, tc, &msg->target_peer, | 915 | op = GNUNET_new (struct Operation); |
815 | GNUNET_APPLICATION_TYPE_SET, | 916 | op->spec = spec; |
816 | GNUNET_YES, | 917 | op->generation_created = set->current_generation++; |
817 | GNUNET_YES); | 918 | op->vt = set->vt; |
919 | GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op); | ||
818 | 920 | ||
819 | set->vt->evaluate (spec, tunnel, tc); | 921 | op->tunnel = GNUNET_MESH_tunnel_create (mesh, op, &msg->target_peer, |
922 | GNUNET_APPLICATION_TYPE_SET, | ||
923 | GNUNET_YES, | ||
924 | GNUNET_YES); | ||
820 | 925 | ||
926 | op->mq = GNUNET_MESH_mq_create (op->tunnel); | ||
927 | |||
928 | set->vt->evaluate (op); | ||
821 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 929 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
822 | } | 930 | } |
823 | 931 | ||
@@ -857,8 +965,8 @@ handle_client_iter_ack (void *cls, | |||
857 | 965 | ||
858 | 966 | ||
859 | /** | 967 | /** |
860 | * Handle a request from the client to accept | 968 | * Handle a request from the client to |
861 | * a set operation that came from a remote peer. | 969 | * cancel a running set operation. |
862 | * | 970 | * |
863 | * @param cls unused | 971 | * @param cls unused |
864 | * @param client the client | 972 | * @param client the client |
@@ -872,6 +980,8 @@ handle_client_cancel (void *cls, | |||
872 | const struct GNUNET_SET_CancelMessage *msg = | 980 | const struct GNUNET_SET_CancelMessage *msg = |
873 | (const struct GNUNET_SET_CancelMessage *) mh; | 981 | (const struct GNUNET_SET_CancelMessage *) mh; |
874 | struct Set *set; | 982 | struct Set *set; |
983 | struct Operation *op; | ||
984 | int found; | ||
875 | 985 | ||
876 | set = set_get (client); | 986 | set = set_get (client); |
877 | if (NULL == set) | 987 | if (NULL == set) |
@@ -880,8 +990,24 @@ handle_client_cancel (void *cls, | |||
880 | GNUNET_SERVER_client_disconnect (client); | 990 | GNUNET_SERVER_client_disconnect (client); |
881 | return; | 991 | return; |
882 | } | 992 | } |
883 | /* FIXME: maybe cancel should return success/error code? */ | 993 | found = GNUNET_NO; |
884 | set->vt->cancel (set->state, ntohl (msg->request_id)); | 994 | for (op = set->ops_head; NULL != op; op = op->next) |
995 | { | ||
996 | if (op->spec->client_request_id == msg->request_id) | ||
997 | { | ||
998 | found = GNUNET_YES; | ||
999 | break; | ||
1000 | } | ||
1001 | } | ||
1002 | |||
1003 | if (GNUNET_NO == found) | ||
1004 | { | ||
1005 | GNUNET_break (0); | ||
1006 | GNUNET_SERVER_client_disconnect (client); | ||
1007 | return; | ||
1008 | } | ||
1009 | |||
1010 | _GSS_operation_destroy (op); | ||
885 | } | 1011 | } |
886 | 1012 | ||
887 | 1013 | ||
@@ -899,20 +1025,22 @@ handle_client_accept (void *cls, | |||
899 | const struct GNUNET_MessageHeader *mh) | 1025 | const struct GNUNET_MessageHeader *mh) |
900 | { | 1026 | { |
901 | struct Set *set; | 1027 | struct Set *set; |
902 | struct Incoming *incoming; | ||
903 | struct GNUNET_SET_AcceptRejectMessage *msg = (struct GNUNET_SET_AcceptRejectMessage *) mh; | 1028 | struct GNUNET_SET_AcceptRejectMessage *msg = (struct GNUNET_SET_AcceptRejectMessage *) mh; |
1029 | struct Operation *op; | ||
904 | 1030 | ||
905 | incoming = get_incoming (ntohl (msg->accept_reject_id)); | 1031 | op = get_incoming (ntohl (msg->accept_reject_id)); |
906 | |||
907 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client accepting %u\n", ntohl (msg->accept_reject_id)); | ||
908 | 1032 | ||
909 | if (NULL == incoming) | 1033 | if (NULL == op) |
910 | { | 1034 | { |
911 | GNUNET_break (0); | 1035 | GNUNET_break (0); |
912 | GNUNET_SERVER_client_disconnect (client); | 1036 | GNUNET_SERVER_client_disconnect (client); |
913 | return; | 1037 | return; |
914 | } | 1038 | } |
915 | 1039 | ||
1040 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client accepting %u\n", ntohl (msg->accept_reject_id)); | ||
1041 | |||
1042 | GNUNET_assert (GNUNET_YES == op->is_incoming); | ||
1043 | |||
916 | set = set_get (client); | 1044 | set = set_get (client); |
917 | 1045 | ||
918 | if (NULL == set) | 1046 | if (NULL == set) |
@@ -922,13 +1050,21 @@ handle_client_accept (void *cls, | |||
922 | return; | 1050 | return; |
923 | } | 1051 | } |
924 | 1052 | ||
925 | incoming->spec->set = set; | 1053 | op->spec->set = set; |
926 | incoming->spec->client_request_id = ntohl (msg->request_id); | 1054 | |
927 | incoming->spec->result_mode = ntohs (msg->result_mode); | 1055 | incoming_retire (op); |
928 | set->vt->accept (incoming->spec, incoming->tunnel, incoming->tc); | 1056 | |
929 | /* tunnel ownership goes to operation */ | 1057 | GNUNET_assert (NULL != op->spec->set); |
930 | incoming->tunnel = NULL; | 1058 | GNUNET_assert (NULL != op->spec->set->vt); |
931 | incoming_destroy (incoming); | 1059 | |
1060 | GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op); | ||
1061 | |||
1062 | op->spec->client_request_id = ntohl (msg->request_id); | ||
1063 | op->spec->result_mode = ntohs (msg->result_mode); | ||
1064 | op->generation_created = set->current_generation++; | ||
1065 | op->vt = op->spec->set->vt; | ||
1066 | GNUNET_assert (NULL != op->vt->accept); | ||
1067 | set->vt->accept (op); | ||
932 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 1068 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
933 | } | 1069 | } |
934 | 1070 | ||
@@ -952,9 +1088,8 @@ shutdown_task (void *cls, | |||
952 | while (NULL != sets_head) | 1088 | while (NULL != sets_head) |
953 | set_destroy (sets_head); | 1089 | set_destroy (sets_head); |
954 | 1090 | ||
955 | 1091 | /* it's important to destroy mesh at the end, as all tunnels | |
956 | /* it's important to destroy mesh at the end, as tunnels | 1092 | * must be destroyed before the mesh handle! */ |
957 | * must be destroyed first! */ | ||
958 | if (NULL != mesh) | 1093 | if (NULL != mesh) |
959 | { | 1094 | { |
960 | GNUNET_MESH_disconnect (mesh); | 1095 | GNUNET_MESH_disconnect (mesh); |
@@ -966,7 +1101,8 @@ shutdown_task (void *cls, | |||
966 | 1101 | ||
967 | 1102 | ||
968 | /** | 1103 | /** |
969 | * Signature of the main function of a task. | 1104 | * Handle an incoming peer timeout, that is, disconnect a peer if |
1105 | * has not requested an operation for some amount of time. | ||
970 | * | 1106 | * |
971 | * @param cls closure | 1107 | * @param cls closure |
972 | * @param tc context information (why was this task triggered now) | 1108 | * @param tc context information (why was this task triggered now) |
@@ -975,7 +1111,9 @@ static void | |||
975 | incoming_timeout_cb (void *cls, | 1111 | incoming_timeout_cb (void *cls, |
976 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 1112 | const struct GNUNET_SCHEDULER_TaskContext *tc) |
977 | { | 1113 | { |
978 | struct Incoming *incoming = cls; | 1114 | struct Operation *incoming = cls; |
1115 | |||
1116 | GNUNET_assert (GNUNET_YES == incoming->is_incoming); | ||
979 | 1117 | ||
980 | if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) | 1118 | if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) |
981 | return; | 1119 | return; |
@@ -986,13 +1124,12 @@ incoming_timeout_cb (void *cls, | |||
986 | 1124 | ||
987 | 1125 | ||
988 | static void | 1126 | static void |
989 | handle_incoming_disconnect (struct OperationState *op_state) | 1127 | handle_incoming_disconnect (struct Operation *op) |
990 | { | 1128 | { |
991 | struct Incoming *incoming = (struct Incoming *) op_state; | 1129 | if (NULL == op->tunnel) |
992 | if (NULL == incoming->tunnel) | ||
993 | return; | 1130 | return; |
994 | 1131 | ||
995 | incoming_destroy (incoming); | 1132 | incoming_destroy (op); |
996 | } | 1133 | } |
997 | 1134 | ||
998 | 1135 | ||
@@ -1017,7 +1154,7 @@ tunnel_new_cb (void *cls, | |||
1017 | const struct GNUNET_PeerIdentity *initiator, | 1154 | const struct GNUNET_PeerIdentity *initiator, |
1018 | uint32_t port) | 1155 | uint32_t port) |
1019 | { | 1156 | { |
1020 | struct Incoming *incoming; | 1157 | struct Operation *incoming; |
1021 | static const struct SetVT incoming_vt = { | 1158 | static const struct SetVT incoming_vt = { |
1022 | .msg_handler = handle_incoming_msg, | 1159 | .msg_handler = handle_incoming_msg, |
1023 | .peer_disconnect = handle_incoming_disconnect | 1160 | .peer_disconnect = handle_incoming_disconnect |
@@ -1026,17 +1163,18 @@ tunnel_new_cb (void *cls, | |||
1026 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new incoming tunnel\n"); | 1163 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new incoming tunnel\n"); |
1027 | 1164 | ||
1028 | GNUNET_assert (port == GNUNET_APPLICATION_TYPE_SET); | 1165 | GNUNET_assert (port == GNUNET_APPLICATION_TYPE_SET); |
1029 | incoming = GNUNET_new (struct Incoming); | 1166 | incoming = GNUNET_new (struct Operation); |
1030 | incoming->peer = *initiator; | 1167 | incoming->is_incoming = GNUNET_YES; |
1168 | incoming->state = GNUNET_new (struct OperationState); | ||
1169 | incoming->state->peer = *initiator; | ||
1031 | incoming->tunnel = tunnel; | 1170 | incoming->tunnel = tunnel; |
1032 | incoming->tc = GNUNET_new (struct TunnelContext);; | 1171 | incoming->mq = GNUNET_MESH_mq_create (incoming->tunnel); |
1033 | incoming->tc->vt = &incoming_vt; | 1172 | incoming->vt = &incoming_vt; |
1034 | incoming->tc->op = (struct OperationState *) incoming; | 1173 | incoming->state->timeout_task = |
1035 | incoming->timeout_task = | ||
1036 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, incoming_timeout_cb, incoming); | 1174 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, incoming_timeout_cb, incoming); |
1037 | GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming); | 1175 | GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming); |
1038 | 1176 | ||
1039 | return incoming->tc; | 1177 | return incoming; |
1040 | } | 1178 | } |
1041 | 1179 | ||
1042 | 1180 | ||
@@ -1055,9 +1193,14 @@ static void | |||
1055 | tunnel_end_cb (void *cls, | 1193 | tunnel_end_cb (void *cls, |
1056 | const struct GNUNET_MESH_Tunnel *tunnel, void *tunnel_ctx) | 1194 | const struct GNUNET_MESH_Tunnel *tunnel, void *tunnel_ctx) |
1057 | { | 1195 | { |
1058 | struct TunnelContext *ctx = tunnel_ctx; | 1196 | struct Operation *op = tunnel_ctx; |
1197 | |||
1198 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tunnel end cb called\n"); | ||
1199 | |||
1200 | op->tunnel = NULL; | ||
1059 | 1201 | ||
1060 | ctx->vt->peer_disconnect (ctx->op); | 1202 | if (NULL != op->vt) |
1203 | op->vt->peer_disconnect (op); | ||
1061 | /* mesh will never call us with the context again! */ | 1204 | /* mesh will never call us with the context again! */ |
1062 | GNUNET_free (tunnel_ctx); | 1205 | GNUNET_free (tunnel_ctx); |
1063 | } | 1206 | } |
@@ -1085,14 +1228,14 @@ dispatch_p2p_message (void *cls, | |||
1085 | void **tunnel_ctx, | 1228 | void **tunnel_ctx, |
1086 | const struct GNUNET_MessageHeader *message) | 1229 | const struct GNUNET_MessageHeader *message) |
1087 | { | 1230 | { |
1088 | struct TunnelContext *tc = *tunnel_ctx; | 1231 | struct Operation *op = *tunnel_ctx; |
1089 | int ret; | 1232 | int ret; |
1090 | 1233 | ||
1091 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "dispatching mesh message (type: %u)\n", | 1234 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "dispatching mesh message (type: %u)\n", |
1092 | ntohs (message->type)); | 1235 | ntohs (message->type)); |
1093 | /* do this before the handler, as the handler might kill the tunnel */ | 1236 | /* do this before the handler, as the handler might kill the tunnel */ |
1094 | GNUNET_MESH_receive_done (tunnel); | 1237 | GNUNET_MESH_receive_done (tunnel); |
1095 | ret = tc->vt->msg_handler (tc->op, message); | 1238 | ret = op->vt->msg_handler (op, message); |
1096 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled mesh message (type: %u)\n", | 1239 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled mesh message (type: %u)\n", |
1097 | ntohs (message->type)); | 1240 | ntohs (message->type)); |
1098 | return ret; | 1241 | return ret; |
diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h index 7c2363e9f..7a2c5ba8d 100644 --- a/src/set/gnunet-service-set.h +++ b/src/set/gnunet-service-set.h | |||
@@ -55,8 +55,8 @@ struct OperationState; | |||
55 | 55 | ||
56 | /* forward declarations */ | 56 | /* forward declarations */ |
57 | struct Set; | 57 | struct Set; |
58 | struct TunnelContext; | ||
59 | struct ElementEntry; | 58 | struct ElementEntry; |
59 | struct Operation; | ||
60 | 60 | ||
61 | 61 | ||
62 | /** | 62 | /** |
@@ -135,7 +135,7 @@ typedef void (*AddRemoveImpl) (struct SetState *state, struct ElementEntry *ee); | |||
135 | * | 135 | * |
136 | * @param op the set operation, contains implementation-specific data | 136 | * @param op the set operation, contains implementation-specific data |
137 | */ | 137 | */ |
138 | typedef void (*PeerDisconnectImpl) (struct OperationState *op); | 138 | typedef void (*PeerDisconnectImpl) (struct Operation *op); |
139 | 139 | ||
140 | 140 | ||
141 | /** | 141 | /** |
@@ -151,13 +151,9 @@ typedef void (*DestroySetImpl) (struct SetState *state); | |||
151 | * Signature of functions that implement the creation of set operations | 151 | * Signature of functions that implement the creation of set operations |
152 | * (currently evaluate and accept). | 152 | * (currently evaluate and accept). |
153 | * | 153 | * |
154 | * @param spec specification of the set operation to be created | 154 | * @param op operation that is created, should be initialized by the implementation |
155 | * @param tunnel the tunnel with the other peer | ||
156 | * @param tc tunnel context | ||
157 | */ | 155 | */ |
158 | typedef void (*OpCreateImpl) (struct OperationSpecification *spec, | 156 | typedef void (*OpCreateImpl) (struct Operation *op); |
159 | struct GNUNET_MESH_Tunnel *tunnel, | ||
160 | struct TunnelContext *tc); | ||
161 | 157 | ||
162 | 158 | ||
163 | /** | 159 | /** |
@@ -169,11 +165,10 @@ typedef void (*OpCreateImpl) (struct OperationSpecification *spec, | |||
169 | * @return GNUNET_OK on success, GNUNET_SYSERR to | 165 | * @return GNUNET_OK on success, GNUNET_SYSERR to |
170 | * destroy the operation and the tunnel | 166 | * destroy the operation and the tunnel |
171 | */ | 167 | */ |
172 | typedef int (*MsgHandlerImpl) (struct OperationState *op, | 168 | typedef int (*MsgHandlerImpl) (struct Operation *op, |
173 | const struct GNUNET_MessageHeader *msg); | 169 | const struct GNUNET_MessageHeader *msg); |
174 | 170 | ||
175 | typedef void (*CancelImpl) (struct SetState *set, | 171 | typedef void (*CancelImpl) (struct Operation *op); |
176 | uint32_t request_id); | ||
177 | 172 | ||
178 | 173 | ||
179 | /** | 174 | /** |
@@ -263,6 +258,7 @@ struct ElementEntry | |||
263 | 258 | ||
264 | /** | 259 | /** |
265 | * Hash of the element. | 260 | * Hash of the element. |
261 | * For set union: | ||
266 | * Will be used to derive the different IBF keys | 262 | * Will be used to derive the different IBF keys |
267 | * for different salts. | 263 | * for different salts. |
268 | */ | 264 | */ |
@@ -294,6 +290,63 @@ struct ElementEntry | |||
294 | }; | 290 | }; |
295 | 291 | ||
296 | 292 | ||
293 | struct Operation | ||
294 | { | ||
295 | /** | ||
296 | * V-Table for the operation belonging | ||
297 | * to the tunnel contest. | ||
298 | */ | ||
299 | const struct SetVT *vt; | ||
300 | |||
301 | /** | ||
302 | * Tunnel to the peer. | ||
303 | */ | ||
304 | struct GNUNET_MESH_Tunnel *tunnel; | ||
305 | |||
306 | /** | ||
307 | * Message queue for the tunnel. | ||
308 | */ | ||
309 | struct GNUNET_MQ_Handle *mq; | ||
310 | |||
311 | /** | ||
312 | * GNUNET_YES if this is not a "real" set operation yet, and we still | ||
313 | * need to wait for the other peer to give us more details. | ||
314 | */ | ||
315 | int is_incoming; | ||
316 | |||
317 | /** | ||
318 | * Generation in which the operation handle | ||
319 | * was created. | ||
320 | */ | ||
321 | unsigned int generation_created; | ||
322 | |||
323 | /** | ||
324 | * Detail information about the set operation, | ||
325 | * including the set to use. | ||
326 | * When 'spec' is NULL, the operation is not yet entirely | ||
327 | * initialized. | ||
328 | */ | ||
329 | struct OperationSpecification *spec; | ||
330 | |||
331 | /** | ||
332 | * Operation-specific operation state. | ||
333 | */ | ||
334 | struct OperationState *state; | ||
335 | |||
336 | /** | ||
337 | * Evaluate operations are held in | ||
338 | * a linked list. | ||
339 | */ | ||
340 | struct Operation *next; | ||
341 | |||
342 | /** | ||
343 | * Evaluate operations are held in | ||
344 | * a linked list. | ||
345 | */ | ||
346 | struct Operation *prev; | ||
347 | }; | ||
348 | |||
349 | |||
297 | /** | 350 | /** |
298 | * A set that supports a specific operation | 351 | * A set that supports a specific operation |
299 | * with other peers. | 352 | * with other peers. |
@@ -353,28 +406,25 @@ struct Set | |||
353 | * previously executed operations on this set | 406 | * previously executed operations on this set |
354 | */ | 407 | */ |
355 | unsigned int current_generation; | 408 | unsigned int current_generation; |
356 | }; | ||
357 | 409 | ||
358 | |||
359 | /** | ||
360 | * Information about a tunnel we are connected to. | ||
361 | * Used as tunnel context with mesh. | ||
362 | */ | ||
363 | struct TunnelContext | ||
364 | { | ||
365 | /** | 410 | /** |
366 | * V-Table for the operation belonging | 411 | * Evaluate operations are held in |
367 | * to the tunnel contest. | 412 | * a linked list. |
368 | */ | 413 | */ |
369 | const struct SetVT *vt; | 414 | struct Operation *ops_head; |
370 | 415 | ||
371 | /** | 416 | /** |
372 | * Implementation-specific operation state. | 417 | * Evaluate operations are held in |
418 | * a linked list. | ||
373 | */ | 419 | */ |
374 | struct OperationState *op; | 420 | struct Operation *ops_tail; |
375 | }; | 421 | }; |
376 | 422 | ||
377 | 423 | ||
424 | void | ||
425 | _GSS_operation_destroy (struct Operation *op); | ||
426 | |||
427 | |||
378 | /** | 428 | /** |
379 | * Get the table with implementing functions for | 429 | * Get the table with implementing functions for |
380 | * set union. | 430 | * set union. |
@@ -382,6 +432,7 @@ struct TunnelContext | |||
382 | const struct SetVT * | 432 | const struct SetVT * |
383 | _GSS_union_vt (void); | 433 | _GSS_union_vt (void); |
384 | 434 | ||
435 | |||
385 | /** | 436 | /** |
386 | * Get the table with implementing functions for | 437 | * Get the table with implementing functions for |
387 | * set intersection. | 438 | * set intersection. |
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index 33a36d260..6ad985bcb 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c | |||
@@ -112,12 +112,6 @@ struct OperationState | |||
112 | struct GNUNET_MESH_Tunnel *tunnel; | 112 | struct GNUNET_MESH_Tunnel *tunnel; |
113 | 113 | ||
114 | /** | 114 | /** |
115 | * Detail information about the set operation, | ||
116 | * including the set to use. | ||
117 | */ | ||
118 | struct OperationSpecification *spec; | ||
119 | |||
120 | /** | ||
121 | * Message queue for the peer. | 115 | * Message queue for the peer. |
122 | */ | 116 | */ |
123 | struct GNUNET_MQ_Handle *mq; | 117 | struct GNUNET_MQ_Handle *mq; |
@@ -151,33 +145,14 @@ struct OperationState | |||
151 | struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element; | 145 | struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element; |
152 | 146 | ||
153 | /** | 147 | /** |
154 | * Current state of the operation. | 148 | * Iterator for sending elements on the key to element mapping to the client. |
155 | */ | ||
156 | enum UnionOperationPhase phase; | ||
157 | |||
158 | /** | ||
159 | * Generation in which the operation handle | ||
160 | * was created. | ||
161 | */ | ||
162 | unsigned int generation_created; | ||
163 | |||
164 | /** | ||
165 | * Set state of the set that this operation | ||
166 | * belongs to. | ||
167 | */ | 149 | */ |
168 | struct Set *set; | 150 | struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter; |
169 | 151 | ||
170 | /** | 152 | /** |
171 | * Evaluate operations are held in | 153 | * Current state of the operation. |
172 | * a linked list. | ||
173 | */ | 154 | */ |
174 | struct OperationState *next; | 155 | enum UnionOperationPhase phase; |
175 | |||
176 | /** | ||
177 | * Evaluate operations are held in | ||
178 | * a linked list. | ||
179 | */ | ||
180 | struct OperationState *prev; | ||
181 | 156 | ||
182 | /** | 157 | /** |
183 | * Did we send the client that we are done? | 158 | * Did we send the client that we are done? |
@@ -198,13 +173,13 @@ struct KeyEntry | |||
198 | struct IBF_Key ibf_key; | 173 | struct IBF_Key ibf_key; |
199 | 174 | ||
200 | /** | 175 | /** |
201 | * The actual element associated with the key | 176 | * The actual element associated with the key. |
202 | */ | 177 | */ |
203 | struct ElementEntry *element; | 178 | struct ElementEntry *element; |
204 | 179 | ||
205 | /** | 180 | /** |
206 | * Element that collides with this element | 181 | * Element that collides with this element |
207 | * on the ibf key | 182 | * on the ibf key. All colliding entries must have the same ibf key. |
208 | */ | 183 | */ |
209 | struct KeyEntry *next_colliding; | 184 | struct KeyEntry *next_colliding; |
210 | }; | 185 | }; |
@@ -226,7 +201,7 @@ struct SendElementClosure | |||
226 | * Operation for which the elements | 201 | * Operation for which the elements |
227 | * should be sent. | 202 | * should be sent. |
228 | */ | 203 | */ |
229 | struct OperationState *eo; | 204 | struct Operation *op; |
230 | }; | 205 | }; |
231 | 206 | ||
232 | 207 | ||
@@ -242,18 +217,6 @@ struct SetState | |||
242 | * salt=0. | 217 | * salt=0. |
243 | */ | 218 | */ |
244 | struct StrataEstimator *se; | 219 | struct StrataEstimator *se; |
245 | |||
246 | /** | ||
247 | * Evaluate operations are held in | ||
248 | * a linked list. | ||
249 | */ | ||
250 | struct OperationState *ops_head; | ||
251 | |||
252 | /** | ||
253 | * Evaluate operations are held in | ||
254 | * a linked list. | ||
255 | */ | ||
256 | struct OperationState *ops_tail; | ||
257 | }; | 220 | }; |
258 | 221 | ||
259 | 222 | ||
@@ -263,9 +226,9 @@ struct SetState | |||
263 | * @param cls closure | 226 | * @param cls closure |
264 | * @param key current key code | 227 | * @param key current key code |
265 | * @param value value in the hash map | 228 | * @param value value in the hash map |
266 | * @return GNUNET_YES if we should continue to | 229 | * @return #GNUNET_YES if we should continue to |
267 | * iterate, | 230 | * iterate, |
268 | * GNUNET_NO if not. | 231 | * #GNUNET_NO if not. |
269 | */ | 232 | */ |
270 | static int | 233 | static int |
271 | destroy_key_to_element_iter (void *cls, | 234 | destroy_key_to_element_iter (void *cls, |
@@ -290,65 +253,40 @@ destroy_key_to_element_iter (void *cls, | |||
290 | 253 | ||
291 | 254 | ||
292 | /** | 255 | /** |
293 | * Destroy a union operation, and free all resources | 256 | * Destroy the union operation. Only things specific to the union operation are destroyed. |
294 | * associated with it. | 257 | * |
295 | * | 258 | * @param op union operation to destroy |
296 | * @param eo the union operation to destroy | ||
297 | */ | 259 | */ |
298 | static void | 260 | static void |
299 | union_operation_destroy (struct OperationState *eo) | 261 | union_op_cancel (struct Operation *op) |
300 | { | 262 | { |
301 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n"); | 263 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n"); |
302 | GNUNET_CONTAINER_DLL_remove (eo->set->state->ops_head, | 264 | /* check if the op was canceled twice */ |
303 | eo->set->state->ops_tail, | 265 | GNUNET_assert (NULL != op->state); |
304 | eo); | 266 | if (NULL != op->state->remote_ibf) |
305 | if (NULL != eo->mq) | ||
306 | { | ||
307 | GNUNET_MQ_destroy (eo->mq); | ||
308 | eo->mq = NULL; | ||
309 | } | ||
310 | if (NULL != eo->tunnel) | ||
311 | { | ||
312 | struct GNUNET_MESH_Tunnel *t = eo->tunnel; | ||
313 | eo->tunnel = NULL; | ||
314 | GNUNET_MESH_tunnel_destroy (t); | ||
315 | } | ||
316 | if (NULL != eo->remote_ibf) | ||
317 | { | 267 | { |
318 | ibf_destroy (eo->remote_ibf); | 268 | ibf_destroy (op->state->remote_ibf); |
319 | eo->remote_ibf = NULL; | 269 | op->state->remote_ibf = NULL; |
320 | } | 270 | } |
321 | if (NULL != eo->local_ibf) | 271 | if (NULL != op->state->local_ibf) |
322 | { | 272 | { |
323 | ibf_destroy (eo->local_ibf); | 273 | ibf_destroy (op->state->local_ibf); |
324 | eo->local_ibf = NULL; | 274 | op->state->local_ibf = NULL; |
325 | } | 275 | } |
326 | if (NULL != eo->se) | 276 | if (NULL != op->state->se) |
327 | { | 277 | { |
328 | strata_estimator_destroy (eo->se); | 278 | strata_estimator_destroy (op->state->se); |
329 | eo->se = NULL; | 279 | op->state->se = NULL; |
330 | } | 280 | } |
331 | if (NULL != eo->key_to_element) | 281 | if (NULL != op->state->key_to_element) |
332 | { | 282 | { |
333 | GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, destroy_key_to_element_iter, NULL); | 283 | GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, destroy_key_to_element_iter, NULL); |
334 | GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element); | 284 | GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element); |
335 | eo->key_to_element = NULL; | 285 | op->state->key_to_element = NULL; |
336 | } | 286 | } |
337 | if (NULL != eo->spec) | 287 | GNUNET_free (op->state); |
338 | { | 288 | op->state = NULL; |
339 | if (NULL != eo->spec->context_msg) | ||
340 | { | ||
341 | GNUNET_free (eo->spec->context_msg); | ||
342 | eo->spec->context_msg = NULL; | ||
343 | } | ||
344 | GNUNET_free (eo->spec); | ||
345 | eo->spec = NULL; | ||
346 | } | ||
347 | GNUNET_free (eo); | ||
348 | |||
349 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n"); | 289 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n"); |
350 | |||
351 | /* FIXME: do a garbage collection of the set generations */ | ||
352 | } | 290 | } |
353 | 291 | ||
354 | 292 | ||
@@ -356,20 +294,22 @@ union_operation_destroy (struct OperationState *eo) | |||
356 | * Inform the client that the union operation has failed, | 294 | * Inform the client that the union operation has failed, |
357 | * and proceed to destroy the evaluate operation. | 295 | * and proceed to destroy the evaluate operation. |
358 | * | 296 | * |
359 | * @param eo the union operation to fail | 297 | * @param op the union operation to fail |
360 | */ | 298 | */ |
361 | static void | 299 | static void |
362 | fail_union_operation (struct OperationState *eo) | 300 | fail_union_operation (struct Operation *op) |
363 | { | 301 | { |
364 | struct GNUNET_MQ_Envelope *ev; | 302 | struct GNUNET_MQ_Envelope *ev; |
365 | struct GNUNET_SET_ResultMessage *msg; | 303 | struct GNUNET_SET_ResultMessage *msg; |
366 | 304 | ||
305 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "union operation failed\n"); | ||
306 | |||
367 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); | 307 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); |
368 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); | 308 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); |
369 | msg->request_id = htonl (eo->spec->client_request_id); | 309 | msg->request_id = htonl (op->spec->client_request_id); |
370 | msg->element_type = htons (0); | 310 | msg->element_type = htons (0); |
371 | GNUNET_MQ_send (eo->spec->set->client_mq, ev); | 311 | GNUNET_MQ_send (op->spec->set->client_mq, ev); |
372 | union_operation_destroy (eo); | 312 | _GSS_operation_destroy (op); |
373 | } | 313 | } |
374 | 314 | ||
375 | 315 | ||
@@ -382,7 +322,7 @@ fail_union_operation (struct OperationState *eo) | |||
382 | * @return the derived IBF key | 322 | * @return the derived IBF key |
383 | */ | 323 | */ |
384 | static struct IBF_Key | 324 | static struct IBF_Key |
385 | get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt) | 325 | get_ibf_key (const struct GNUNET_HashCode *src, uint16_t salt) |
386 | { | 326 | { |
387 | struct IBF_Key key; | 327 | struct IBF_Key key; |
388 | 328 | ||
@@ -398,40 +338,39 @@ get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt) | |||
398 | /** | 338 | /** |
399 | * Send a request for the evaluate operation to a remote peer | 339 | * Send a request for the evaluate operation to a remote peer |
400 | * | 340 | * |
401 | * @param eo operation with the other peer | 341 | * @param op operation with the other peer |
402 | */ | 342 | */ |
403 | static void | 343 | static void |
404 | send_operation_request (struct OperationState *eo) | 344 | send_operation_request (struct Operation *op) |
405 | { | 345 | { |
406 | struct GNUNET_MQ_Envelope *ev; | 346 | struct GNUNET_MQ_Envelope *ev; |
407 | struct OperationRequestMessage *msg; | 347 | struct OperationRequestMessage *msg; |
408 | 348 | ||
409 | ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, | 349 | ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, |
410 | eo->spec->context_msg); | 350 | op->spec->context_msg); |
411 | 351 | ||
412 | if (NULL == ev) | 352 | if (NULL == ev) |
413 | { | 353 | { |
414 | /* the context message is too large */ | 354 | /* the context message is too large */ |
415 | GNUNET_break (0); | 355 | GNUNET_break (0); |
416 | GNUNET_SERVER_client_disconnect (eo->spec->set->client); | 356 | GNUNET_SERVER_client_disconnect (op->spec->set->client); |
417 | return; | 357 | return; |
418 | } | 358 | } |
419 | msg->operation = htonl (GNUNET_SET_OPERATION_UNION); | 359 | msg->operation = htonl (GNUNET_SET_OPERATION_UNION); |
420 | msg->app_id = eo->spec->app_id; | 360 | msg->app_id = op->spec->app_id; |
421 | msg->salt = htonl (eo->spec->salt); | 361 | msg->salt = htonl (op->spec->salt); |
422 | GNUNET_MQ_send (eo->mq, ev); | 362 | GNUNET_MQ_send (op->mq, ev); |
423 | 363 | ||
424 | if (NULL != eo->spec->context_msg) | 364 | if (NULL != op->spec->context_msg) |
425 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n"); | 365 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n"); |
426 | else | 366 | else |
427 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n"); | 367 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n"); |
428 | 368 | ||
429 | if (NULL != eo->spec->context_msg) | 369 | if (NULL != op->spec->context_msg) |
430 | { | 370 | { |
431 | GNUNET_free (eo->spec->context_msg); | 371 | GNUNET_free (op->spec->context_msg); |
432 | eo->spec->context_msg = NULL; | 372 | op->spec->context_msg = NULL; |
433 | } | 373 | } |
434 | |||
435 | } | 374 | } |
436 | 375 | ||
437 | 376 | ||
@@ -442,34 +381,89 @@ send_operation_request (struct OperationState *eo) | |||
442 | * @param cls closure | 381 | * @param cls closure |
443 | * @param key current key code | 382 | * @param key current key code |
444 | * @param value value in the hash map | 383 | * @param value value in the hash map |
445 | * @return GNUNET_YES if we should continue to | 384 | * @return #GNUNET_YES if we should continue to |
446 | * iterate, | 385 | * iterate, |
447 | * GNUNET_NO if not. | 386 | * #GNUNET_NO if not. |
448 | */ | 387 | */ |
449 | static int | 388 | static int |
450 | op_register_element_iterator (void *cls, | 389 | op_register_element_iterator (void *cls, |
451 | uint32_t key, | 390 | uint32_t key, |
452 | void *value) | 391 | void *value) |
453 | { | 392 | { |
454 | struct KeyEntry *const new_k = cls; | 393 | struct KeyEntry *const new_k = cls; |
455 | struct KeyEntry *old_k = value; | 394 | struct KeyEntry *old_k = value; |
456 | 395 | ||
457 | GNUNET_assert (NULL != old_k); | 396 | GNUNET_assert (NULL != old_k); |
458 | do | 397 | /* check if our ibf key collides with the ibf key in the existing entry */ |
398 | if (old_k->ibf_key.key_val == new_k->ibf_key.key_val) | ||
459 | { | 399 | { |
460 | if (old_k->ibf_key.key_val == new_k->ibf_key.key_val) | 400 | /* insert the the new key in the collision chain */ |
461 | { | 401 | new_k->next_colliding = old_k->next_colliding; |
462 | new_k->next_colliding = old_k->next_colliding; | 402 | old_k->next_colliding = new_k; |
463 | old_k->next_colliding = new_k; | 403 | /* signal to the caller that we were able to insert into a colliding bucket */ |
404 | return GNUNET_NO; | ||
405 | } | ||
406 | return GNUNET_YES; | ||
407 | } | ||
408 | |||
409 | |||
410 | /** | ||
411 | * Iterator to create the mapping between ibf keys | ||
412 | * and element entries. | ||
413 | * | ||
414 | * @param cls closure | ||
415 | * @param key current key code | ||
416 | * @param value value in the hash map | ||
417 | * @return #GNUNET_YES if we should continue to | ||
418 | * iterate, | ||
419 | * #GNUNET_NO if not. | ||
420 | */ | ||
421 | static int | ||
422 | op_has_element_iterator (void *cls, | ||
423 | uint32_t key, | ||
424 | void *value) | ||
425 | { | ||
426 | struct GNUNET_HashCode *element_hash = cls; | ||
427 | struct KeyEntry *k = value; | ||
428 | |||
429 | GNUNET_assert (NULL != k); | ||
430 | while (NULL != k) | ||
431 | { | ||
432 | if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, element_hash)) | ||
464 | return GNUNET_NO; | 433 | return GNUNET_NO; |
465 | } | 434 | k = k->next_colliding; |
466 | old_k = old_k->next_colliding; | 435 | } |
467 | } while (NULL != old_k); | ||
468 | return GNUNET_YES; | 436 | return GNUNET_YES; |
469 | } | 437 | } |
470 | 438 | ||
471 | 439 | ||
472 | /** | 440 | /** |
441 | * Determine whether the given element is already in the operation's element | ||
442 | * set. | ||
443 | * | ||
444 | * @param op operation that should be tested for 'element_hash' | ||
445 | * @param element_hash hash of the element to look for | ||
446 | * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise | ||
447 | */ | ||
448 | static int | ||
449 | op_has_element (struct Operation *op, const struct GNUNET_HashCode *element_hash) | ||
450 | { | ||
451 | int ret; | ||
452 | struct IBF_Key ibf_key; | ||
453 | |||
454 | ibf_key = get_ibf_key (element_hash, op->spec->salt); | ||
455 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, | ||
456 | (uint32_t) ibf_key.key_val, | ||
457 | op_has_element_iterator, (void *) element_hash); | ||
458 | |||
459 | /* was the iteration aborted because we found the element? */ | ||
460 | if (GNUNET_SYSERR == ret) | ||
461 | return GNUNET_YES; | ||
462 | return GNUNET_NO; | ||
463 | } | ||
464 | |||
465 | |||
466 | /** | ||
473 | * Insert an element into the union operation's | 467 | * Insert an element into the union operation's |
474 | * key-to-element mapping. Takes ownership of 'ee'. | 468 | * key-to-element mapping. Takes ownership of 'ee'. |
475 | * Note that this does not insert the element in the set, | 469 | * Note that this does not insert the element in the set, |
@@ -477,21 +471,21 @@ op_register_element_iterator (void *cls, | |||
477 | * This is done to speed up re-tried operations, if some elements | 471 | * This is done to speed up re-tried operations, if some elements |
478 | * were transmitted, and then the IBF fails to decode. | 472 | * were transmitted, and then the IBF fails to decode. |
479 | * | 473 | * |
480 | * @param eo the union operation | 474 | * @param op the union operation |
481 | * @param ee the element entry | 475 | * @param ee the element entry |
482 | */ | 476 | */ |
483 | static void | 477 | static void |
484 | op_register_element (struct OperationState *eo, struct ElementEntry *ee) | 478 | op_register_element (struct Operation *op, struct ElementEntry *ee) |
485 | { | 479 | { |
486 | int ret; | 480 | int ret; |
487 | struct IBF_Key ibf_key; | 481 | struct IBF_Key ibf_key; |
488 | struct KeyEntry *k; | 482 | struct KeyEntry *k; |
489 | 483 | ||
490 | ibf_key = get_ibf_key (&ee->element_hash, eo->spec->salt); | 484 | ibf_key = get_ibf_key (&ee->element_hash, op->spec->salt); |
491 | k = GNUNET_new (struct KeyEntry); | 485 | k = GNUNET_new (struct KeyEntry); |
492 | k->element = ee; | 486 | k->element = ee; |
493 | k->ibf_key = ibf_key; | 487 | k->ibf_key = ibf_key; |
494 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, | 488 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, |
495 | (uint32_t) ibf_key.key_val, | 489 | (uint32_t) ibf_key.key_val, |
496 | op_register_element_iterator, k); | 490 | op_register_element_iterator, k); |
497 | 491 | ||
@@ -499,7 +493,7 @@ op_register_element (struct OperationState *eo, struct ElementEntry *ee) | |||
499 | if (GNUNET_SYSERR == ret) | 493 | if (GNUNET_SYSERR == ret) |
500 | return; | 494 | return; |
501 | 495 | ||
502 | GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k, | 496 | GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, (uint32_t) ibf_key.key_val, k, |
503 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 497 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); |
504 | } | 498 | } |
505 | 499 | ||
@@ -542,19 +536,19 @@ init_key_to_element_iterator (void *cls, | |||
542 | const struct GNUNET_HashCode *key, | 536 | const struct GNUNET_HashCode *key, |
543 | void *value) | 537 | void *value) |
544 | { | 538 | { |
545 | struct OperationState *eo = cls; | 539 | struct Operation *op = cls; |
546 | struct ElementEntry *e = value; | 540 | struct ElementEntry *e = value; |
547 | 541 | ||
548 | /* make sure that the element belongs to the set at the time | 542 | /* make sure that the element belongs to the set at the time |
549 | * of creating the operation */ | 543 | * of creating the operation */ |
550 | if ( (e->generation_added > eo->generation_created) || | 544 | if ( (e->generation_added > op->generation_created) || |
551 | ( (GNUNET_YES == e->removed) && | 545 | ( (GNUNET_YES == e->removed) && |
552 | (e->generation_removed < eo->generation_created))) | 546 | (e->generation_removed < op->generation_created))) |
553 | return GNUNET_YES; | 547 | return GNUNET_YES; |
554 | 548 | ||
555 | GNUNET_assert (GNUNET_NO == e->remote); | 549 | GNUNET_assert (GNUNET_NO == e->remote); |
556 | 550 | ||
557 | op_register_element (eo, e); | 551 | op_register_element (op, e); |
558 | return GNUNET_YES; | 552 | return GNUNET_YES; |
559 | } | 553 | } |
560 | 554 | ||
@@ -563,45 +557,45 @@ init_key_to_element_iterator (void *cls, | |||
563 | * Create an ibf with the operation's elements | 557 | * Create an ibf with the operation's elements |
564 | * of the specified size | 558 | * of the specified size |
565 | * | 559 | * |
566 | * @param eo the union operation | 560 | * @param op the union operation |
567 | * @param size size of the ibf to create | 561 | * @param size size of the ibf to create |
568 | */ | 562 | */ |
569 | static void | 563 | static void |
570 | prepare_ibf (struct OperationState *eo, uint16_t size) | 564 | prepare_ibf (struct Operation *op, uint16_t size) |
571 | { | 565 | { |
572 | if (NULL == eo->key_to_element) | 566 | if (NULL == op->state->key_to_element) |
573 | { | 567 | { |
574 | unsigned int len; | 568 | unsigned int len; |
575 | len = GNUNET_CONTAINER_multihashmap_size (eo->set->elements); | 569 | len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->elements); |
576 | eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); | 570 | op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); |
577 | GNUNET_CONTAINER_multihashmap_iterate (eo->set->elements, | 571 | GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements, |
578 | init_key_to_element_iterator, eo); | 572 | init_key_to_element_iterator, op); |
579 | } | 573 | } |
580 | if (NULL != eo->local_ibf) | 574 | if (NULL != op->state->local_ibf) |
581 | ibf_destroy (eo->local_ibf); | 575 | ibf_destroy (op->state->local_ibf); |
582 | eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); | 576 | op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); |
583 | GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, | 577 | GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, |
584 | prepare_ibf_iterator, eo->local_ibf); | 578 | prepare_ibf_iterator, op->state->local_ibf); |
585 | } | 579 | } |
586 | 580 | ||
587 | 581 | ||
588 | /** | 582 | /** |
589 | * Send an ibf of appropriate size. | 583 | * Send an ibf of appropriate size. |
590 | * | 584 | * |
591 | * @param eo the union operation | 585 | * @param op the union operation |
592 | * @param ibf_order order of the ibf to send, size=2^order | 586 | * @param ibf_order order of the ibf to send, size=2^order |
593 | */ | 587 | */ |
594 | static void | 588 | static void |
595 | send_ibf (struct OperationState *eo, uint16_t ibf_order) | 589 | send_ibf (struct Operation *op, uint16_t ibf_order) |
596 | { | 590 | { |
597 | unsigned int buckets_sent = 0; | 591 | unsigned int buckets_sent = 0; |
598 | struct InvertibleBloomFilter *ibf; | 592 | struct InvertibleBloomFilter *ibf; |
599 | 593 | ||
600 | prepare_ibf (eo, 1<<ibf_order); | 594 | prepare_ibf (op, 1<<ibf_order); |
601 | 595 | ||
602 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order); | 596 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order); |
603 | 597 | ||
604 | ibf = eo->local_ibf; | 598 | ibf = op->state->local_ibf; |
605 | 599 | ||
606 | while (buckets_sent < (1 << ibf_order)) | 600 | while (buckets_sent < (1 << ibf_order)) |
607 | { | 601 | { |
@@ -624,20 +618,20 @@ send_ibf (struct OperationState *eo, uint16_t ibf_order) | |||
624 | buckets_sent += buckets_in_message; | 618 | buckets_sent += buckets_in_message; |
625 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n", | 619 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n", |
626 | buckets_in_message, buckets_sent, 1<<ibf_order); | 620 | buckets_in_message, buckets_sent, 1<<ibf_order); |
627 | GNUNET_MQ_send (eo->mq, ev); | 621 | GNUNET_MQ_send (op->mq, ev); |
628 | } | 622 | } |
629 | 623 | ||
630 | eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS; | 624 | op->state->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS; |
631 | } | 625 | } |
632 | 626 | ||
633 | 627 | ||
634 | /** | 628 | /** |
635 | * Send a strata estimator to the remote peer. | 629 | * Send a strata estimator to the remote peer. |
636 | * | 630 | * |
637 | * @param eo the union operation with the remote peer | 631 | * @param op the union operation with the remote peer |
638 | */ | 632 | */ |
639 | static void | 633 | static void |
640 | send_strata_estimator (struct OperationState *eo) | 634 | send_strata_estimator (struct Operation *op) |
641 | { | 635 | { |
642 | struct GNUNET_MQ_Envelope *ev; | 636 | struct GNUNET_MQ_Envelope *ev; |
643 | struct GNUNET_MessageHeader *strata_msg; | 637 | struct GNUNET_MessageHeader *strata_msg; |
@@ -645,9 +639,9 @@ send_strata_estimator (struct OperationState *eo) | |||
645 | ev = GNUNET_MQ_msg_header_extra (strata_msg, | 639 | ev = GNUNET_MQ_msg_header_extra (strata_msg, |
646 | SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, | 640 | SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, |
647 | GNUNET_MESSAGE_TYPE_SET_P2P_SE); | 641 | GNUNET_MESSAGE_TYPE_SET_P2P_SE); |
648 | strata_estimator_write (eo->set->state->se, &strata_msg[1]); | 642 | strata_estimator_write (op->state->se, &strata_msg[1]); |
649 | GNUNET_MQ_send (eo->mq, ev); | 643 | GNUNET_MQ_send (op->mq, ev); |
650 | eo->phase = PHASE_EXPECT_IBF; | 644 | op->state->phase = PHASE_EXPECT_IBF; |
651 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n"); | 645 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n"); |
652 | } | 646 | } |
653 | 647 | ||
@@ -682,27 +676,27 @@ get_order_from_difference (unsigned int diff) | |||
682 | static void | 676 | static void |
683 | handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) | 677 | handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) |
684 | { | 678 | { |
685 | struct OperationState *eo = cls; | 679 | struct Operation *op = cls; |
686 | struct StrataEstimator *remote_se; | 680 | struct StrataEstimator *remote_se; |
687 | int diff; | 681 | int diff; |
688 | 682 | ||
689 | if (eo->phase != PHASE_EXPECT_SE) | 683 | if (op->state->phase != PHASE_EXPECT_SE) |
690 | { | 684 | { |
691 | fail_union_operation (eo); | 685 | fail_union_operation (op); |
692 | GNUNET_break (0); | 686 | GNUNET_break (0); |
693 | return; | 687 | return; |
694 | } | 688 | } |
695 | remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, | 689 | remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, |
696 | SE_IBF_HASH_NUM); | 690 | SE_IBF_HASH_NUM); |
697 | strata_estimator_read (&mh[1], remote_se); | 691 | strata_estimator_read (&mh[1], remote_se); |
698 | GNUNET_assert (NULL != eo->se); | 692 | GNUNET_assert (NULL != op->state->se); |
699 | diff = strata_estimator_difference (remote_se, eo->se); | 693 | diff = strata_estimator_difference (remote_se, op->state->se); |
700 | strata_estimator_destroy (remote_se); | 694 | strata_estimator_destroy (remote_se); |
701 | strata_estimator_destroy (eo->se); | 695 | strata_estimator_destroy (op->state->se); |
702 | eo->se = NULL; | 696 | op->state->se = NULL; |
703 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n", | 697 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n", |
704 | diff, 1<<get_order_from_difference (diff)); | 698 | diff, 1<<get_order_from_difference (diff)); |
705 | send_ibf (eo, get_order_from_difference (diff)); | 699 | send_ibf (op, get_order_from_difference (diff)); |
706 | } | 700 | } |
707 | 701 | ||
708 | 702 | ||
@@ -721,7 +715,7 @@ send_element_iterator (void *cls, | |||
721 | { | 715 | { |
722 | struct SendElementClosure *sec = cls; | 716 | struct SendElementClosure *sec = cls; |
723 | struct IBF_Key ibf_key = sec->ibf_key; | 717 | struct IBF_Key ibf_key = sec->ibf_key; |
724 | struct OperationState *eo = sec->eo; | 718 | struct Operation *op = sec->op; |
725 | struct KeyEntry *ke = value; | 719 | struct KeyEntry *ke = value; |
726 | 720 | ||
727 | if (ke->ibf_key.key_val != ibf_key.key_val) | 721 | if (ke->ibf_key.key_val != ibf_key.key_val) |
@@ -743,7 +737,7 @@ send_element_iterator (void *cls, | |||
743 | memcpy (&mh[1], element->data, element->size); | 737 | memcpy (&mh[1], element->data, element->size); |
744 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n", | 738 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n", |
745 | GNUNET_h2s (&ke->element->element_hash)); | 739 | GNUNET_h2s (&ke->element->element_hash)); |
746 | GNUNET_MQ_send (eo->mq, ev); | 740 | GNUNET_MQ_send (op->mq, ev); |
747 | ke = ke->next_colliding; | 741 | ke = ke->next_colliding; |
748 | } | 742 | } |
749 | return GNUNET_NO; | 743 | return GNUNET_NO; |
@@ -753,17 +747,17 @@ send_element_iterator (void *cls, | |||
753 | * Send all elements that have the specified IBF key | 747 | * Send all elements that have the specified IBF key |
754 | * to the remote peer of the union operation | 748 | * to the remote peer of the union operation |
755 | * | 749 | * |
756 | * @param eo union operation | 750 | * @param op union operation |
757 | * @param ibf_key IBF key of interest | 751 | * @param ibf_key IBF key of interest |
758 | */ | 752 | */ |
759 | static void | 753 | static void |
760 | send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key) | 754 | send_elements_for_key (struct Operation *op, struct IBF_Key ibf_key) |
761 | { | 755 | { |
762 | struct SendElementClosure send_cls; | 756 | struct SendElementClosure send_cls; |
763 | 757 | ||
764 | send_cls.ibf_key = ibf_key; | 758 | send_cls.ibf_key = ibf_key; |
765 | send_cls.eo = eo; | 759 | send_cls.op = op; |
766 | GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val, | 760 | GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, (uint32_t) ibf_key.key_val, |
767 | &send_element_iterator, &send_cls); | 761 | &send_element_iterator, &send_cls); |
768 | } | 762 | } |
769 | 763 | ||
@@ -772,10 +766,10 @@ send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key) | |||
772 | * Decode which elements are missing on each side, and | 766 | * Decode which elements are missing on each side, and |
773 | * send the appropriate elemens and requests | 767 | * send the appropriate elemens and requests |
774 | * | 768 | * |
775 | * @param eo union operation | 769 | * @param op union operation |
776 | */ | 770 | */ |
777 | static void | 771 | static void |
778 | decode_and_send (struct OperationState *eo) | 772 | decode_and_send (struct Operation *op) |
779 | { | 773 | { |
780 | struct IBF_Key key; | 774 | struct IBF_Key key; |
781 | struct IBF_Key last_key; | 775 | struct IBF_Key last_key; |
@@ -783,14 +777,14 @@ decode_and_send (struct OperationState *eo) | |||
783 | unsigned int num_decoded; | 777 | unsigned int num_decoded; |
784 | struct InvertibleBloomFilter *diff_ibf; | 778 | struct InvertibleBloomFilter *diff_ibf; |
785 | 779 | ||
786 | GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase); | 780 | GNUNET_assert (PHASE_EXPECT_ELEMENTS == op->state->phase); |
787 | 781 | ||
788 | prepare_ibf (eo, eo->remote_ibf->size); | 782 | prepare_ibf (op, op->state->remote_ibf->size); |
789 | diff_ibf = ibf_dup (eo->local_ibf); | 783 | diff_ibf = ibf_dup (op->state->local_ibf); |
790 | ibf_subtract (diff_ibf, eo->remote_ibf); | 784 | ibf_subtract (diff_ibf, op->state->remote_ibf); |
791 | 785 | ||
792 | ibf_destroy (eo->remote_ibf); | 786 | ibf_destroy (op->state->remote_ibf); |
793 | eo->remote_ibf = NULL; | 787 | op->state->remote_ibf = NULL; |
794 | 788 | ||
795 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size); | 789 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size); |
796 | 790 | ||
@@ -829,7 +823,7 @@ decode_and_send (struct OperationState *eo) | |||
829 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 823 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
830 | "decoding failed, sending larger ibf (size %u)\n", | 824 | "decoding failed, sending larger ibf (size %u)\n", |
831 | 1<<next_order); | 825 | 1<<next_order); |
832 | send_ibf (eo, next_order); | 826 | send_ibf (op, next_order); |
833 | } | 827 | } |
834 | else | 828 | else |
835 | { | 829 | { |
@@ -844,28 +838,26 @@ decode_and_send (struct OperationState *eo) | |||
844 | 838 | ||
845 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n"); | 839 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n"); |
846 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); | 840 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); |
847 | GNUNET_MQ_send (eo->mq, ev); | 841 | GNUNET_MQ_send (op->mq, ev); |
848 | break; | 842 | break; |
849 | } | 843 | } |
850 | if (1 == side) | 844 | if (1 == side) |
851 | { | 845 | { |
852 | send_elements_for_key (eo, key); | 846 | send_elements_for_key (op, key); |
853 | } | 847 | } |
854 | else if (-1 == side) | 848 | else if (-1 == side) |
855 | { | 849 | { |
856 | struct GNUNET_MQ_Envelope *ev; | 850 | struct GNUNET_MQ_Envelope *ev; |
857 | struct GNUNET_MessageHeader *msg; | 851 | struct GNUNET_MessageHeader *msg; |
858 | 852 | ||
859 | /* FIXME: before sending the request, check if we may just have the element */ | 853 | /* It may be nice to merge multiple requests, but with mesh's corking it is not worth |
860 | /* FIXME: merge multiple requests */ | 854 | * the effort additional complexity. */ |
861 | /* FIXME: remember somewhere that we already requested the element, | ||
862 | * so that we don't request it again with the next ibf if decoding fails */ | ||
863 | ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), | 855 | ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), |
864 | GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); | 856 | GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); |
865 | 857 | ||
866 | *(struct IBF_Key *) &msg[1] = key; | 858 | *(struct IBF_Key *) &msg[1] = key; |
867 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n"); | 859 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n"); |
868 | GNUNET_MQ_send (eo->mq, ev); | 860 | GNUNET_MQ_send (op->mq, ev); |
869 | } | 861 | } |
870 | else | 862 | else |
871 | { | 863 | { |
@@ -885,32 +877,32 @@ decode_and_send (struct OperationState *eo) | |||
885 | static void | 877 | static void |
886 | handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) | 878 | handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) |
887 | { | 879 | { |
888 | struct OperationState *eo = cls; | 880 | struct Operation *op = cls; |
889 | struct IBFMessage *msg = (struct IBFMessage *) mh; | 881 | struct IBFMessage *msg = (struct IBFMessage *) mh; |
890 | unsigned int buckets_in_message; | 882 | unsigned int buckets_in_message; |
891 | 883 | ||
892 | if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) || | 884 | if ( (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) || |
893 | (eo->phase == PHASE_EXPECT_IBF) ) | 885 | (op->state->phase == PHASE_EXPECT_IBF) ) |
894 | { | 886 | { |
895 | eo->phase = PHASE_EXPECT_IBF_CONT; | 887 | op->state->phase = PHASE_EXPECT_IBF_CONT; |
896 | GNUNET_assert (NULL == eo->remote_ibf); | 888 | GNUNET_assert (NULL == op->state->remote_ibf); |
897 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order); | 889 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order); |
898 | eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); | 890 | op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); |
899 | eo->ibf_buckets_received = 0; | 891 | op->state->ibf_buckets_received = 0; |
900 | if (0 != ntohs (msg->offset)) | 892 | if (0 != ntohs (msg->offset)) |
901 | { | 893 | { |
902 | GNUNET_break (0); | 894 | GNUNET_break (0); |
903 | fail_union_operation (eo); | 895 | fail_union_operation (op); |
904 | return; | 896 | return; |
905 | } | 897 | } |
906 | } | 898 | } |
907 | else if (eo->phase == PHASE_EXPECT_IBF_CONT) | 899 | else if (op->state->phase == PHASE_EXPECT_IBF_CONT) |
908 | { | 900 | { |
909 | if ( (ntohs (msg->offset) != eo->ibf_buckets_received) || | 901 | if ( (ntohs (msg->offset) != op->state->ibf_buckets_received) || |
910 | (1<<msg->order != eo->remote_ibf->size) ) | 902 | (1<<msg->order != op->state->remote_ibf->size) ) |
911 | { | 903 | { |
912 | GNUNET_break (0); | 904 | GNUNET_break (0); |
913 | fail_union_operation (eo); | 905 | fail_union_operation (op); |
914 | return; | 906 | return; |
915 | } | 907 | } |
916 | } | 908 | } |
@@ -920,25 +912,25 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) | |||
920 | if (0 == buckets_in_message) | 912 | if (0 == buckets_in_message) |
921 | { | 913 | { |
922 | GNUNET_break_op (0); | 914 | GNUNET_break_op (0); |
923 | fail_union_operation (eo); | 915 | fail_union_operation (op); |
924 | return; | 916 | return; |
925 | } | 917 | } |
926 | 918 | ||
927 | if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE) | 919 | if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE) |
928 | { | 920 | { |
929 | GNUNET_break (0); | 921 | GNUNET_break (0); |
930 | fail_union_operation (eo); | 922 | fail_union_operation (op); |
931 | return; | 923 | return; |
932 | } | 924 | } |
933 | 925 | ||
934 | ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf); | 926 | ibf_read_slice (&msg[1], op->state->ibf_buckets_received, buckets_in_message, op->state->remote_ibf); |
935 | eo->ibf_buckets_received += buckets_in_message; | 927 | op->state->ibf_buckets_received += buckets_in_message; |
936 | 928 | ||
937 | if (eo->ibf_buckets_received == eo->remote_ibf->size) | 929 | if (op->state->ibf_buckets_received == op->state->remote_ibf->size) |
938 | { | 930 | { |
939 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n"); | 931 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n"); |
940 | eo->phase = PHASE_EXPECT_ELEMENTS; | 932 | op->state->phase = PHASE_EXPECT_ELEMENTS; |
941 | decode_and_send (eo); | 933 | decode_and_send (op); |
942 | } | 934 | } |
943 | } | 935 | } |
944 | 936 | ||
@@ -947,18 +939,18 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) | |||
947 | * Send a result message to the client indicating | 939 | * Send a result message to the client indicating |
948 | * that there is a new element. | 940 | * that there is a new element. |
949 | * | 941 | * |
950 | * @param eo union operation | 942 | * @param op union operation |
951 | * @param element element to send | 943 | * @param element element to send |
952 | */ | 944 | */ |
953 | static void | 945 | static void |
954 | send_client_element (struct OperationState *eo, | 946 | send_client_element (struct Operation *op, |
955 | struct GNUNET_SET_Element *element) | 947 | struct GNUNET_SET_Element *element) |
956 | { | 948 | { |
957 | struct GNUNET_MQ_Envelope *ev; | 949 | struct GNUNET_MQ_Envelope *ev; |
958 | struct GNUNET_SET_ResultMessage *rm; | 950 | struct GNUNET_SET_ResultMessage *rm; |
959 | 951 | ||
960 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size); | 952 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size); |
961 | GNUNET_assert (0 != eo->spec->client_request_id); | 953 | GNUNET_assert (0 != op->spec->client_request_id); |
962 | ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); | 954 | ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); |
963 | if (NULL == ev) | 955 | if (NULL == ev) |
964 | { | 956 | { |
@@ -967,38 +959,112 @@ send_client_element (struct OperationState *eo, | |||
967 | return; | 959 | return; |
968 | } | 960 | } |
969 | rm->result_status = htons (GNUNET_SET_STATUS_OK); | 961 | rm->result_status = htons (GNUNET_SET_STATUS_OK); |
970 | rm->request_id = htonl (eo->spec->client_request_id); | 962 | rm->request_id = htonl (op->spec->client_request_id); |
971 | rm->element_type = element->type; | 963 | rm->element_type = element->type; |
972 | memcpy (&rm[1], element->data, element->size); | 964 | memcpy (&rm[1], element->data, element->size); |
973 | GNUNET_MQ_send (eo->spec->set->client_mq, ev); | 965 | GNUNET_MQ_send (op->spec->set->client_mq, ev); |
974 | } | 966 | } |
975 | 967 | ||
976 | 968 | ||
977 | /** | 969 | /** |
978 | * Send a result message to the client indicating | 970 | * Signal to the client that the operation has finished and |
979 | * that the operation is over. | 971 | * destroy the operation. |
980 | * After the result done message has been sent to the client, | ||
981 | * destroy the evaluate operation. | ||
982 | * | 972 | * |
983 | * @param eo union operation | 973 | * @param cls operation to destroy |
984 | */ | 974 | */ |
985 | static void | 975 | static void |
986 | send_client_done_and_destroy (struct OperationState *eo) | 976 | send_done_and_destroy (void *cls) |
987 | { | 977 | { |
978 | struct Operation *op = cls; | ||
988 | struct GNUNET_MQ_Envelope *ev; | 979 | struct GNUNET_MQ_Envelope *ev; |
989 | struct GNUNET_SET_ResultMessage *rm; | 980 | struct GNUNET_SET_ResultMessage *rm; |
990 | |||
991 | GNUNET_assert (GNUNET_NO == eo->client_done_sent); | ||
992 | |||
993 | eo->client_done_sent = GNUNET_YES; | ||
994 | |||
995 | ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); | 981 | ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); |
996 | rm->request_id = htonl (eo->spec->client_request_id); | 982 | rm->request_id = htonl (op->spec->client_request_id); |
997 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); | 983 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); |
998 | rm->element_type = htons (0); | 984 | rm->element_type = htons (0); |
999 | GNUNET_MQ_send (eo->spec->set->client_mq, ev); | 985 | GNUNET_MQ_send (op->spec->set->client_mq, ev); |
986 | _GSS_operation_destroy (op); | ||
987 | } | ||
988 | |||
989 | |||
990 | /** | ||
991 | * Send all remaining elements in the full result iterator. | ||
992 | * | ||
993 | * @param cls operation | ||
994 | */ | ||
995 | static void | ||
996 | send_remaining_elements (void *cls) | ||
997 | { | ||
998 | struct Operation *op = cls; | ||
999 | struct KeyEntry *ke; | ||
1000 | int res; | ||
1001 | |||
1002 | res = GNUNET_CONTAINER_multihashmap32_iterator_next (op->state->full_result_iter, NULL, (const void **) &ke); | ||
1003 | res = GNUNET_NO; | ||
1004 | if (GNUNET_NO == res) | ||
1005 | { | ||
1006 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n"); | ||
1007 | send_done_and_destroy (op); | ||
1008 | return; | ||
1009 | } | ||
1010 | |||
1011 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending elements from key entry\n"); | ||
1012 | |||
1013 | while (1) | ||
1014 | { | ||
1015 | struct GNUNET_MQ_Envelope *ev; | ||
1016 | struct GNUNET_SET_ResultMessage *rm; | ||
1017 | struct GNUNET_SET_Element *element; | ||
1018 | element = &ke->element->element; | ||
1019 | |||
1020 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size); | ||
1021 | GNUNET_assert (0 != op->spec->client_request_id); | ||
1022 | ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); | ||
1023 | if (NULL == ev) | ||
1024 | { | ||
1025 | GNUNET_MQ_discard (ev); | ||
1026 | GNUNET_break (0); | ||
1027 | continue; | ||
1028 | } | ||
1029 | rm->result_status = htons (GNUNET_SET_STATUS_OK); | ||
1030 | rm->request_id = htonl (op->spec->client_request_id); | ||
1031 | rm->element_type = element->type; | ||
1032 | memcpy (&rm[1], element->data, element->size); | ||
1033 | if (ke->next_colliding == NULL) | ||
1034 | { | ||
1035 | GNUNET_MQ_notify_sent (ev, send_remaining_elements, op); | ||
1036 | GNUNET_MQ_send (op->spec->set->client_mq, ev); | ||
1037 | break; | ||
1038 | } | ||
1039 | GNUNET_MQ_send (op->spec->set->client_mq, ev); | ||
1040 | ke = ke->next_colliding; | ||
1041 | } | ||
1042 | } | ||
1000 | 1043 | ||
1001 | union_operation_destroy (eo); | 1044 | |
1045 | /** | ||
1046 | * Send a result message to the client indicating | ||
1047 | * that the operation is over. | ||
1048 | * After the result done message has been sent to the client, | ||
1049 | * destroy the evaluate operation. | ||
1050 | * | ||
1051 | * @param op union operation | ||
1052 | */ | ||
1053 | static void | ||
1054 | finish_and_destroy (struct Operation *op) | ||
1055 | { | ||
1056 | GNUNET_assert (GNUNET_NO == op->state->client_done_sent); | ||
1057 | |||
1058 | if (GNUNET_SET_RESULT_FULL == op->spec->result_mode) | ||
1059 | { | ||
1060 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n"); | ||
1061 | GNUNET_assert (NULL == op->state->full_result_iter); | ||
1062 | op->state->full_result_iter = | ||
1063 | GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->key_to_element); | ||
1064 | send_remaining_elements (op); | ||
1065 | return; | ||
1066 | } | ||
1067 | send_done_and_destroy (op); | ||
1002 | } | 1068 | } |
1003 | 1069 | ||
1004 | 1070 | ||
@@ -1011,16 +1077,16 @@ send_client_done_and_destroy (struct OperationState *eo) | |||
1011 | static void | 1077 | static void |
1012 | handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) | 1078 | handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) |
1013 | { | 1079 | { |
1014 | struct OperationState *eo = cls; | 1080 | struct Operation *op = cls; |
1015 | struct ElementEntry *ee; | 1081 | struct ElementEntry *ee; |
1016 | uint16_t element_size; | 1082 | uint16_t element_size; |
1017 | 1083 | ||
1018 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n"); | 1084 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n"); |
1019 | 1085 | ||
1020 | if ( (eo->phase != PHASE_EXPECT_ELEMENTS) && | 1086 | if ( (op->state->phase != PHASE_EXPECT_ELEMENTS) && |
1021 | (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ) | 1087 | (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ) |
1022 | { | 1088 | { |
1023 | fail_union_operation (eo); | 1089 | fail_union_operation (op); |
1024 | GNUNET_break (0); | 1090 | GNUNET_break (0); |
1025 | return; | 1091 | return; |
1026 | } | 1092 | } |
@@ -1032,12 +1098,17 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) | |||
1032 | ee->remote = GNUNET_YES; | 1098 | ee->remote = GNUNET_YES; |
1033 | GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash); | 1099 | GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash); |
1034 | 1100 | ||
1035 | /* FIXME: see if the element has already been inserted! */ | 1101 | if (GNUNET_YES == op_has_element (op, &ee->element_hash)) |
1102 | { | ||
1103 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got existing element from peer\n"); | ||
1104 | GNUNET_free (ee); | ||
1105 | return; | ||
1106 | } | ||
1036 | 1107 | ||
1037 | op_register_element (eo, ee); | 1108 | op_register_element (op, ee); |
1038 | /* only send results immediately if the client wants it */ | 1109 | /* only send results immediately if the client wants it */ |
1039 | if (GNUNET_SET_RESULT_ADDED == eo->spec->result_mode) | 1110 | if (GNUNET_SET_RESULT_ADDED == op->spec->result_mode) |
1040 | send_client_element (eo, &ee->element); | 1111 | send_client_element (op, &ee->element); |
1041 | } | 1112 | } |
1042 | 1113 | ||
1043 | 1114 | ||
@@ -1050,15 +1121,15 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) | |||
1050 | static void | 1121 | static void |
1051 | handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) | 1122 | handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) |
1052 | { | 1123 | { |
1053 | struct OperationState *eo = cls; | 1124 | struct Operation *op = cls; |
1054 | struct IBF_Key *ibf_key; | 1125 | struct IBF_Key *ibf_key; |
1055 | unsigned int num_keys; | 1126 | unsigned int num_keys; |
1056 | 1127 | ||
1057 | /* look up elements and send them */ | 1128 | /* look up elements and send them */ |
1058 | if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) | 1129 | if (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) |
1059 | { | 1130 | { |
1060 | GNUNET_break (0); | 1131 | GNUNET_break (0); |
1061 | fail_union_operation (eo); | 1132 | fail_union_operation (op); |
1062 | return; | 1133 | return; |
1063 | } | 1134 | } |
1064 | 1135 | ||
@@ -1067,14 +1138,14 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) | |||
1067 | if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key)) | 1138 | if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key)) |
1068 | { | 1139 | { |
1069 | GNUNET_break (0); | 1140 | GNUNET_break (0); |
1070 | fail_union_operation (eo); | 1141 | fail_union_operation (op); |
1071 | return; | 1142 | return; |
1072 | } | 1143 | } |
1073 | 1144 | ||
1074 | ibf_key = (struct IBF_Key *) &mh[1]; | 1145 | ibf_key = (struct IBF_Key *) &mh[1]; |
1075 | while (0 != num_keys--) | 1146 | while (0 != num_keys--) |
1076 | { | 1147 | { |
1077 | send_elements_for_key (eo, *ibf_key); | 1148 | send_elements_for_key (op, *ibf_key); |
1078 | ibf_key++; | 1149 | ibf_key++; |
1079 | } | 1150 | } |
1080 | } | 1151 | } |
@@ -1089,28 +1160,28 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) | |||
1089 | static void | 1160 | static void |
1090 | handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) | 1161 | handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) |
1091 | { | 1162 | { |
1092 | struct OperationState *eo = cls; | 1163 | struct Operation *op = cls; |
1093 | struct GNUNET_MQ_Envelope *ev; | 1164 | struct GNUNET_MQ_Envelope *ev; |
1094 | 1165 | ||
1095 | if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) | 1166 | if (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) |
1096 | { | 1167 | { |
1097 | /* we got all requests, but still have to send our elements as response */ | 1168 | /* we got all requests, but still have to send our elements as response */ |
1098 | 1169 | ||
1099 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n"); | 1170 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n"); |
1100 | eo->phase = PHASE_FINISHED; | 1171 | op->state->phase = PHASE_FINISHED; |
1101 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); | 1172 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); |
1102 | GNUNET_MQ_send (eo->mq, ev); | 1173 | GNUNET_MQ_send (op->mq, ev); |
1103 | return; | 1174 | return; |
1104 | } | 1175 | } |
1105 | if (eo->phase == PHASE_EXPECT_ELEMENTS) | 1176 | if (op->state->phase == PHASE_EXPECT_ELEMENTS) |
1106 | { | 1177 | { |
1107 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n"); | 1178 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n"); |
1108 | eo->phase = PHASE_FINISHED; | 1179 | op->state->phase = PHASE_FINISHED; |
1109 | send_client_done_and_destroy (eo); | 1180 | finish_and_destroy (op); |
1110 | return; | 1181 | return; |
1111 | } | 1182 | } |
1112 | GNUNET_break (0); | 1183 | GNUNET_break (0); |
1113 | fail_union_operation (eo); | 1184 | fail_union_operation (op); |
1114 | } | 1185 | } |
1115 | 1186 | ||
1116 | 1187 | ||
@@ -1118,78 +1189,34 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) | |||
1118 | * Evaluate a union operation with | 1189 | * Evaluate a union operation with |
1119 | * a remote peer. | 1190 | * a remote peer. |
1120 | * | 1191 | * |
1121 | * @param spec specification of the operation the evaluate | 1192 | * @param op operation to evaluate |
1122 | * @param tunnel tunnel already connected to the partner peer | ||
1123 | * @param tc tunnel context, passed here so all new incoming | ||
1124 | * messages are directly going to the union operations | ||
1125 | * @return a handle to the operation | ||
1126 | */ | 1193 | */ |
1127 | static void | 1194 | static void |
1128 | union_evaluate (struct OperationSpecification *spec, | 1195 | union_evaluate (struct Operation *op) |
1129 | struct GNUNET_MESH_Tunnel *tunnel, | ||
1130 | struct TunnelContext *tc) | ||
1131 | { | 1196 | { |
1132 | struct OperationState *eo; | 1197 | op->state = GNUNET_new (struct OperationState); |
1133 | 1198 | op->state->se = strata_estimator_dup (op->spec->set->state->se); | |
1134 | eo = GNUNET_new (struct OperationState); | ||
1135 | tc->vt = _GSS_union_vt (); | ||
1136 | tc->op = eo; | ||
1137 | eo->se = strata_estimator_dup (spec->set->state->se); | ||
1138 | eo->generation_created = spec->set->current_generation++; | ||
1139 | eo->set = spec->set; | ||
1140 | eo->spec = spec; | ||
1141 | eo->tunnel = tunnel; | ||
1142 | eo->tunnel = tunnel; | ||
1143 | eo->mq = GNUNET_MESH_mq_create (tunnel); | ||
1144 | |||
1145 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1146 | "evaluating union operation, (app %s)\n", | ||
1147 | GNUNET_h2s (&eo->spec->app_id)); | ||
1148 | |||
1149 | /* we started the operation, thus we have to send the operation request */ | 1199 | /* we started the operation, thus we have to send the operation request */ |
1150 | eo->phase = PHASE_EXPECT_SE; | 1200 | op->state->phase = PHASE_EXPECT_SE; |
1151 | 1201 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "evaluating union operation"); | |
1152 | GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head, | 1202 | send_operation_request (op); |
1153 | eo->set->state->ops_tail, | ||
1154 | eo); | ||
1155 | |||
1156 | send_operation_request (eo); | ||
1157 | } | 1203 | } |
1158 | 1204 | ||
1159 | 1205 | ||
1160 | /** | 1206 | /** |
1161 | * Accept an union operation request from a remote peer | 1207 | * Accept an union operation request from a remote peer. |
1208 | * Only initializes the private operation state. | ||
1162 | * | 1209 | * |
1163 | * @param spec all necessary information about the operation | 1210 | * @param op operation that will be accepted as a union operation |
1164 | * @param tunnel open tunnel to the partner's peer | ||
1165 | * @param tc tunnel context, passed here so all new incoming | ||
1166 | * messages are directly going to the union operations | ||
1167 | * @return operation | ||
1168 | */ | 1211 | */ |
1169 | static void | 1212 | static void |
1170 | union_accept (struct OperationSpecification *spec, | 1213 | union_accept (struct Operation *op) |
1171 | struct GNUNET_MESH_Tunnel *tunnel, | ||
1172 | struct TunnelContext *tc) | ||
1173 | { | 1214 | { |
1174 | struct OperationState *eo; | ||
1175 | |||
1176 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n"); | 1215 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n"); |
1177 | 1216 | op->state = GNUNET_new (struct OperationState); | |
1178 | eo = GNUNET_new (struct OperationState); | 1217 | op->state->se = strata_estimator_dup (op->spec->set->state->se); |
1179 | tc->vt = _GSS_union_vt (); | ||
1180 | tc->op = eo; | ||
1181 | eo->set = spec->set; | ||
1182 | eo->generation_created = eo->set->current_generation++; | ||
1183 | eo->spec = spec; | ||
1184 | eo->tunnel = tunnel; | ||
1185 | eo->mq = GNUNET_MESH_mq_create (tunnel); | ||
1186 | eo->se = strata_estimator_dup (eo->set->state->se); | ||
1187 | /* transfer ownership of mq and socket from incoming to eo */ | ||
1188 | GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head, | ||
1189 | eo->set->state->ops_tail, | ||
1190 | eo); | ||
1191 | /* kick off the operation */ | 1218 | /* kick off the operation */ |
1192 | send_strata_estimator (eo); | 1219 | send_strata_estimator (op); |
1193 | } | 1220 | } |
1194 | 1221 | ||
1195 | 1222 | ||
@@ -1240,17 +1267,13 @@ union_remove (struct SetState *set_state, struct ElementEntry *ee) | |||
1240 | 1267 | ||
1241 | 1268 | ||
1242 | /** | 1269 | /** |
1243 | * Destroy a set that supports the union operation | 1270 | * Destroy a set that supports the union operation. |
1244 | * | 1271 | * |
1245 | * @param set_state the set to destroy | 1272 | * @param set_state the set to destroy |
1246 | */ | 1273 | */ |
1247 | static void | 1274 | static void |
1248 | union_set_destroy (struct SetState *set_state) | 1275 | union_set_destroy (struct SetState *set_state) |
1249 | { | 1276 | { |
1250 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union set\n"); | ||
1251 | /* important to destroy operations before the rest of the set */ | ||
1252 | while (NULL != set_state->ops_head) | ||
1253 | union_operation_destroy (set_state->ops_head); | ||
1254 | if (NULL != set_state->se) | 1277 | if (NULL != set_state->se) |
1255 | { | 1278 | { |
1256 | strata_estimator_destroy (set_state->se); | 1279 | strata_estimator_destroy (set_state->se); |
@@ -1263,13 +1286,13 @@ union_set_destroy (struct SetState *set_state) | |||
1263 | /** | 1286 | /** |
1264 | * Dispatch messages for a union operation. | 1287 | * Dispatch messages for a union operation. |
1265 | * | 1288 | * |
1266 | * @param eo the state of the union evaluate operation | 1289 | * @param op the state of the union evaluate operation |
1267 | * @param mh the received message | 1290 | * @param mh the received message |
1268 | * @return GNUNET_SYSERR if the tunnel should be disconnected, | 1291 | * @return GNUNET_SYSERR if the tunnel should be disconnected, |
1269 | * GNUNET_OK otherwise | 1292 | * GNUNET_OK otherwise |
1270 | */ | 1293 | */ |
1271 | int | 1294 | int |
1272 | union_handle_p2p_message (struct OperationState *eo, | 1295 | union_handle_p2p_message (struct Operation *op, |
1273 | const struct GNUNET_MessageHeader *mh) | 1296 | const struct GNUNET_MessageHeader *mh) |
1274 | { | 1297 | { |
1275 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n", | 1298 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n", |
@@ -1277,19 +1300,19 @@ union_handle_p2p_message (struct OperationState *eo, | |||
1277 | switch (ntohs (mh->type)) | 1300 | switch (ntohs (mh->type)) |
1278 | { | 1301 | { |
1279 | case GNUNET_MESSAGE_TYPE_SET_P2P_IBF: | 1302 | case GNUNET_MESSAGE_TYPE_SET_P2P_IBF: |
1280 | handle_p2p_ibf (eo, mh); | 1303 | handle_p2p_ibf (op, mh); |
1281 | break; | 1304 | break; |
1282 | case GNUNET_MESSAGE_TYPE_SET_P2P_SE: | 1305 | case GNUNET_MESSAGE_TYPE_SET_P2P_SE: |
1283 | handle_p2p_strata_estimator (eo, mh); | 1306 | handle_p2p_strata_estimator (op, mh); |
1284 | break; | 1307 | break; |
1285 | case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: | 1308 | case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: |
1286 | handle_p2p_elements (eo, mh); | 1309 | handle_p2p_elements (op, mh); |
1287 | break; | 1310 | break; |
1288 | case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS: | 1311 | case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS: |
1289 | handle_p2p_element_requests (eo, mh); | 1312 | handle_p2p_element_requests (op, mh); |
1290 | break; | 1313 | break; |
1291 | case GNUNET_MESSAGE_TYPE_SET_P2P_DONE: | 1314 | case GNUNET_MESSAGE_TYPE_SET_P2P_DONE: |
1292 | handle_p2p_done (eo, mh); | 1315 | handle_p2p_done (op, mh); |
1293 | break; | 1316 | break; |
1294 | default: | 1317 | default: |
1295 | /* something wrong with mesh's message handlers? */ | 1318 | /* something wrong with mesh's message handlers? */ |
@@ -1300,18 +1323,9 @@ union_handle_p2p_message (struct OperationState *eo, | |||
1300 | 1323 | ||
1301 | 1324 | ||
1302 | static void | 1325 | static void |
1303 | union_peer_disconnect (struct OperationState *op) | 1326 | union_peer_disconnect (struct Operation *op) |
1304 | { | 1327 | { |
1305 | /* Are we already disconnected? */ | 1328 | if (PHASE_FINISHED != op->state->phase) |
1306 | if (NULL == op->tunnel) | ||
1307 | return; | ||
1308 | op->tunnel = NULL; | ||
1309 | if (NULL != op->mq) | ||
1310 | { | ||
1311 | GNUNET_MQ_destroy (op->mq); | ||
1312 | op->mq = NULL; | ||
1313 | } | ||
1314 | if (PHASE_FINISHED != op->phase) | ||
1315 | { | 1329 | { |
1316 | struct GNUNET_MQ_Envelope *ev; | 1330 | struct GNUNET_MQ_Envelope *ev; |
1317 | struct GNUNET_SET_ResultMessage *msg; | 1331 | struct GNUNET_SET_ResultMessage *msg; |
@@ -1322,34 +1336,12 @@ union_peer_disconnect (struct OperationState *op) | |||
1322 | msg->element_type = htons (0); | 1336 | msg->element_type = htons (0); |
1323 | GNUNET_MQ_send (op->spec->set->client_mq, ev); | 1337 | GNUNET_MQ_send (op->spec->set->client_mq, ev); |
1324 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n"); | 1338 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n"); |
1325 | union_operation_destroy (op); | 1339 | _GSS_operation_destroy (op); |
1326 | return; | 1340 | return; |
1327 | } | 1341 | } |
1328 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n"); | 1342 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n"); |
1329 | if (GNUNET_NO == op->client_done_sent) | 1343 | if (GNUNET_NO == op->state->client_done_sent) |
1330 | send_client_done_and_destroy (op); | 1344 | finish_and_destroy (op); |
1331 | } | ||
1332 | |||
1333 | |||
1334 | static void | ||
1335 | union_op_cancel (struct SetState *set_state, uint32_t op_id) | ||
1336 | { | ||
1337 | struct OperationState *op_state; | ||
1338 | int found = GNUNET_NO; | ||
1339 | for (op_state = set_state->ops_head; NULL != op_state; op_state = op_state->next) | ||
1340 | { | ||
1341 | if (op_state->spec->client_request_id == op_id) | ||
1342 | { | ||
1343 | found = GNUNET_YES; | ||
1344 | break; | ||
1345 | } | ||
1346 | } | ||
1347 | if (GNUNET_NO == found) | ||
1348 | { | ||
1349 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "canceling non-existing operation\n"); | ||
1350 | return; | ||
1351 | } | ||
1352 | union_operation_destroy (op_state); | ||
1353 | } | 1345 | } |
1354 | 1346 | ||
1355 | 1347 | ||
diff --git a/src/set/ibf.h b/src/set/ibf.h index 407d14f64..c62ecac43 100644 --- a/src/set/ibf.h +++ b/src/set/ibf.h | |||
@@ -39,27 +39,40 @@ extern "C" | |||
39 | #endif | 39 | #endif |
40 | 40 | ||
41 | 41 | ||
42 | /** | ||
43 | * Keys that can be inserted into and removed from an IBF. | ||
44 | */ | ||
42 | struct IBF_Key | 45 | struct IBF_Key |
43 | { | 46 | { |
44 | uint64_t key_val; | 47 | uint64_t key_val; |
45 | }; | 48 | }; |
46 | 49 | ||
50 | |||
51 | /** | ||
52 | * Hash of an IBF key. | ||
53 | */ | ||
47 | struct IBF_KeyHash | 54 | struct IBF_KeyHash |
48 | { | 55 | { |
49 | uint32_t key_hash_val; | 56 | uint32_t key_hash_val; |
50 | }; | 57 | }; |
51 | 58 | ||
59 | |||
60 | /** | ||
61 | * Type of the count field of IBF buckets. | ||
62 | */ | ||
52 | struct IBF_Count | 63 | struct IBF_Count |
53 | { | 64 | { |
54 | int8_t count_val; | 65 | int8_t count_val; |
55 | }; | 66 | }; |
56 | 67 | ||
68 | |||
57 | /** | 69 | /** |
58 | * Size of one ibf bucket in bytes | 70 | * Size of one ibf bucket in bytes |
59 | */ | 71 | */ |
60 | #define IBF_BUCKET_SIZE (sizeof (struct IBF_Count) + sizeof (struct IBF_Key) + \ | 72 | #define IBF_BUCKET_SIZE (sizeof (struct IBF_Count) + sizeof (struct IBF_Key) + \ |
61 | sizeof (struct IBF_KeyHash)) | 73 | sizeof (struct IBF_KeyHash)) |
62 | 74 | ||
75 | |||
63 | /** | 76 | /** |
64 | * Invertible bloom filter (IBF). | 77 | * Invertible bloom filter (IBF). |
65 | * | 78 | * |
@@ -212,6 +225,7 @@ ibf_decode (struct InvertibleBloomFilter *ibf, int *ret_side, struct IBF_Key *re | |||
212 | struct InvertibleBloomFilter * | 225 | struct InvertibleBloomFilter * |
213 | ibf_dup (const struct InvertibleBloomFilter *ibf); | 226 | ibf_dup (const struct InvertibleBloomFilter *ibf); |
214 | 227 | ||
228 | |||
215 | /** | 229 | /** |
216 | * Destroy all resources associated with the invertible bloom filter. | 230 | * Destroy all resources associated with the invertible bloom filter. |
217 | * No more ibf_*-functions may be called on ibf after calling destroy. | 231 | * No more ibf_*-functions may be called on ibf after calling destroy. |
diff --git a/src/set/strata_estimator.h b/src/set/strata_estimator.h index abc39f25a..9de5598bd 100644 --- a/src/set/strata_estimator.h +++ b/src/set/strata_estimator.h | |||
@@ -40,6 +40,9 @@ extern "C" | |||
40 | #endif | 40 | #endif |
41 | 41 | ||
42 | 42 | ||
43 | /** | ||
44 | * A handle to a strata estimator. | ||
45 | */ | ||
43 | struct StrataEstimator | 46 | struct StrataEstimator |
44 | { | 47 | { |
45 | struct InvertibleBloomFilter **strata; | 48 | struct InvertibleBloomFilter **strata; |
@@ -48,31 +51,77 @@ struct StrataEstimator | |||
48 | }; | 51 | }; |
49 | 52 | ||
50 | 53 | ||
54 | /** | ||
55 | * Write the given strata estimator to the buffer. | ||
56 | * | ||
57 | * @param se strata estimator to serialize | ||
58 | * @param buf buffer to write to, must be of appropriate size | ||
59 | */ | ||
51 | void | 60 | void |
52 | strata_estimator_write (const struct StrataEstimator *se, void *buf); | 61 | strata_estimator_write (const struct StrataEstimator *se, void *buf); |
53 | 62 | ||
54 | 63 | ||
64 | /** | ||
65 | * Read strata from the buffer into the given strata | ||
66 | * estimator. The strata estimator must already be allocated. | ||
67 | * | ||
68 | * @param buf buffer to read from | ||
69 | * @param se strata estimator to write to | ||
70 | */ | ||
55 | void | 71 | void |
56 | strata_estimator_read (const void *buf, struct StrataEstimator *se); | 72 | strata_estimator_read (const void *buf, struct StrataEstimator *se); |
57 | 73 | ||
58 | 74 | ||
75 | /** | ||
76 | * Create a new strata estimator with the given parameters. | ||
77 | * | ||
78 | * @param strata_count number of stratas, that is, number of ibfs in the estimator | ||
79 | * @param ibf_size size of each ibf stratum | ||
80 | * @param ibf_hashnum hashnum parameter of each ibf | ||
81 | * @return a freshly allocated, empty strata estimator | ||
82 | */ | ||
59 | struct StrataEstimator * | 83 | struct StrataEstimator * |
60 | strata_estimator_create (unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum); | 84 | strata_estimator_create (unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum); |
61 | 85 | ||
62 | 86 | ||
87 | /** | ||
88 | * Get an estimation of the symmetric difference of the elements | ||
89 | * contained in both strata estimators. | ||
90 | * | ||
91 | * @param se1 first strata estimator | ||
92 | * @param se2 second strata estimator | ||
93 | * @return abs(|se1| - |se2|) | ||
94 | */ | ||
63 | unsigned int | 95 | unsigned int |
64 | strata_estimator_difference (const struct StrataEstimator *se1, | 96 | strata_estimator_difference (const struct StrataEstimator *se1, |
65 | const struct StrataEstimator *se2); | 97 | const struct StrataEstimator *se2); |
66 | 98 | ||
67 | 99 | ||
100 | /** | ||
101 | * Add a key to the strata estimator. | ||
102 | * | ||
103 | * @param se strata estimator to add the key to | ||
104 | * @param key key to add | ||
105 | */ | ||
68 | void | 106 | void |
69 | strata_estimator_insert (struct StrataEstimator *se, struct IBF_Key key); | 107 | strata_estimator_insert (struct StrataEstimator *se, struct IBF_Key key); |
70 | 108 | ||
71 | 109 | ||
110 | /** | ||
111 | * Remove a key from the strata estimator. | ||
112 | * | ||
113 | * @param se strata estimator to remove the key from | ||
114 | * @param key key to remove | ||
115 | */ | ||
72 | void | 116 | void |
73 | strata_estimator_remove (struct StrataEstimator *se, struct IBF_Key key); | 117 | strata_estimator_remove (struct StrataEstimator *se, struct IBF_Key key); |
74 | 118 | ||
75 | 119 | ||
120 | /** | ||
121 | * Destroy a strata estimator, free all of its resources. | ||
122 | * | ||
123 | * @param se strata estimator to destroy. | ||
124 | */ | ||
76 | void | 125 | void |
77 | strata_estimator_destroy (struct StrataEstimator *se); | 126 | strata_estimator_destroy (struct StrataEstimator *se); |
78 | 127 | ||
diff --git a/src/set/test_set.conf b/src/set/test_set.conf index 0d8ef692d..d59c425c4 100644 --- a/src/set/test_set.conf +++ b/src/set/test_set.conf | |||
@@ -8,6 +8,7 @@ PORT = 2106 | |||
8 | HOSTNAME = localhost | 8 | HOSTNAME = localhost |
9 | BINARY = gnunet-service-set | 9 | BINARY = gnunet-service-set |
10 | #PREFIX = valgrind | 10 | #PREFIX = valgrind |
11 | #PREFIX = valgrind -v --leak-check=full | ||
11 | #PREFIX = gdbserver :1234 | 12 | #PREFIX = gdbserver :1234 |
12 | ACCEPT_FROM = 127.0.0.1; | 13 | ACCEPT_FROM = 127.0.0.1; |
13 | ACCEPT_FROM6 = ::1; | 14 | ACCEPT_FROM6 = ::1; |
diff --git a/src/set/test_set_union_result_full.c b/src/set/test_set_union_result_full.c new file mode 100644 index 000000000..e54332b8b --- /dev/null +++ b/src/set/test_set_union_result_full.c | |||
@@ -0,0 +1,255 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | (C) 2012 Christian Grothoff (and other contributing authors) | ||
4 | |||
5 | GNUnet is free software; you can redistribute it and/or modify | ||
6 | it under the terms of the GNU General Public License as published | ||
7 | by the Free Software Foundation; either version 3, or (at your | ||
8 | option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU General Public License | ||
16 | along with GNUnet; see the file COPYING. If not, write to the | ||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
18 | Boston, MA 02111-1307, USA. | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file set/test_set_union_result_full.c | ||
23 | * @brief testcase for full result mode of the union set operation | ||
24 | */ | ||
25 | #include "platform.h" | ||
26 | #include "gnunet_util_lib.h" | ||
27 | #include "gnunet_testing_lib.h" | ||
28 | #include "gnunet_set_service.h" | ||
29 | |||
30 | |||
31 | static struct GNUNET_PeerIdentity local_id; | ||
32 | |||
33 | static struct GNUNET_HashCode app_id; | ||
34 | static struct GNUNET_SET_Handle *set1; | ||
35 | static struct GNUNET_SET_Handle *set2; | ||
36 | static struct GNUNET_SET_ListenHandle *listen_handle; | ||
37 | const static struct GNUNET_CONFIGURATION_Handle *config; | ||
38 | |||
39 | static int iter_count; | ||
40 | |||
41 | |||
42 | static void | ||
43 | result_cb_set1 (void *cls, const struct GNUNET_SET_Element *element, | ||
44 | enum GNUNET_SET_Status status) | ||
45 | { | ||
46 | switch (status) | ||
47 | { | ||
48 | case GNUNET_SET_STATUS_OK: | ||
49 | printf ("set 1: got element\n"); | ||
50 | break; | ||
51 | case GNUNET_SET_STATUS_FAILURE: | ||
52 | printf ("set 1: failure\n"); | ||
53 | break; | ||
54 | case GNUNET_SET_STATUS_DONE: | ||
55 | printf ("set 1: done\n"); | ||
56 | GNUNET_SET_destroy (set1); | ||
57 | break; | ||
58 | default: | ||
59 | GNUNET_assert (0); | ||
60 | } | ||
61 | } | ||
62 | |||
63 | |||
64 | static void | ||
65 | result_cb_set2 (void *cls, const struct GNUNET_SET_Element *element, | ||
66 | enum GNUNET_SET_Status status) | ||
67 | { | ||
68 | switch (status) | ||
69 | { | ||
70 | case GNUNET_SET_STATUS_OK: | ||
71 | printf ("set 2: got element\n"); | ||
72 | break; | ||
73 | case GNUNET_SET_STATUS_FAILURE: | ||
74 | printf ("set 2: failure\n"); | ||
75 | break; | ||
76 | case GNUNET_SET_STATUS_DONE: | ||
77 | printf ("set 2: done\n"); | ||
78 | GNUNET_SET_destroy (set2); | ||
79 | break; | ||
80 | default: | ||
81 | GNUNET_assert (0); | ||
82 | } | ||
83 | } | ||
84 | |||
85 | |||
86 | static void | ||
87 | listen_cb (void *cls, | ||
88 | const struct GNUNET_PeerIdentity *other_peer, | ||
89 | const struct GNUNET_MessageHeader *context_msg, | ||
90 | struct GNUNET_SET_Request *request) | ||
91 | { | ||
92 | struct GNUNET_SET_OperationHandle *oh; | ||
93 | |||
94 | GNUNET_assert (NULL != context_msg); | ||
95 | |||
96 | GNUNET_assert (ntohs (context_msg->type) == GNUNET_MESSAGE_TYPE_TEST); | ||
97 | |||
98 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n"); | ||
99 | GNUNET_SET_listen_cancel (listen_handle); | ||
100 | |||
101 | oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_FULL, result_cb_set2, NULL); | ||
102 | GNUNET_SET_commit (oh, set2); | ||
103 | } | ||
104 | |||
105 | |||
106 | /** | ||
107 | * Start the set operation. | ||
108 | * | ||
109 | * @param cls closure, unused | ||
110 | */ | ||
111 | static void | ||
112 | start (void *cls) | ||
113 | { | ||
114 | struct GNUNET_SET_OperationHandle *oh; | ||
115 | struct GNUNET_MessageHeader context_msg; | ||
116 | |||
117 | context_msg.size = htons (sizeof context_msg); | ||
118 | context_msg.type = htons (GNUNET_MESSAGE_TYPE_TEST); | ||
119 | |||
120 | listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, | ||
121 | &app_id, listen_cb, NULL); | ||
122 | oh = GNUNET_SET_prepare (&local_id, &app_id, &context_msg, 42, | ||
123 | GNUNET_SET_RESULT_FULL, | ||
124 | result_cb_set1, NULL); | ||
125 | GNUNET_SET_commit (oh, set1); | ||
126 | } | ||
127 | |||
128 | |||
129 | /** | ||
130 | * Initialize the second set, continue | ||
131 | * | ||
132 | * @param cls closure, unused | ||
133 | */ | ||
134 | static void | ||
135 | init_set2 (void *cls) | ||
136 | { | ||
137 | struct GNUNET_SET_Element element; | ||
138 | |||
139 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initializing set 2\n"); | ||
140 | |||
141 | element.type = 0; | ||
142 | |||
143 | element.data = "hello"; | ||
144 | element.size = strlen(element.data); | ||
145 | GNUNET_SET_add_element (set2, &element, NULL, NULL); | ||
146 | element.data = "quux"; | ||
147 | element.size = strlen(element.data); | ||
148 | GNUNET_SET_add_element (set2, &element, NULL, NULL); | ||
149 | element.data = "baz"; | ||
150 | element.size = strlen(element.data); | ||
151 | GNUNET_SET_add_element (set2, &element, start, NULL); | ||
152 | } | ||
153 | |||
154 | |||
155 | /** | ||
156 | * Initialize the first set, continue. | ||
157 | */ | ||
158 | static void | ||
159 | init_set1 (void) | ||
160 | { | ||
161 | struct GNUNET_SET_Element element; | ||
162 | |||
163 | element.type = 0; | ||
164 | |||
165 | element.data = "hello"; | ||
166 | element.size = strlen(element.data); | ||
167 | GNUNET_SET_add_element (set1, &element, NULL, NULL); | ||
168 | element.data = "bar"; | ||
169 | element.size = strlen(element.data); | ||
170 | GNUNET_SET_add_element (set1, &element, init_set2, NULL); | ||
171 | |||
172 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n"); | ||
173 | } | ||
174 | |||
175 | |||
176 | static int | ||
177 | iter_cb (void *cls, | ||
178 | const struct GNUNET_SET_Element *element) | ||
179 | { | ||
180 | if (NULL == element) | ||
181 | { | ||
182 | GNUNET_assert (iter_count == 3); | ||
183 | GNUNET_SET_destroy (cls); | ||
184 | return GNUNET_YES; | ||
185 | } | ||
186 | printf ("iter: got element\n"); | ||
187 | iter_count++; | ||
188 | return GNUNET_YES; | ||
189 | } | ||
190 | |||
191 | |||
192 | static void | ||
193 | test_iter () | ||
194 | { | ||
195 | struct GNUNET_SET_Element element; | ||
196 | struct GNUNET_SET_Handle *iter_set; | ||
197 | |||
198 | iter_set = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION); | ||
199 | |||
200 | element.type = 0; | ||
201 | |||
202 | element.data = "hello"; | ||
203 | element.size = strlen(element.data); | ||
204 | GNUNET_SET_add_element (iter_set, &element, NULL, NULL); | ||
205 | element.data = "bar"; | ||
206 | element.size = strlen(element.data); | ||
207 | GNUNET_SET_add_element (iter_set, &element, NULL, NULL); | ||
208 | element.data = "quux"; | ||
209 | element.size = strlen(element.data); | ||
210 | GNUNET_SET_add_element (iter_set, &element, NULL, NULL); | ||
211 | |||
212 | GNUNET_SET_iterate (iter_set, iter_cb, iter_set); | ||
213 | } | ||
214 | |||
215 | |||
216 | /** | ||
217 | * Signature of the 'main' function for a (single-peer) testcase that | ||
218 | * is run using 'GNUNET_TESTING_peer_run'. | ||
219 | * | ||
220 | * @param cls closure | ||
221 | * @param cfg configuration of the peer that was started | ||
222 | * @param peer identity of the peer that was created | ||
223 | */ | ||
224 | static void | ||
225 | run (void *cls, | ||
226 | const struct GNUNET_CONFIGURATION_Handle *cfg, | ||
227 | struct GNUNET_TESTING_Peer *peer) | ||
228 | { | ||
229 | config = cfg; | ||
230 | GNUNET_CRYPTO_get_peer_identity (cfg, &local_id); | ||
231 | printf ("my id (from CRYPTO): %s\n", GNUNET_i2s (&local_id)); | ||
232 | GNUNET_TESTING_peer_get_identity (peer, &local_id); | ||
233 | printf ("my id (from TESTING): %s\n", GNUNET_i2s (&local_id)); | ||
234 | |||
235 | test_iter (); | ||
236 | |||
237 | set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); | ||
238 | set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); | ||
239 | GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id); | ||
240 | |||
241 | /* test the real set reconciliation */ | ||
242 | init_set1 (); | ||
243 | } | ||
244 | |||
245 | int | ||
246 | main (int argc, char **argv) | ||
247 | { | ||
248 | int ret; | ||
249 | |||
250 | ret = GNUNET_TESTING_peer_run ("test_set_api", | ||
251 | "test_set.conf", | ||
252 | &run, NULL); | ||
253 | return ret; | ||
254 | } | ||
255 | |||
diff --git a/src/util/container_multihashmap32.c b/src/util/container_multihashmap32.c index afb0e3653..365333de2 100644 --- a/src/util/container_multihashmap32.c +++ b/src/util/container_multihashmap32.c | |||
@@ -73,6 +73,41 @@ struct GNUNET_CONTAINER_MultiHashMap32 | |||
73 | * Length of the "map" array. | 73 | * Length of the "map" array. |
74 | */ | 74 | */ |
75 | unsigned int map_length; | 75 | unsigned int map_length; |
76 | |||
77 | /** | ||
78 | * Counts the destructive modifications (grow, remove) | ||
79 | * to the map, so that iterators can check if they are still valid. | ||
80 | */ | ||
81 | unsigned int modification_counter; | ||
82 | }; | ||
83 | |||
84 | |||
85 | /** | ||
86 | * Cursor into a multihashmap. | ||
87 | * Allows to enumerate elements asynchronously. | ||
88 | */ | ||
89 | struct GNUNET_CONTAINER_MultiHashMap32Iterator | ||
90 | { | ||
91 | /** | ||
92 | * Position in the bucket 'idx' | ||
93 | */ | ||
94 | struct MapEntry *me; | ||
95 | |||
96 | /** | ||
97 | * Current bucket index. | ||
98 | */ | ||
99 | unsigned int idx; | ||
100 | |||
101 | /** | ||
102 | * Modification counter as observed on the map when the iterator | ||
103 | * was created. | ||
104 | */ | ||
105 | unsigned int modification_counter; | ||
106 | |||
107 | /** | ||
108 | * Map that we are iterating over. | ||
109 | */ | ||
110 | const struct GNUNET_CONTAINER_MultiHashMap32 *map; | ||
76 | }; | 111 | }; |
77 | 112 | ||
78 | 113 | ||
@@ -239,6 +274,8 @@ GNUNET_CONTAINER_multihashmap32_remove (struct GNUNET_CONTAINER_MultiHashMap32 | |||
239 | struct MapEntry *p; | 274 | struct MapEntry *p; |
240 | unsigned int i; | 275 | unsigned int i; |
241 | 276 | ||
277 | map->modification_counter++; | ||
278 | |||
242 | i = idx_of (map, key); | 279 | i = idx_of (map, key); |
243 | p = NULL; | 280 | p = NULL; |
244 | e = map->map[i]; | 281 | e = map->map[i]; |
@@ -280,6 +317,8 @@ GNUNET_CONTAINER_multihashmap32_remove_all (struct | |||
280 | unsigned int i; | 317 | unsigned int i; |
281 | int ret; | 318 | int ret; |
282 | 319 | ||
320 | map->modification_counter++; | ||
321 | |||
283 | ret = 0; | 322 | ret = 0; |
284 | i = idx_of (map, key); | 323 | i = idx_of (map, key); |
285 | p = NULL; | 324 | p = NULL; |
@@ -383,6 +422,8 @@ grow (struct GNUNET_CONTAINER_MultiHashMap32 *map) | |||
383 | unsigned int idx; | 422 | unsigned int idx; |
384 | unsigned int i; | 423 | unsigned int i; |
385 | 424 | ||
425 | map->modification_counter++; | ||
426 | |||
386 | old_map = map->map; | 427 | old_map = map->map; |
387 | old_len = map->map_length; | 428 | old_len = map->map_length; |
388 | new_len = old_len * 2; | 429 | new_len = old_len * 2; |
@@ -492,4 +533,84 @@ GNUNET_CONTAINER_multihashmap32_get_multiple (const struct | |||
492 | } | 533 | } |
493 | 534 | ||
494 | 535 | ||
536 | /** | ||
537 | * Create an iterator for a multihashmap. | ||
538 | * The iterator can be used to retrieve all the elements in the multihashmap | ||
539 | * one by one, without having to handle all elements at once (in contrast to | ||
540 | * GNUNET_CONTAINER_multihashmap_iterate()). Note that the iterator can not be | ||
541 | * used anymore if elements have been removed from 'map' after the creation of | ||
542 | * the iterator, or 'map' has been destroyed. Adding elements to 'map' may | ||
543 | * result in skipped or repeated elements. | ||
544 | * | ||
545 | * @param map the map to create an iterator for | ||
546 | * @return an iterator over the given multihashmap 'map' | ||
547 | */ | ||
548 | struct GNUNET_CONTAINER_MultiHashMap32Iterator * | ||
549 | GNUNET_CONTAINER_multihashmap32_iterator_create (const struct GNUNET_CONTAINER_MultiHashMap32 *map) | ||
550 | { | ||
551 | struct GNUNET_CONTAINER_MultiHashMap32Iterator *iter; | ||
552 | |||
553 | iter = GNUNET_new (struct GNUNET_CONTAINER_MultiHashMap32Iterator); | ||
554 | iter->map = map; | ||
555 | iter->modification_counter = map->modification_counter; | ||
556 | iter->me = map->map[0]; | ||
557 | return iter; | ||
558 | } | ||
559 | |||
560 | |||
561 | /** | ||
562 | * Retrieve the next element from the hash map at the iterator's position. | ||
563 | * If there are no elements left, GNUNET_NO is returned, and 'key' and 'value' | ||
564 | * are not modified. | ||
565 | * This operation is only allowed if no elements have been removed from the | ||
566 | * multihashmap since the creation of 'iter', and the map has not been destroyed. | ||
567 | * Adding elements may result in repeating or skipping elements. | ||
568 | * | ||
569 | * @param iter the iterator to get the next element from | ||
570 | * @param key pointer to store the key in, can be NULL | ||
571 | * @param value pointer to store the value in, can be NULL | ||
572 | * @return #GNUNET_YES we returned an element, | ||
573 | * #GNUNET_NO if we are out of elements | ||
574 | */ | ||
575 | int | ||
576 | GNUNET_CONTAINER_multihashmap32_iterator_next (struct GNUNET_CONTAINER_MultiHashMap32Iterator *iter, | ||
577 | uint32_t *key, | ||
578 | const void **value) | ||
579 | { | ||
580 | /* make sure the map has not been modified */ | ||
581 | GNUNET_assert (iter->modification_counter == iter->map->modification_counter); | ||
582 | |||
583 | /* look for the next entry, skipping empty buckets */ | ||
584 | while (1) | ||
585 | { | ||
586 | if (iter->idx >= iter->map->map_length) | ||
587 | return GNUNET_NO; | ||
588 | if (NULL != iter->me) | ||
589 | { | ||
590 | if (NULL != key) | ||
591 | *key = iter->me->key; | ||
592 | if (NULL != value) | ||
593 | *value = iter->me->value; | ||
594 | iter->me = iter->me->next; | ||
595 | return GNUNET_YES; | ||
596 | } | ||
597 | iter->idx += 1; | ||
598 | if (iter->idx < iter->map->map_length) | ||
599 | iter->me = iter->map->map[iter->idx]; | ||
600 | } | ||
601 | } | ||
602 | |||
603 | |||
604 | /** | ||
605 | * Destroy a multihashmap iterator. | ||
606 | * | ||
607 | * @param iter the iterator to destroy | ||
608 | */ | ||
609 | void | ||
610 | GNUNET_CONTAINER_multihashmap32_iterator_destroy (struct GNUNET_CONTAINER_MultiHashMapIterator *iter) | ||
611 | { | ||
612 | GNUNET_free (iter); | ||
613 | } | ||
614 | |||
615 | |||
495 | /* end of container_multihashmap.c */ | 616 | /* end of container_multihashmap.c */ |
diff --git a/src/util/mq.c b/src/util/mq.c index 1a3374087..d8659ec40 100644 --- a/src/util/mq.c +++ b/src/util/mq.c | |||
@@ -603,6 +603,7 @@ connection_client_send_impl (struct GNUNET_MQ_Handle *mq, | |||
603 | GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size), | 603 | GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size), |
604 | GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, | 604 | GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, |
605 | &connection_client_transmit_queued, mq); | 605 | &connection_client_transmit_queued, mq); |
606 | GNUNET_assert (NULL != state->th); | ||
606 | } | 607 | } |
607 | 608 | ||
608 | 609 | ||