diff options
Diffstat (limited to 'src/set/gnunet-service-set_union.c')
-rw-r--r-- | src/set/gnunet-service-set_union.c | 690 |
1 files changed, 341 insertions, 349 deletions
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index 33a36d260..6ad985bcb 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c | |||
@@ -112,12 +112,6 @@ struct OperationState | |||
112 | struct GNUNET_MESH_Tunnel *tunnel; | 112 | struct GNUNET_MESH_Tunnel *tunnel; |
113 | 113 | ||
114 | /** | 114 | /** |
115 | * Detail information about the set operation, | ||
116 | * including the set to use. | ||
117 | */ | ||
118 | struct OperationSpecification *spec; | ||
119 | |||
120 | /** | ||
121 | * Message queue for the peer. | 115 | * Message queue for the peer. |
122 | */ | 116 | */ |
123 | struct GNUNET_MQ_Handle *mq; | 117 | struct GNUNET_MQ_Handle *mq; |
@@ -151,33 +145,14 @@ struct OperationState | |||
151 | struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element; | 145 | struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element; |
152 | 146 | ||
153 | /** | 147 | /** |
154 | * Current state of the operation. | 148 | * Iterator for sending elements on the key to element mapping to the client. |
155 | */ | ||
156 | enum UnionOperationPhase phase; | ||
157 | |||
158 | /** | ||
159 | * Generation in which the operation handle | ||
160 | * was created. | ||
161 | */ | ||
162 | unsigned int generation_created; | ||
163 | |||
164 | /** | ||
165 | * Set state of the set that this operation | ||
166 | * belongs to. | ||
167 | */ | 149 | */ |
168 | struct Set *set; | 150 | struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter; |
169 | 151 | ||
170 | /** | 152 | /** |
171 | * Evaluate operations are held in | 153 | * Current state of the operation. |
172 | * a linked list. | ||
173 | */ | 154 | */ |
174 | struct OperationState *next; | 155 | enum UnionOperationPhase phase; |
175 | |||
176 | /** | ||
177 | * Evaluate operations are held in | ||
178 | * a linked list. | ||
179 | */ | ||
180 | struct OperationState *prev; | ||
181 | 156 | ||
182 | /** | 157 | /** |
183 | * Did we send the client that we are done? | 158 | * Did we send the client that we are done? |
@@ -198,13 +173,13 @@ struct KeyEntry | |||
198 | struct IBF_Key ibf_key; | 173 | struct IBF_Key ibf_key; |
199 | 174 | ||
200 | /** | 175 | /** |
201 | * The actual element associated with the key | 176 | * The actual element associated with the key. |
202 | */ | 177 | */ |
203 | struct ElementEntry *element; | 178 | struct ElementEntry *element; |
204 | 179 | ||
205 | /** | 180 | /** |
206 | * Element that collides with this element | 181 | * Element that collides with this element |
207 | * on the ibf key | 182 | * on the ibf key. All colliding entries must have the same ibf key. |
208 | */ | 183 | */ |
209 | struct KeyEntry *next_colliding; | 184 | struct KeyEntry *next_colliding; |
210 | }; | 185 | }; |
@@ -226,7 +201,7 @@ struct SendElementClosure | |||
226 | * Operation for which the elements | 201 | * Operation for which the elements |
227 | * should be sent. | 202 | * should be sent. |
228 | */ | 203 | */ |
229 | struct OperationState *eo; | 204 | struct Operation *op; |
230 | }; | 205 | }; |
231 | 206 | ||
232 | 207 | ||
@@ -242,18 +217,6 @@ struct SetState | |||
242 | * salt=0. | 217 | * salt=0. |
243 | */ | 218 | */ |
244 | struct StrataEstimator *se; | 219 | struct StrataEstimator *se; |
245 | |||
246 | /** | ||
247 | * Evaluate operations are held in | ||
248 | * a linked list. | ||
249 | */ | ||
250 | struct OperationState *ops_head; | ||
251 | |||
252 | /** | ||
253 | * Evaluate operations are held in | ||
254 | * a linked list. | ||
255 | */ | ||
256 | struct OperationState *ops_tail; | ||
257 | }; | 220 | }; |
258 | 221 | ||
259 | 222 | ||
@@ -263,9 +226,9 @@ struct SetState | |||
263 | * @param cls closure | 226 | * @param cls closure |
264 | * @param key current key code | 227 | * @param key current key code |
265 | * @param value value in the hash map | 228 | * @param value value in the hash map |
266 | * @return GNUNET_YES if we should continue to | 229 | * @return #GNUNET_YES if we should continue to |
267 | * iterate, | 230 | * iterate, |
268 | * GNUNET_NO if not. | 231 | * #GNUNET_NO if not. |
269 | */ | 232 | */ |
270 | static int | 233 | static int |
271 | destroy_key_to_element_iter (void *cls, | 234 | destroy_key_to_element_iter (void *cls, |
@@ -290,65 +253,40 @@ destroy_key_to_element_iter (void *cls, | |||
290 | 253 | ||
291 | 254 | ||
292 | /** | 255 | /** |
293 | * Destroy a union operation, and free all resources | 256 | * Destroy the union operation. Only things specific to the union operation are destroyed. |
294 | * associated with it. | 257 | * |
295 | * | 258 | * @param op union operation to destroy |
296 | * @param eo the union operation to destroy | ||
297 | */ | 259 | */ |
298 | static void | 260 | static void |
299 | union_operation_destroy (struct OperationState *eo) | 261 | union_op_cancel (struct Operation *op) |
300 | { | 262 | { |
301 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n"); | 263 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n"); |
302 | GNUNET_CONTAINER_DLL_remove (eo->set->state->ops_head, | 264 | /* check if the op was canceled twice */ |
303 | eo->set->state->ops_tail, | 265 | GNUNET_assert (NULL != op->state); |
304 | eo); | 266 | if (NULL != op->state->remote_ibf) |
305 | if (NULL != eo->mq) | ||
306 | { | ||
307 | GNUNET_MQ_destroy (eo->mq); | ||
308 | eo->mq = NULL; | ||
309 | } | ||
310 | if (NULL != eo->tunnel) | ||
311 | { | ||
312 | struct GNUNET_MESH_Tunnel *t = eo->tunnel; | ||
313 | eo->tunnel = NULL; | ||
314 | GNUNET_MESH_tunnel_destroy (t); | ||
315 | } | ||
316 | if (NULL != eo->remote_ibf) | ||
317 | { | 267 | { |
318 | ibf_destroy (eo->remote_ibf); | 268 | ibf_destroy (op->state->remote_ibf); |
319 | eo->remote_ibf = NULL; | 269 | op->state->remote_ibf = NULL; |
320 | } | 270 | } |
321 | if (NULL != eo->local_ibf) | 271 | if (NULL != op->state->local_ibf) |
322 | { | 272 | { |
323 | ibf_destroy (eo->local_ibf); | 273 | ibf_destroy (op->state->local_ibf); |
324 | eo->local_ibf = NULL; | 274 | op->state->local_ibf = NULL; |
325 | } | 275 | } |
326 | if (NULL != eo->se) | 276 | if (NULL != op->state->se) |
327 | { | 277 | { |
328 | strata_estimator_destroy (eo->se); | 278 | strata_estimator_destroy (op->state->se); |
329 | eo->se = NULL; | 279 | op->state->se = NULL; |
330 | } | 280 | } |
331 | if (NULL != eo->key_to_element) | 281 | if (NULL != op->state->key_to_element) |
332 | { | 282 | { |
333 | GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, destroy_key_to_element_iter, NULL); | 283 | GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, destroy_key_to_element_iter, NULL); |
334 | GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element); | 284 | GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element); |
335 | eo->key_to_element = NULL; | 285 | op->state->key_to_element = NULL; |
336 | } | 286 | } |
337 | if (NULL != eo->spec) | 287 | GNUNET_free (op->state); |
338 | { | 288 | op->state = NULL; |
339 | if (NULL != eo->spec->context_msg) | ||
340 | { | ||
341 | GNUNET_free (eo->spec->context_msg); | ||
342 | eo->spec->context_msg = NULL; | ||
343 | } | ||
344 | GNUNET_free (eo->spec); | ||
345 | eo->spec = NULL; | ||
346 | } | ||
347 | GNUNET_free (eo); | ||
348 | |||
349 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n"); | 289 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n"); |
350 | |||
351 | /* FIXME: do a garbage collection of the set generations */ | ||
352 | } | 290 | } |
353 | 291 | ||
354 | 292 | ||
@@ -356,20 +294,22 @@ union_operation_destroy (struct OperationState *eo) | |||
356 | * Inform the client that the union operation has failed, | 294 | * Inform the client that the union operation has failed, |
357 | * and proceed to destroy the evaluate operation. | 295 | * and proceed to destroy the evaluate operation. |
358 | * | 296 | * |
359 | * @param eo the union operation to fail | 297 | * @param op the union operation to fail |
360 | */ | 298 | */ |
361 | static void | 299 | static void |
362 | fail_union_operation (struct OperationState *eo) | 300 | fail_union_operation (struct Operation *op) |
363 | { | 301 | { |
364 | struct GNUNET_MQ_Envelope *ev; | 302 | struct GNUNET_MQ_Envelope *ev; |
365 | struct GNUNET_SET_ResultMessage *msg; | 303 | struct GNUNET_SET_ResultMessage *msg; |
366 | 304 | ||
305 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "union operation failed\n"); | ||
306 | |||
367 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); | 307 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); |
368 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); | 308 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); |
369 | msg->request_id = htonl (eo->spec->client_request_id); | 309 | msg->request_id = htonl (op->spec->client_request_id); |
370 | msg->element_type = htons (0); | 310 | msg->element_type = htons (0); |
371 | GNUNET_MQ_send (eo->spec->set->client_mq, ev); | 311 | GNUNET_MQ_send (op->spec->set->client_mq, ev); |
372 | union_operation_destroy (eo); | 312 | _GSS_operation_destroy (op); |
373 | } | 313 | } |
374 | 314 | ||
375 | 315 | ||
@@ -382,7 +322,7 @@ fail_union_operation (struct OperationState *eo) | |||
382 | * @return the derived IBF key | 322 | * @return the derived IBF key |
383 | */ | 323 | */ |
384 | static struct IBF_Key | 324 | static struct IBF_Key |
385 | get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt) | 325 | get_ibf_key (const struct GNUNET_HashCode *src, uint16_t salt) |
386 | { | 326 | { |
387 | struct IBF_Key key; | 327 | struct IBF_Key key; |
388 | 328 | ||
@@ -398,40 +338,39 @@ get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt) | |||
398 | /** | 338 | /** |
399 | * Send a request for the evaluate operation to a remote peer | 339 | * Send a request for the evaluate operation to a remote peer |
400 | * | 340 | * |
401 | * @param eo operation with the other peer | 341 | * @param op operation with the other peer |
402 | */ | 342 | */ |
403 | static void | 343 | static void |
404 | send_operation_request (struct OperationState *eo) | 344 | send_operation_request (struct Operation *op) |
405 | { | 345 | { |
406 | struct GNUNET_MQ_Envelope *ev; | 346 | struct GNUNET_MQ_Envelope *ev; |
407 | struct OperationRequestMessage *msg; | 347 | struct OperationRequestMessage *msg; |
408 | 348 | ||
409 | ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, | 349 | ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, |
410 | eo->spec->context_msg); | 350 | op->spec->context_msg); |
411 | 351 | ||
412 | if (NULL == ev) | 352 | if (NULL == ev) |
413 | { | 353 | { |
414 | /* the context message is too large */ | 354 | /* the context message is too large */ |
415 | GNUNET_break (0); | 355 | GNUNET_break (0); |
416 | GNUNET_SERVER_client_disconnect (eo->spec->set->client); | 356 | GNUNET_SERVER_client_disconnect (op->spec->set->client); |
417 | return; | 357 | return; |
418 | } | 358 | } |
419 | msg->operation = htonl (GNUNET_SET_OPERATION_UNION); | 359 | msg->operation = htonl (GNUNET_SET_OPERATION_UNION); |
420 | msg->app_id = eo->spec->app_id; | 360 | msg->app_id = op->spec->app_id; |
421 | msg->salt = htonl (eo->spec->salt); | 361 | msg->salt = htonl (op->spec->salt); |
422 | GNUNET_MQ_send (eo->mq, ev); | 362 | GNUNET_MQ_send (op->mq, ev); |
423 | 363 | ||
424 | if (NULL != eo->spec->context_msg) | 364 | if (NULL != op->spec->context_msg) |
425 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n"); | 365 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n"); |
426 | else | 366 | else |
427 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n"); | 367 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n"); |
428 | 368 | ||
429 | if (NULL != eo->spec->context_msg) | 369 | if (NULL != op->spec->context_msg) |
430 | { | 370 | { |
431 | GNUNET_free (eo->spec->context_msg); | 371 | GNUNET_free (op->spec->context_msg); |
432 | eo->spec->context_msg = NULL; | 372 | op->spec->context_msg = NULL; |
433 | } | 373 | } |
434 | |||
435 | } | 374 | } |
436 | 375 | ||
437 | 376 | ||
@@ -442,34 +381,89 @@ send_operation_request (struct OperationState *eo) | |||
442 | * @param cls closure | 381 | * @param cls closure |
443 | * @param key current key code | 382 | * @param key current key code |
444 | * @param value value in the hash map | 383 | * @param value value in the hash map |
445 | * @return GNUNET_YES if we should continue to | 384 | * @return #GNUNET_YES if we should continue to |
446 | * iterate, | 385 | * iterate, |
447 | * GNUNET_NO if not. | 386 | * #GNUNET_NO if not. |
448 | */ | 387 | */ |
449 | static int | 388 | static int |
450 | op_register_element_iterator (void *cls, | 389 | op_register_element_iterator (void *cls, |
451 | uint32_t key, | 390 | uint32_t key, |
452 | void *value) | 391 | void *value) |
453 | { | 392 | { |
454 | struct KeyEntry *const new_k = cls; | 393 | struct KeyEntry *const new_k = cls; |
455 | struct KeyEntry *old_k = value; | 394 | struct KeyEntry *old_k = value; |
456 | 395 | ||
457 | GNUNET_assert (NULL != old_k); | 396 | GNUNET_assert (NULL != old_k); |
458 | do | 397 | /* check if our ibf key collides with the ibf key in the existing entry */ |
398 | if (old_k->ibf_key.key_val == new_k->ibf_key.key_val) | ||
459 | { | 399 | { |
460 | if (old_k->ibf_key.key_val == new_k->ibf_key.key_val) | 400 | /* insert the the new key in the collision chain */ |
461 | { | 401 | new_k->next_colliding = old_k->next_colliding; |
462 | new_k->next_colliding = old_k->next_colliding; | 402 | old_k->next_colliding = new_k; |
463 | old_k->next_colliding = new_k; | 403 | /* signal to the caller that we were able to insert into a colliding bucket */ |
404 | return GNUNET_NO; | ||
405 | } | ||
406 | return GNUNET_YES; | ||
407 | } | ||
408 | |||
409 | |||
410 | /** | ||
411 | * Iterator to create the mapping between ibf keys | ||
412 | * and element entries. | ||
413 | * | ||
414 | * @param cls closure | ||
415 | * @param key current key code | ||
416 | * @param value value in the hash map | ||
417 | * @return #GNUNET_YES if we should continue to | ||
418 | * iterate, | ||
419 | * #GNUNET_NO if not. | ||
420 | */ | ||
421 | static int | ||
422 | op_has_element_iterator (void *cls, | ||
423 | uint32_t key, | ||
424 | void *value) | ||
425 | { | ||
426 | struct GNUNET_HashCode *element_hash = cls; | ||
427 | struct KeyEntry *k = value; | ||
428 | |||
429 | GNUNET_assert (NULL != k); | ||
430 | while (NULL != k) | ||
431 | { | ||
432 | if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, element_hash)) | ||
464 | return GNUNET_NO; | 433 | return GNUNET_NO; |
465 | } | 434 | k = k->next_colliding; |
466 | old_k = old_k->next_colliding; | 435 | } |
467 | } while (NULL != old_k); | ||
468 | return GNUNET_YES; | 436 | return GNUNET_YES; |
469 | } | 437 | } |
470 | 438 | ||
471 | 439 | ||
472 | /** | 440 | /** |
441 | * Determine whether the given element is already in the operation's element | ||
442 | * set. | ||
443 | * | ||
444 | * @param op operation that should be tested for 'element_hash' | ||
445 | * @param element_hash hash of the element to look for | ||
446 | * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise | ||
447 | */ | ||
448 | static int | ||
449 | op_has_element (struct Operation *op, const struct GNUNET_HashCode *element_hash) | ||
450 | { | ||
451 | int ret; | ||
452 | struct IBF_Key ibf_key; | ||
453 | |||
454 | ibf_key = get_ibf_key (element_hash, op->spec->salt); | ||
455 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, | ||
456 | (uint32_t) ibf_key.key_val, | ||
457 | op_has_element_iterator, (void *) element_hash); | ||
458 | |||
459 | /* was the iteration aborted because we found the element? */ | ||
460 | if (GNUNET_SYSERR == ret) | ||
461 | return GNUNET_YES; | ||
462 | return GNUNET_NO; | ||
463 | } | ||
464 | |||
465 | |||
466 | /** | ||
473 | * Insert an element into the union operation's | 467 | * Insert an element into the union operation's |
474 | * key-to-element mapping. Takes ownership of 'ee'. | 468 | * key-to-element mapping. Takes ownership of 'ee'. |
475 | * Note that this does not insert the element in the set, | 469 | * Note that this does not insert the element in the set, |
@@ -477,21 +471,21 @@ op_register_element_iterator (void *cls, | |||
477 | * This is done to speed up re-tried operations, if some elements | 471 | * This is done to speed up re-tried operations, if some elements |
478 | * were transmitted, and then the IBF fails to decode. | 472 | * were transmitted, and then the IBF fails to decode. |
479 | * | 473 | * |
480 | * @param eo the union operation | 474 | * @param op the union operation |
481 | * @param ee the element entry | 475 | * @param ee the element entry |
482 | */ | 476 | */ |
483 | static void | 477 | static void |
484 | op_register_element (struct OperationState *eo, struct ElementEntry *ee) | 478 | op_register_element (struct Operation *op, struct ElementEntry *ee) |
485 | { | 479 | { |
486 | int ret; | 480 | int ret; |
487 | struct IBF_Key ibf_key; | 481 | struct IBF_Key ibf_key; |
488 | struct KeyEntry *k; | 482 | struct KeyEntry *k; |
489 | 483 | ||
490 | ibf_key = get_ibf_key (&ee->element_hash, eo->spec->salt); | 484 | ibf_key = get_ibf_key (&ee->element_hash, op->spec->salt); |
491 | k = GNUNET_new (struct KeyEntry); | 485 | k = GNUNET_new (struct KeyEntry); |
492 | k->element = ee; | 486 | k->element = ee; |
493 | k->ibf_key = ibf_key; | 487 | k->ibf_key = ibf_key; |
494 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, | 488 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, |
495 | (uint32_t) ibf_key.key_val, | 489 | (uint32_t) ibf_key.key_val, |
496 | op_register_element_iterator, k); | 490 | op_register_element_iterator, k); |
497 | 491 | ||
@@ -499,7 +493,7 @@ op_register_element (struct OperationState *eo, struct ElementEntry *ee) | |||
499 | if (GNUNET_SYSERR == ret) | 493 | if (GNUNET_SYSERR == ret) |
500 | return; | 494 | return; |
501 | 495 | ||
502 | GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k, | 496 | GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, (uint32_t) ibf_key.key_val, k, |
503 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 497 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); |
504 | } | 498 | } |
505 | 499 | ||
@@ -542,19 +536,19 @@ init_key_to_element_iterator (void *cls, | |||
542 | const struct GNUNET_HashCode *key, | 536 | const struct GNUNET_HashCode *key, |
543 | void *value) | 537 | void *value) |
544 | { | 538 | { |
545 | struct OperationState *eo = cls; | 539 | struct Operation *op = cls; |
546 | struct ElementEntry *e = value; | 540 | struct ElementEntry *e = value; |
547 | 541 | ||
548 | /* make sure that the element belongs to the set at the time | 542 | /* make sure that the element belongs to the set at the time |
549 | * of creating the operation */ | 543 | * of creating the operation */ |
550 | if ( (e->generation_added > eo->generation_created) || | 544 | if ( (e->generation_added > op->generation_created) || |
551 | ( (GNUNET_YES == e->removed) && | 545 | ( (GNUNET_YES == e->removed) && |
552 | (e->generation_removed < eo->generation_created))) | 546 | (e->generation_removed < op->generation_created))) |
553 | return GNUNET_YES; | 547 | return GNUNET_YES; |
554 | 548 | ||
555 | GNUNET_assert (GNUNET_NO == e->remote); | 549 | GNUNET_assert (GNUNET_NO == e->remote); |
556 | 550 | ||
557 | op_register_element (eo, e); | 551 | op_register_element (op, e); |
558 | return GNUNET_YES; | 552 | return GNUNET_YES; |
559 | } | 553 | } |
560 | 554 | ||
@@ -563,45 +557,45 @@ init_key_to_element_iterator (void *cls, | |||
563 | * Create an ibf with the operation's elements | 557 | * Create an ibf with the operation's elements |
564 | * of the specified size | 558 | * of the specified size |
565 | * | 559 | * |
566 | * @param eo the union operation | 560 | * @param op the union operation |
567 | * @param size size of the ibf to create | 561 | * @param size size of the ibf to create |
568 | */ | 562 | */ |
569 | static void | 563 | static void |
570 | prepare_ibf (struct OperationState *eo, uint16_t size) | 564 | prepare_ibf (struct Operation *op, uint16_t size) |
571 | { | 565 | { |
572 | if (NULL == eo->key_to_element) | 566 | if (NULL == op->state->key_to_element) |
573 | { | 567 | { |
574 | unsigned int len; | 568 | unsigned int len; |
575 | len = GNUNET_CONTAINER_multihashmap_size (eo->set->elements); | 569 | len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->elements); |
576 | eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); | 570 | op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); |
577 | GNUNET_CONTAINER_multihashmap_iterate (eo->set->elements, | 571 | GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements, |
578 | init_key_to_element_iterator, eo); | 572 | init_key_to_element_iterator, op); |
579 | } | 573 | } |
580 | if (NULL != eo->local_ibf) | 574 | if (NULL != op->state->local_ibf) |
581 | ibf_destroy (eo->local_ibf); | 575 | ibf_destroy (op->state->local_ibf); |
582 | eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); | 576 | op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); |
583 | GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, | 577 | GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, |
584 | prepare_ibf_iterator, eo->local_ibf); | 578 | prepare_ibf_iterator, op->state->local_ibf); |
585 | } | 579 | } |
586 | 580 | ||
587 | 581 | ||
588 | /** | 582 | /** |
589 | * Send an ibf of appropriate size. | 583 | * Send an ibf of appropriate size. |
590 | * | 584 | * |
591 | * @param eo the union operation | 585 | * @param op the union operation |
592 | * @param ibf_order order of the ibf to send, size=2^order | 586 | * @param ibf_order order of the ibf to send, size=2^order |
593 | */ | 587 | */ |
594 | static void | 588 | static void |
595 | send_ibf (struct OperationState *eo, uint16_t ibf_order) | 589 | send_ibf (struct Operation *op, uint16_t ibf_order) |
596 | { | 590 | { |
597 | unsigned int buckets_sent = 0; | 591 | unsigned int buckets_sent = 0; |
598 | struct InvertibleBloomFilter *ibf; | 592 | struct InvertibleBloomFilter *ibf; |
599 | 593 | ||
600 | prepare_ibf (eo, 1<<ibf_order); | 594 | prepare_ibf (op, 1<<ibf_order); |
601 | 595 | ||
602 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order); | 596 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order); |
603 | 597 | ||
604 | ibf = eo->local_ibf; | 598 | ibf = op->state->local_ibf; |
605 | 599 | ||
606 | while (buckets_sent < (1 << ibf_order)) | 600 | while (buckets_sent < (1 << ibf_order)) |
607 | { | 601 | { |
@@ -624,20 +618,20 @@ send_ibf (struct OperationState *eo, uint16_t ibf_order) | |||
624 | buckets_sent += buckets_in_message; | 618 | buckets_sent += buckets_in_message; |
625 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n", | 619 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n", |
626 | buckets_in_message, buckets_sent, 1<<ibf_order); | 620 | buckets_in_message, buckets_sent, 1<<ibf_order); |
627 | GNUNET_MQ_send (eo->mq, ev); | 621 | GNUNET_MQ_send (op->mq, ev); |
628 | } | 622 | } |
629 | 623 | ||
630 | eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS; | 624 | op->state->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS; |
631 | } | 625 | } |
632 | 626 | ||
633 | 627 | ||
634 | /** | 628 | /** |
635 | * Send a strata estimator to the remote peer. | 629 | * Send a strata estimator to the remote peer. |
636 | * | 630 | * |
637 | * @param eo the union operation with the remote peer | 631 | * @param op the union operation with the remote peer |
638 | */ | 632 | */ |
639 | static void | 633 | static void |
640 | send_strata_estimator (struct OperationState *eo) | 634 | send_strata_estimator (struct Operation *op) |
641 | { | 635 | { |
642 | struct GNUNET_MQ_Envelope *ev; | 636 | struct GNUNET_MQ_Envelope *ev; |
643 | struct GNUNET_MessageHeader *strata_msg; | 637 | struct GNUNET_MessageHeader *strata_msg; |
@@ -645,9 +639,9 @@ send_strata_estimator (struct OperationState *eo) | |||
645 | ev = GNUNET_MQ_msg_header_extra (strata_msg, | 639 | ev = GNUNET_MQ_msg_header_extra (strata_msg, |
646 | SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, | 640 | SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, |
647 | GNUNET_MESSAGE_TYPE_SET_P2P_SE); | 641 | GNUNET_MESSAGE_TYPE_SET_P2P_SE); |
648 | strata_estimator_write (eo->set->state->se, &strata_msg[1]); | 642 | strata_estimator_write (op->state->se, &strata_msg[1]); |
649 | GNUNET_MQ_send (eo->mq, ev); | 643 | GNUNET_MQ_send (op->mq, ev); |
650 | eo->phase = PHASE_EXPECT_IBF; | 644 | op->state->phase = PHASE_EXPECT_IBF; |
651 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n"); | 645 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n"); |
652 | } | 646 | } |
653 | 647 | ||
@@ -682,27 +676,27 @@ get_order_from_difference (unsigned int diff) | |||
682 | static void | 676 | static void |
683 | handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) | 677 | handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) |
684 | { | 678 | { |
685 | struct OperationState *eo = cls; | 679 | struct Operation *op = cls; |
686 | struct StrataEstimator *remote_se; | 680 | struct StrataEstimator *remote_se; |
687 | int diff; | 681 | int diff; |
688 | 682 | ||
689 | if (eo->phase != PHASE_EXPECT_SE) | 683 | if (op->state->phase != PHASE_EXPECT_SE) |
690 | { | 684 | { |
691 | fail_union_operation (eo); | 685 | fail_union_operation (op); |
692 | GNUNET_break (0); | 686 | GNUNET_break (0); |
693 | return; | 687 | return; |
694 | } | 688 | } |
695 | remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, | 689 | remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, |
696 | SE_IBF_HASH_NUM); | 690 | SE_IBF_HASH_NUM); |
697 | strata_estimator_read (&mh[1], remote_se); | 691 | strata_estimator_read (&mh[1], remote_se); |
698 | GNUNET_assert (NULL != eo->se); | 692 | GNUNET_assert (NULL != op->state->se); |
699 | diff = strata_estimator_difference (remote_se, eo->se); | 693 | diff = strata_estimator_difference (remote_se, op->state->se); |
700 | strata_estimator_destroy (remote_se); | 694 | strata_estimator_destroy (remote_se); |
701 | strata_estimator_destroy (eo->se); | 695 | strata_estimator_destroy (op->state->se); |
702 | eo->se = NULL; | 696 | op->state->se = NULL; |
703 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n", | 697 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n", |
704 | diff, 1<<get_order_from_difference (diff)); | 698 | diff, 1<<get_order_from_difference (diff)); |
705 | send_ibf (eo, get_order_from_difference (diff)); | 699 | send_ibf (op, get_order_from_difference (diff)); |
706 | } | 700 | } |
707 | 701 | ||
708 | 702 | ||
@@ -721,7 +715,7 @@ send_element_iterator (void *cls, | |||
721 | { | 715 | { |
722 | struct SendElementClosure *sec = cls; | 716 | struct SendElementClosure *sec = cls; |
723 | struct IBF_Key ibf_key = sec->ibf_key; | 717 | struct IBF_Key ibf_key = sec->ibf_key; |
724 | struct OperationState *eo = sec->eo; | 718 | struct Operation *op = sec->op; |
725 | struct KeyEntry *ke = value; | 719 | struct KeyEntry *ke = value; |
726 | 720 | ||
727 | if (ke->ibf_key.key_val != ibf_key.key_val) | 721 | if (ke->ibf_key.key_val != ibf_key.key_val) |
@@ -743,7 +737,7 @@ send_element_iterator (void *cls, | |||
743 | memcpy (&mh[1], element->data, element->size); | 737 | memcpy (&mh[1], element->data, element->size); |
744 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n", | 738 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n", |
745 | GNUNET_h2s (&ke->element->element_hash)); | 739 | GNUNET_h2s (&ke->element->element_hash)); |
746 | GNUNET_MQ_send (eo->mq, ev); | 740 | GNUNET_MQ_send (op->mq, ev); |
747 | ke = ke->next_colliding; | 741 | ke = ke->next_colliding; |
748 | } | 742 | } |
749 | return GNUNET_NO; | 743 | return GNUNET_NO; |
@@ -753,17 +747,17 @@ send_element_iterator (void *cls, | |||
753 | * Send all elements that have the specified IBF key | 747 | * Send all elements that have the specified IBF key |
754 | * to the remote peer of the union operation | 748 | * to the remote peer of the union operation |
755 | * | 749 | * |
756 | * @param eo union operation | 750 | * @param op union operation |
757 | * @param ibf_key IBF key of interest | 751 | * @param ibf_key IBF key of interest |
758 | */ | 752 | */ |
759 | static void | 753 | static void |
760 | send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key) | 754 | send_elements_for_key (struct Operation *op, struct IBF_Key ibf_key) |
761 | { | 755 | { |
762 | struct SendElementClosure send_cls; | 756 | struct SendElementClosure send_cls; |
763 | 757 | ||
764 | send_cls.ibf_key = ibf_key; | 758 | send_cls.ibf_key = ibf_key; |
765 | send_cls.eo = eo; | 759 | send_cls.op = op; |
766 | GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val, | 760 | GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, (uint32_t) ibf_key.key_val, |
767 | &send_element_iterator, &send_cls); | 761 | &send_element_iterator, &send_cls); |
768 | } | 762 | } |
769 | 763 | ||
@@ -772,10 +766,10 @@ send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key) | |||
772 | * Decode which elements are missing on each side, and | 766 | * Decode which elements are missing on each side, and |
773 | * send the appropriate elemens and requests | 767 | * send the appropriate elemens and requests |
774 | * | 768 | * |
775 | * @param eo union operation | 769 | * @param op union operation |
776 | */ | 770 | */ |
777 | static void | 771 | static void |
778 | decode_and_send (struct OperationState *eo) | 772 | decode_and_send (struct Operation *op) |
779 | { | 773 | { |
780 | struct IBF_Key key; | 774 | struct IBF_Key key; |
781 | struct IBF_Key last_key; | 775 | struct IBF_Key last_key; |
@@ -783,14 +777,14 @@ decode_and_send (struct OperationState *eo) | |||
783 | unsigned int num_decoded; | 777 | unsigned int num_decoded; |
784 | struct InvertibleBloomFilter *diff_ibf; | 778 | struct InvertibleBloomFilter *diff_ibf; |
785 | 779 | ||
786 | GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase); | 780 | GNUNET_assert (PHASE_EXPECT_ELEMENTS == op->state->phase); |
787 | 781 | ||
788 | prepare_ibf (eo, eo->remote_ibf->size); | 782 | prepare_ibf (op, op->state->remote_ibf->size); |
789 | diff_ibf = ibf_dup (eo->local_ibf); | 783 | diff_ibf = ibf_dup (op->state->local_ibf); |
790 | ibf_subtract (diff_ibf, eo->remote_ibf); | 784 | ibf_subtract (diff_ibf, op->state->remote_ibf); |
791 | 785 | ||
792 | ibf_destroy (eo->remote_ibf); | 786 | ibf_destroy (op->state->remote_ibf); |
793 | eo->remote_ibf = NULL; | 787 | op->state->remote_ibf = NULL; |
794 | 788 | ||
795 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size); | 789 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size); |
796 | 790 | ||
@@ -829,7 +823,7 @@ decode_and_send (struct OperationState *eo) | |||
829 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 823 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
830 | "decoding failed, sending larger ibf (size %u)\n", | 824 | "decoding failed, sending larger ibf (size %u)\n", |
831 | 1<<next_order); | 825 | 1<<next_order); |
832 | send_ibf (eo, next_order); | 826 | send_ibf (op, next_order); |
833 | } | 827 | } |
834 | else | 828 | else |
835 | { | 829 | { |
@@ -844,28 +838,26 @@ decode_and_send (struct OperationState *eo) | |||
844 | 838 | ||
845 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n"); | 839 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n"); |
846 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); | 840 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); |
847 | GNUNET_MQ_send (eo->mq, ev); | 841 | GNUNET_MQ_send (op->mq, ev); |
848 | break; | 842 | break; |
849 | } | 843 | } |
850 | if (1 == side) | 844 | if (1 == side) |
851 | { | 845 | { |
852 | send_elements_for_key (eo, key); | 846 | send_elements_for_key (op, key); |
853 | } | 847 | } |
854 | else if (-1 == side) | 848 | else if (-1 == side) |
855 | { | 849 | { |
856 | struct GNUNET_MQ_Envelope *ev; | 850 | struct GNUNET_MQ_Envelope *ev; |
857 | struct GNUNET_MessageHeader *msg; | 851 | struct GNUNET_MessageHeader *msg; |
858 | 852 | ||
859 | /* FIXME: before sending the request, check if we may just have the element */ | 853 | /* It may be nice to merge multiple requests, but with mesh's corking it is not worth |
860 | /* FIXME: merge multiple requests */ | 854 | * the effort additional complexity. */ |
861 | /* FIXME: remember somewhere that we already requested the element, | ||
862 | * so that we don't request it again with the next ibf if decoding fails */ | ||
863 | ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), | 855 | ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), |
864 | GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); | 856 | GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); |
865 | 857 | ||
866 | *(struct IBF_Key *) &msg[1] = key; | 858 | *(struct IBF_Key *) &msg[1] = key; |
867 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n"); | 859 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n"); |
868 | GNUNET_MQ_send (eo->mq, ev); | 860 | GNUNET_MQ_send (op->mq, ev); |
869 | } | 861 | } |
870 | else | 862 | else |
871 | { | 863 | { |
@@ -885,32 +877,32 @@ decode_and_send (struct OperationState *eo) | |||
885 | static void | 877 | static void |
886 | handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) | 878 | handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) |
887 | { | 879 | { |
888 | struct OperationState *eo = cls; | 880 | struct Operation *op = cls; |
889 | struct IBFMessage *msg = (struct IBFMessage *) mh; | 881 | struct IBFMessage *msg = (struct IBFMessage *) mh; |
890 | unsigned int buckets_in_message; | 882 | unsigned int buckets_in_message; |
891 | 883 | ||
892 | if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) || | 884 | if ( (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) || |
893 | (eo->phase == PHASE_EXPECT_IBF) ) | 885 | (op->state->phase == PHASE_EXPECT_IBF) ) |
894 | { | 886 | { |
895 | eo->phase = PHASE_EXPECT_IBF_CONT; | 887 | op->state->phase = PHASE_EXPECT_IBF_CONT; |
896 | GNUNET_assert (NULL == eo->remote_ibf); | 888 | GNUNET_assert (NULL == op->state->remote_ibf); |
897 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order); | 889 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order); |
898 | eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); | 890 | op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); |
899 | eo->ibf_buckets_received = 0; | 891 | op->state->ibf_buckets_received = 0; |
900 | if (0 != ntohs (msg->offset)) | 892 | if (0 != ntohs (msg->offset)) |
901 | { | 893 | { |
902 | GNUNET_break (0); | 894 | GNUNET_break (0); |
903 | fail_union_operation (eo); | 895 | fail_union_operation (op); |
904 | return; | 896 | return; |
905 | } | 897 | } |
906 | } | 898 | } |
907 | else if (eo->phase == PHASE_EXPECT_IBF_CONT) | 899 | else if (op->state->phase == PHASE_EXPECT_IBF_CONT) |
908 | { | 900 | { |
909 | if ( (ntohs (msg->offset) != eo->ibf_buckets_received) || | 901 | if ( (ntohs (msg->offset) != op->state->ibf_buckets_received) || |
910 | (1<<msg->order != eo->remote_ibf->size) ) | 902 | (1<<msg->order != op->state->remote_ibf->size) ) |
911 | { | 903 | { |
912 | GNUNET_break (0); | 904 | GNUNET_break (0); |
913 | fail_union_operation (eo); | 905 | fail_union_operation (op); |
914 | return; | 906 | return; |
915 | } | 907 | } |
916 | } | 908 | } |
@@ -920,25 +912,25 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) | |||
920 | if (0 == buckets_in_message) | 912 | if (0 == buckets_in_message) |
921 | { | 913 | { |
922 | GNUNET_break_op (0); | 914 | GNUNET_break_op (0); |
923 | fail_union_operation (eo); | 915 | fail_union_operation (op); |
924 | return; | 916 | return; |
925 | } | 917 | } |
926 | 918 | ||
927 | if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE) | 919 | if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE) |
928 | { | 920 | { |
929 | GNUNET_break (0); | 921 | GNUNET_break (0); |
930 | fail_union_operation (eo); | 922 | fail_union_operation (op); |
931 | return; | 923 | return; |
932 | } | 924 | } |
933 | 925 | ||
934 | ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf); | 926 | ibf_read_slice (&msg[1], op->state->ibf_buckets_received, buckets_in_message, op->state->remote_ibf); |
935 | eo->ibf_buckets_received += buckets_in_message; | 927 | op->state->ibf_buckets_received += buckets_in_message; |
936 | 928 | ||
937 | if (eo->ibf_buckets_received == eo->remote_ibf->size) | 929 | if (op->state->ibf_buckets_received == op->state->remote_ibf->size) |
938 | { | 930 | { |
939 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n"); | 931 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n"); |
940 | eo->phase = PHASE_EXPECT_ELEMENTS; | 932 | op->state->phase = PHASE_EXPECT_ELEMENTS; |
941 | decode_and_send (eo); | 933 | decode_and_send (op); |
942 | } | 934 | } |
943 | } | 935 | } |
944 | 936 | ||
@@ -947,18 +939,18 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) | |||
947 | * Send a result message to the client indicating | 939 | * Send a result message to the client indicating |
948 | * that there is a new element. | 940 | * that there is a new element. |
949 | * | 941 | * |
950 | * @param eo union operation | 942 | * @param op union operation |
951 | * @param element element to send | 943 | * @param element element to send |
952 | */ | 944 | */ |
953 | static void | 945 | static void |
954 | send_client_element (struct OperationState *eo, | 946 | send_client_element (struct Operation *op, |
955 | struct GNUNET_SET_Element *element) | 947 | struct GNUNET_SET_Element *element) |
956 | { | 948 | { |
957 | struct GNUNET_MQ_Envelope *ev; | 949 | struct GNUNET_MQ_Envelope *ev; |
958 | struct GNUNET_SET_ResultMessage *rm; | 950 | struct GNUNET_SET_ResultMessage *rm; |
959 | 951 | ||
960 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size); | 952 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size); |
961 | GNUNET_assert (0 != eo->spec->client_request_id); | 953 | GNUNET_assert (0 != op->spec->client_request_id); |
962 | ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); | 954 | ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); |
963 | if (NULL == ev) | 955 | if (NULL == ev) |
964 | { | 956 | { |
@@ -967,38 +959,112 @@ send_client_element (struct OperationState *eo, | |||
967 | return; | 959 | return; |
968 | } | 960 | } |
969 | rm->result_status = htons (GNUNET_SET_STATUS_OK); | 961 | rm->result_status = htons (GNUNET_SET_STATUS_OK); |
970 | rm->request_id = htonl (eo->spec->client_request_id); | 962 | rm->request_id = htonl (op->spec->client_request_id); |
971 | rm->element_type = element->type; | 963 | rm->element_type = element->type; |
972 | memcpy (&rm[1], element->data, element->size); | 964 | memcpy (&rm[1], element->data, element->size); |
973 | GNUNET_MQ_send (eo->spec->set->client_mq, ev); | 965 | GNUNET_MQ_send (op->spec->set->client_mq, ev); |
974 | } | 966 | } |
975 | 967 | ||
976 | 968 | ||
977 | /** | 969 | /** |
978 | * Send a result message to the client indicating | 970 | * Signal to the client that the operation has finished and |
979 | * that the operation is over. | 971 | * destroy the operation. |
980 | * After the result done message has been sent to the client, | ||
981 | * destroy the evaluate operation. | ||
982 | * | 972 | * |
983 | * @param eo union operation | 973 | * @param cls operation to destroy |
984 | */ | 974 | */ |
985 | static void | 975 | static void |
986 | send_client_done_and_destroy (struct OperationState *eo) | 976 | send_done_and_destroy (void *cls) |
987 | { | 977 | { |
978 | struct Operation *op = cls; | ||
988 | struct GNUNET_MQ_Envelope *ev; | 979 | struct GNUNET_MQ_Envelope *ev; |
989 | struct GNUNET_SET_ResultMessage *rm; | 980 | struct GNUNET_SET_ResultMessage *rm; |
990 | |||
991 | GNUNET_assert (GNUNET_NO == eo->client_done_sent); | ||
992 | |||
993 | eo->client_done_sent = GNUNET_YES; | ||
994 | |||
995 | ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); | 981 | ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); |
996 | rm->request_id = htonl (eo->spec->client_request_id); | 982 | rm->request_id = htonl (op->spec->client_request_id); |
997 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); | 983 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); |
998 | rm->element_type = htons (0); | 984 | rm->element_type = htons (0); |
999 | GNUNET_MQ_send (eo->spec->set->client_mq, ev); | 985 | GNUNET_MQ_send (op->spec->set->client_mq, ev); |
986 | _GSS_operation_destroy (op); | ||
987 | } | ||
988 | |||
989 | |||
990 | /** | ||
991 | * Send all remaining elements in the full result iterator. | ||
992 | * | ||
993 | * @param cls operation | ||
994 | */ | ||
995 | static void | ||
996 | send_remaining_elements (void *cls) | ||
997 | { | ||
998 | struct Operation *op = cls; | ||
999 | struct KeyEntry *ke; | ||
1000 | int res; | ||
1001 | |||
1002 | res = GNUNET_CONTAINER_multihashmap32_iterator_next (op->state->full_result_iter, NULL, (const void **) &ke); | ||
1003 | res = GNUNET_NO; | ||
1004 | if (GNUNET_NO == res) | ||
1005 | { | ||
1006 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n"); | ||
1007 | send_done_and_destroy (op); | ||
1008 | return; | ||
1009 | } | ||
1010 | |||
1011 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending elements from key entry\n"); | ||
1012 | |||
1013 | while (1) | ||
1014 | { | ||
1015 | struct GNUNET_MQ_Envelope *ev; | ||
1016 | struct GNUNET_SET_ResultMessage *rm; | ||
1017 | struct GNUNET_SET_Element *element; | ||
1018 | element = &ke->element->element; | ||
1019 | |||
1020 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size); | ||
1021 | GNUNET_assert (0 != op->spec->client_request_id); | ||
1022 | ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); | ||
1023 | if (NULL == ev) | ||
1024 | { | ||
1025 | GNUNET_MQ_discard (ev); | ||
1026 | GNUNET_break (0); | ||
1027 | continue; | ||
1028 | } | ||
1029 | rm->result_status = htons (GNUNET_SET_STATUS_OK); | ||
1030 | rm->request_id = htonl (op->spec->client_request_id); | ||
1031 | rm->element_type = element->type; | ||
1032 | memcpy (&rm[1], element->data, element->size); | ||
1033 | if (ke->next_colliding == NULL) | ||
1034 | { | ||
1035 | GNUNET_MQ_notify_sent (ev, send_remaining_elements, op); | ||
1036 | GNUNET_MQ_send (op->spec->set->client_mq, ev); | ||
1037 | break; | ||
1038 | } | ||
1039 | GNUNET_MQ_send (op->spec->set->client_mq, ev); | ||
1040 | ke = ke->next_colliding; | ||
1041 | } | ||
1042 | } | ||
1000 | 1043 | ||
1001 | union_operation_destroy (eo); | 1044 | |
1045 | /** | ||
1046 | * Send a result message to the client indicating | ||
1047 | * that the operation is over. | ||
1048 | * After the result done message has been sent to the client, | ||
1049 | * destroy the evaluate operation. | ||
1050 | * | ||
1051 | * @param op union operation | ||
1052 | */ | ||
1053 | static void | ||
1054 | finish_and_destroy (struct Operation *op) | ||
1055 | { | ||
1056 | GNUNET_assert (GNUNET_NO == op->state->client_done_sent); | ||
1057 | |||
1058 | if (GNUNET_SET_RESULT_FULL == op->spec->result_mode) | ||
1059 | { | ||
1060 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n"); | ||
1061 | GNUNET_assert (NULL == op->state->full_result_iter); | ||
1062 | op->state->full_result_iter = | ||
1063 | GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->key_to_element); | ||
1064 | send_remaining_elements (op); | ||
1065 | return; | ||
1066 | } | ||
1067 | send_done_and_destroy (op); | ||
1002 | } | 1068 | } |
1003 | 1069 | ||
1004 | 1070 | ||
@@ -1011,16 +1077,16 @@ send_client_done_and_destroy (struct OperationState *eo) | |||
1011 | static void | 1077 | static void |
1012 | handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) | 1078 | handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) |
1013 | { | 1079 | { |
1014 | struct OperationState *eo = cls; | 1080 | struct Operation *op = cls; |
1015 | struct ElementEntry *ee; | 1081 | struct ElementEntry *ee; |
1016 | uint16_t element_size; | 1082 | uint16_t element_size; |
1017 | 1083 | ||
1018 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n"); | 1084 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n"); |
1019 | 1085 | ||
1020 | if ( (eo->phase != PHASE_EXPECT_ELEMENTS) && | 1086 | if ( (op->state->phase != PHASE_EXPECT_ELEMENTS) && |
1021 | (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ) | 1087 | (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ) |
1022 | { | 1088 | { |
1023 | fail_union_operation (eo); | 1089 | fail_union_operation (op); |
1024 | GNUNET_break (0); | 1090 | GNUNET_break (0); |
1025 | return; | 1091 | return; |
1026 | } | 1092 | } |
@@ -1032,12 +1098,17 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) | |||
1032 | ee->remote = GNUNET_YES; | 1098 | ee->remote = GNUNET_YES; |
1033 | GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash); | 1099 | GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash); |
1034 | 1100 | ||
1035 | /* FIXME: see if the element has already been inserted! */ | 1101 | if (GNUNET_YES == op_has_element (op, &ee->element_hash)) |
1102 | { | ||
1103 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got existing element from peer\n"); | ||
1104 | GNUNET_free (ee); | ||
1105 | return; | ||
1106 | } | ||
1036 | 1107 | ||
1037 | op_register_element (eo, ee); | 1108 | op_register_element (op, ee); |
1038 | /* only send results immediately if the client wants it */ | 1109 | /* only send results immediately if the client wants it */ |
1039 | if (GNUNET_SET_RESULT_ADDED == eo->spec->result_mode) | 1110 | if (GNUNET_SET_RESULT_ADDED == op->spec->result_mode) |
1040 | send_client_element (eo, &ee->element); | 1111 | send_client_element (op, &ee->element); |
1041 | } | 1112 | } |
1042 | 1113 | ||
1043 | 1114 | ||
@@ -1050,15 +1121,15 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) | |||
1050 | static void | 1121 | static void |
1051 | handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) | 1122 | handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) |
1052 | { | 1123 | { |
1053 | struct OperationState *eo = cls; | 1124 | struct Operation *op = cls; |
1054 | struct IBF_Key *ibf_key; | 1125 | struct IBF_Key *ibf_key; |
1055 | unsigned int num_keys; | 1126 | unsigned int num_keys; |
1056 | 1127 | ||
1057 | /* look up elements and send them */ | 1128 | /* look up elements and send them */ |
1058 | if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) | 1129 | if (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) |
1059 | { | 1130 | { |
1060 | GNUNET_break (0); | 1131 | GNUNET_break (0); |
1061 | fail_union_operation (eo); | 1132 | fail_union_operation (op); |
1062 | return; | 1133 | return; |
1063 | } | 1134 | } |
1064 | 1135 | ||
@@ -1067,14 +1138,14 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) | |||
1067 | if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key)) | 1138 | if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key)) |
1068 | { | 1139 | { |
1069 | GNUNET_break (0); | 1140 | GNUNET_break (0); |
1070 | fail_union_operation (eo); | 1141 | fail_union_operation (op); |
1071 | return; | 1142 | return; |
1072 | } | 1143 | } |
1073 | 1144 | ||
1074 | ibf_key = (struct IBF_Key *) &mh[1]; | 1145 | ibf_key = (struct IBF_Key *) &mh[1]; |
1075 | while (0 != num_keys--) | 1146 | while (0 != num_keys--) |
1076 | { | 1147 | { |
1077 | send_elements_for_key (eo, *ibf_key); | 1148 | send_elements_for_key (op, *ibf_key); |
1078 | ibf_key++; | 1149 | ibf_key++; |
1079 | } | 1150 | } |
1080 | } | 1151 | } |
@@ -1089,28 +1160,28 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) | |||
1089 | static void | 1160 | static void |
1090 | handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) | 1161 | handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) |
1091 | { | 1162 | { |
1092 | struct OperationState *eo = cls; | 1163 | struct Operation *op = cls; |
1093 | struct GNUNET_MQ_Envelope *ev; | 1164 | struct GNUNET_MQ_Envelope *ev; |
1094 | 1165 | ||
1095 | if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) | 1166 | if (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) |
1096 | { | 1167 | { |
1097 | /* we got all requests, but still have to send our elements as response */ | 1168 | /* we got all requests, but still have to send our elements as response */ |
1098 | 1169 | ||
1099 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n"); | 1170 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n"); |
1100 | eo->phase = PHASE_FINISHED; | 1171 | op->state->phase = PHASE_FINISHED; |
1101 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); | 1172 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); |
1102 | GNUNET_MQ_send (eo->mq, ev); | 1173 | GNUNET_MQ_send (op->mq, ev); |
1103 | return; | 1174 | return; |
1104 | } | 1175 | } |
1105 | if (eo->phase == PHASE_EXPECT_ELEMENTS) | 1176 | if (op->state->phase == PHASE_EXPECT_ELEMENTS) |
1106 | { | 1177 | { |
1107 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n"); | 1178 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n"); |
1108 | eo->phase = PHASE_FINISHED; | 1179 | op->state->phase = PHASE_FINISHED; |
1109 | send_client_done_and_destroy (eo); | 1180 | finish_and_destroy (op); |
1110 | return; | 1181 | return; |
1111 | } | 1182 | } |
1112 | GNUNET_break (0); | 1183 | GNUNET_break (0); |
1113 | fail_union_operation (eo); | 1184 | fail_union_operation (op); |
1114 | } | 1185 | } |
1115 | 1186 | ||
1116 | 1187 | ||
@@ -1118,78 +1189,34 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) | |||
1118 | * Evaluate a union operation with | 1189 | * Evaluate a union operation with |
1119 | * a remote peer. | 1190 | * a remote peer. |
1120 | * | 1191 | * |
1121 | * @param spec specification of the operation the evaluate | 1192 | * @param op operation to evaluate |
1122 | * @param tunnel tunnel already connected to the partner peer | ||
1123 | * @param tc tunnel context, passed here so all new incoming | ||
1124 | * messages are directly going to the union operations | ||
1125 | * @return a handle to the operation | ||
1126 | */ | 1193 | */ |
1127 | static void | 1194 | static void |
1128 | union_evaluate (struct OperationSpecification *spec, | 1195 | union_evaluate (struct Operation *op) |
1129 | struct GNUNET_MESH_Tunnel *tunnel, | ||
1130 | struct TunnelContext *tc) | ||
1131 | { | 1196 | { |
1132 | struct OperationState *eo; | 1197 | op->state = GNUNET_new (struct OperationState); |
1133 | 1198 | op->state->se = strata_estimator_dup (op->spec->set->state->se); | |
1134 | eo = GNUNET_new (struct OperationState); | ||
1135 | tc->vt = _GSS_union_vt (); | ||
1136 | tc->op = eo; | ||
1137 | eo->se = strata_estimator_dup (spec->set->state->se); | ||
1138 | eo->generation_created = spec->set->current_generation++; | ||
1139 | eo->set = spec->set; | ||
1140 | eo->spec = spec; | ||
1141 | eo->tunnel = tunnel; | ||
1142 | eo->tunnel = tunnel; | ||
1143 | eo->mq = GNUNET_MESH_mq_create (tunnel); | ||
1144 | |||
1145 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1146 | "evaluating union operation, (app %s)\n", | ||
1147 | GNUNET_h2s (&eo->spec->app_id)); | ||
1148 | |||
1149 | /* we started the operation, thus we have to send the operation request */ | 1199 | /* we started the operation, thus we have to send the operation request */ |
1150 | eo->phase = PHASE_EXPECT_SE; | 1200 | op->state->phase = PHASE_EXPECT_SE; |
1151 | 1201 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "evaluating union operation"); | |
1152 | GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head, | 1202 | send_operation_request (op); |
1153 | eo->set->state->ops_tail, | ||
1154 | eo); | ||
1155 | |||
1156 | send_operation_request (eo); | ||
1157 | } | 1203 | } |
1158 | 1204 | ||
1159 | 1205 | ||
1160 | /** | 1206 | /** |
1161 | * Accept an union operation request from a remote peer | 1207 | * Accept an union operation request from a remote peer. |
1208 | * Only initializes the private operation state. | ||
1162 | * | 1209 | * |
1163 | * @param spec all necessary information about the operation | 1210 | * @param op operation that will be accepted as a union operation |
1164 | * @param tunnel open tunnel to the partner's peer | ||
1165 | * @param tc tunnel context, passed here so all new incoming | ||
1166 | * messages are directly going to the union operations | ||
1167 | * @return operation | ||
1168 | */ | 1211 | */ |
1169 | static void | 1212 | static void |
1170 | union_accept (struct OperationSpecification *spec, | 1213 | union_accept (struct Operation *op) |
1171 | struct GNUNET_MESH_Tunnel *tunnel, | ||
1172 | struct TunnelContext *tc) | ||
1173 | { | 1214 | { |
1174 | struct OperationState *eo; | ||
1175 | |||
1176 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n"); | 1215 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n"); |
1177 | 1216 | op->state = GNUNET_new (struct OperationState); | |
1178 | eo = GNUNET_new (struct OperationState); | 1217 | op->state->se = strata_estimator_dup (op->spec->set->state->se); |
1179 | tc->vt = _GSS_union_vt (); | ||
1180 | tc->op = eo; | ||
1181 | eo->set = spec->set; | ||
1182 | eo->generation_created = eo->set->current_generation++; | ||
1183 | eo->spec = spec; | ||
1184 | eo->tunnel = tunnel; | ||
1185 | eo->mq = GNUNET_MESH_mq_create (tunnel); | ||
1186 | eo->se = strata_estimator_dup (eo->set->state->se); | ||
1187 | /* transfer ownership of mq and socket from incoming to eo */ | ||
1188 | GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head, | ||
1189 | eo->set->state->ops_tail, | ||
1190 | eo); | ||
1191 | /* kick off the operation */ | 1218 | /* kick off the operation */ |
1192 | send_strata_estimator (eo); | 1219 | send_strata_estimator (op); |
1193 | } | 1220 | } |
1194 | 1221 | ||
1195 | 1222 | ||
@@ -1240,17 +1267,13 @@ union_remove (struct SetState *set_state, struct ElementEntry *ee) | |||
1240 | 1267 | ||
1241 | 1268 | ||
1242 | /** | 1269 | /** |
1243 | * Destroy a set that supports the union operation | 1270 | * Destroy a set that supports the union operation. |
1244 | * | 1271 | * |
1245 | * @param set_state the set to destroy | 1272 | * @param set_state the set to destroy |
1246 | */ | 1273 | */ |
1247 | static void | 1274 | static void |
1248 | union_set_destroy (struct SetState *set_state) | 1275 | union_set_destroy (struct SetState *set_state) |
1249 | { | 1276 | { |
1250 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union set\n"); | ||
1251 | /* important to destroy operations before the rest of the set */ | ||
1252 | while (NULL != set_state->ops_head) | ||
1253 | union_operation_destroy (set_state->ops_head); | ||
1254 | if (NULL != set_state->se) | 1277 | if (NULL != set_state->se) |
1255 | { | 1278 | { |
1256 | strata_estimator_destroy (set_state->se); | 1279 | strata_estimator_destroy (set_state->se); |
@@ -1263,13 +1286,13 @@ union_set_destroy (struct SetState *set_state) | |||
1263 | /** | 1286 | /** |
1264 | * Dispatch messages for a union operation. | 1287 | * Dispatch messages for a union operation. |
1265 | * | 1288 | * |
1266 | * @param eo the state of the union evaluate operation | 1289 | * @param op the state of the union evaluate operation |
1267 | * @param mh the received message | 1290 | * @param mh the received message |
1268 | * @return GNUNET_SYSERR if the tunnel should be disconnected, | 1291 | * @return GNUNET_SYSERR if the tunnel should be disconnected, |
1269 | * GNUNET_OK otherwise | 1292 | * GNUNET_OK otherwise |
1270 | */ | 1293 | */ |
1271 | int | 1294 | int |
1272 | union_handle_p2p_message (struct OperationState *eo, | 1295 | union_handle_p2p_message (struct Operation *op, |
1273 | const struct GNUNET_MessageHeader *mh) | 1296 | const struct GNUNET_MessageHeader *mh) |
1274 | { | 1297 | { |
1275 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n", | 1298 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n", |
@@ -1277,19 +1300,19 @@ union_handle_p2p_message (struct OperationState *eo, | |||
1277 | switch (ntohs (mh->type)) | 1300 | switch (ntohs (mh->type)) |
1278 | { | 1301 | { |
1279 | case GNUNET_MESSAGE_TYPE_SET_P2P_IBF: | 1302 | case GNUNET_MESSAGE_TYPE_SET_P2P_IBF: |
1280 | handle_p2p_ibf (eo, mh); | 1303 | handle_p2p_ibf (op, mh); |
1281 | break; | 1304 | break; |
1282 | case GNUNET_MESSAGE_TYPE_SET_P2P_SE: | 1305 | case GNUNET_MESSAGE_TYPE_SET_P2P_SE: |
1283 | handle_p2p_strata_estimator (eo, mh); | 1306 | handle_p2p_strata_estimator (op, mh); |
1284 | break; | 1307 | break; |
1285 | case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: | 1308 | case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: |
1286 | handle_p2p_elements (eo, mh); | 1309 | handle_p2p_elements (op, mh); |
1287 | break; | 1310 | break; |
1288 | case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS: | 1311 | case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS: |
1289 | handle_p2p_element_requests (eo, mh); | 1312 | handle_p2p_element_requests (op, mh); |
1290 | break; | 1313 | break; |
1291 | case GNUNET_MESSAGE_TYPE_SET_P2P_DONE: | 1314 | case GNUNET_MESSAGE_TYPE_SET_P2P_DONE: |
1292 | handle_p2p_done (eo, mh); | 1315 | handle_p2p_done (op, mh); |
1293 | break; | 1316 | break; |
1294 | default: | 1317 | default: |
1295 | /* something wrong with mesh's message handlers? */ | 1318 | /* something wrong with mesh's message handlers? */ |
@@ -1300,18 +1323,9 @@ union_handle_p2p_message (struct OperationState *eo, | |||
1300 | 1323 | ||
1301 | 1324 | ||
1302 | static void | 1325 | static void |
1303 | union_peer_disconnect (struct OperationState *op) | 1326 | union_peer_disconnect (struct Operation *op) |
1304 | { | 1327 | { |
1305 | /* Are we already disconnected? */ | 1328 | if (PHASE_FINISHED != op->state->phase) |
1306 | if (NULL == op->tunnel) | ||
1307 | return; | ||
1308 | op->tunnel = NULL; | ||
1309 | if (NULL != op->mq) | ||
1310 | { | ||
1311 | GNUNET_MQ_destroy (op->mq); | ||
1312 | op->mq = NULL; | ||
1313 | } | ||
1314 | if (PHASE_FINISHED != op->phase) | ||
1315 | { | 1329 | { |
1316 | struct GNUNET_MQ_Envelope *ev; | 1330 | struct GNUNET_MQ_Envelope *ev; |
1317 | struct GNUNET_SET_ResultMessage *msg; | 1331 | struct GNUNET_SET_ResultMessage *msg; |
@@ -1322,34 +1336,12 @@ union_peer_disconnect (struct OperationState *op) | |||
1322 | msg->element_type = htons (0); | 1336 | msg->element_type = htons (0); |
1323 | GNUNET_MQ_send (op->spec->set->client_mq, ev); | 1337 | GNUNET_MQ_send (op->spec->set->client_mq, ev); |
1324 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n"); | 1338 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n"); |
1325 | union_operation_destroy (op); | 1339 | _GSS_operation_destroy (op); |
1326 | return; | 1340 | return; |
1327 | } | 1341 | } |
1328 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n"); | 1342 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n"); |
1329 | if (GNUNET_NO == op->client_done_sent) | 1343 | if (GNUNET_NO == op->state->client_done_sent) |
1330 | send_client_done_and_destroy (op); | 1344 | finish_and_destroy (op); |
1331 | } | ||
1332 | |||
1333 | |||
1334 | static void | ||
1335 | union_op_cancel (struct SetState *set_state, uint32_t op_id) | ||
1336 | { | ||
1337 | struct OperationState *op_state; | ||
1338 | int found = GNUNET_NO; | ||
1339 | for (op_state = set_state->ops_head; NULL != op_state; op_state = op_state->next) | ||
1340 | { | ||
1341 | if (op_state->spec->client_request_id == op_id) | ||
1342 | { | ||
1343 | found = GNUNET_YES; | ||
1344 | break; | ||
1345 | } | ||
1346 | } | ||
1347 | if (GNUNET_NO == found) | ||
1348 | { | ||
1349 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "canceling non-existing operation\n"); | ||
1350 | return; | ||
1351 | } | ||
1352 | union_operation_destroy (op_state); | ||
1353 | } | 1345 | } |
1354 | 1346 | ||
1355 | 1347 | ||