diff options
Diffstat (limited to 'src/set/gnunet-service-set_union.c')
-rw-r--r-- | src/set/gnunet-service-set_union.c | 2299 |
1 files changed, 1153 insertions, 1146 deletions
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index 8786807dc..fd7bc24d4 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c | |||
@@ -16,7 +16,7 @@ | |||
16 | along with this program. If not, see <http://www.gnu.org/licenses/>. | 16 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | 17 | ||
18 | SPDX-License-Identifier: AGPL3.0-or-later | 18 | SPDX-License-Identifier: AGPL3.0-or-later |
19 | */ | 19 | */ |
20 | /** | 20 | /** |
21 | * @file set/gnunet-service-set_union.c | 21 | * @file set/gnunet-service-set_union.c |
22 | * @brief two-peer set operations | 22 | * @brief two-peer set operations |
@@ -34,7 +34,7 @@ | |||
34 | #include <gcrypt.h> | 34 | #include <gcrypt.h> |
35 | 35 | ||
36 | 36 | ||
37 | #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__) | 37 | #define LOG(kind, ...) GNUNET_log_from(kind, "set-union", __VA_ARGS__) |
38 | 38 | ||
39 | 39 | ||
40 | /** | 40 | /** |
@@ -55,7 +55,7 @@ | |||
55 | /** | 55 | /** |
56 | * Number of buckets that can be transmitted in one message. | 56 | * Number of buckets that can be transmitted in one message. |
57 | */ | 57 | */ |
58 | #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE) | 58 | #define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE) |
59 | 59 | ||
60 | /** | 60 | /** |
61 | * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER). | 61 | * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER). |
@@ -74,8 +74,7 @@ | |||
74 | /** | 74 | /** |
75 | * Current phase we are in for a union operation. | 75 | * Current phase we are in for a union operation. |
76 | */ | 76 | */ |
77 | enum UnionOperationPhase | 77 | enum UnionOperationPhase { |
78 | { | ||
79 | /** | 78 | /** |
80 | * We sent the request message, and expect a strata estimator. | 79 | * We sent the request message, and expect a strata estimator. |
81 | */ | 80 | */ |
@@ -139,8 +138,7 @@ enum UnionOperationPhase | |||
139 | /** | 138 | /** |
140 | * State of an evaluate operation with another peer. | 139 | * State of an evaluate operation with another peer. |
141 | */ | 140 | */ |
142 | struct OperationState | 141 | struct OperationState { |
143 | { | ||
144 | /** | 142 | /** |
145 | * Copy of the set's strata estimator at the time of | 143 | * Copy of the set's strata estimator at the time of |
146 | * creation of this operation. | 144 | * creation of this operation. |
@@ -216,8 +214,7 @@ struct OperationState | |||
216 | /** | 214 | /** |
217 | * The key entry is used to associate an ibf key with an element. | 215 | * The key entry is used to associate an ibf key with an element. |
218 | */ | 216 | */ |
219 | struct KeyEntry | 217 | struct KeyEntry { |
220 | { | ||
221 | /** | 218 | /** |
222 | * IBF key for the entry, derived from the current salt. | 219 | * IBF key for the entry, derived from the current salt. |
223 | */ | 220 | */ |
@@ -245,8 +242,7 @@ struct KeyEntry | |||
245 | * Used as a closure for sending elements | 242 | * Used as a closure for sending elements |
246 | * with a specific IBF key. | 243 | * with a specific IBF key. |
247 | */ | 244 | */ |
248 | struct SendElementClosure | 245 | struct SendElementClosure { |
249 | { | ||
250 | /** | 246 | /** |
251 | * The IBF key whose matching elements should be | 247 | * The IBF key whose matching elements should be |
252 | * sent. | 248 | * sent. |
@@ -264,8 +260,7 @@ struct SendElementClosure | |||
264 | /** | 260 | /** |
265 | * Extra state required for efficient set union. | 261 | * Extra state required for efficient set union. |
266 | */ | 262 | */ |
267 | struct SetState | 263 | struct SetState { |
268 | { | ||
269 | /** | 264 | /** |
270 | * The strata estimator is only generated once for | 265 | * The strata estimator is only generated once for |
271 | * each set. | 266 | * each set. |
@@ -287,19 +282,19 @@ struct SetState | |||
287 | * #GNUNET_NO if not. | 282 | * #GNUNET_NO if not. |
288 | */ | 283 | */ |
289 | static int | 284 | static int |
290 | destroy_key_to_element_iter (void *cls, | 285 | destroy_key_to_element_iter(void *cls, |
291 | uint32_t key, | 286 | uint32_t key, |
292 | void *value) | 287 | void *value) |
293 | { | 288 | { |
294 | struct KeyEntry *k = value; | 289 | struct KeyEntry *k = value; |
295 | 290 | ||
296 | GNUNET_assert (NULL != k); | 291 | GNUNET_assert(NULL != k); |
297 | if (GNUNET_YES == k->element->remote) | 292 | if (GNUNET_YES == k->element->remote) |
298 | { | 293 | { |
299 | GNUNET_free (k->element); | 294 | GNUNET_free(k->element); |
300 | k->element = NULL; | 295 | k->element = NULL; |
301 | } | 296 | } |
302 | GNUNET_free (k); | 297 | GNUNET_free(k); |
303 | return GNUNET_YES; | 298 | return GNUNET_YES; |
304 | } | 299 | } |
305 | 300 | ||
@@ -311,44 +306,44 @@ destroy_key_to_element_iter (void *cls, | |||
311 | * @param op union operation to destroy | 306 | * @param op union operation to destroy |
312 | */ | 307 | */ |
313 | static void | 308 | static void |
314 | union_op_cancel (struct Operation *op) | 309 | union_op_cancel(struct Operation *op) |
315 | { | 310 | { |
316 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 311 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
317 | "destroying union op\n"); | 312 | "destroying union op\n"); |
318 | /* check if the op was canceled twice */ | 313 | /* check if the op was canceled twice */ |
319 | GNUNET_assert (NULL != op->state); | 314 | GNUNET_assert(NULL != op->state); |
320 | if (NULL != op->state->remote_ibf) | 315 | if (NULL != op->state->remote_ibf) |
321 | { | 316 | { |
322 | ibf_destroy (op->state->remote_ibf); | 317 | ibf_destroy(op->state->remote_ibf); |
323 | op->state->remote_ibf = NULL; | 318 | op->state->remote_ibf = NULL; |
324 | } | 319 | } |
325 | if (NULL != op->state->demanded_hashes) | 320 | if (NULL != op->state->demanded_hashes) |
326 | { | 321 | { |
327 | GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes); | 322 | GNUNET_CONTAINER_multihashmap_destroy(op->state->demanded_hashes); |
328 | op->state->demanded_hashes = NULL; | 323 | op->state->demanded_hashes = NULL; |
329 | } | 324 | } |
330 | if (NULL != op->state->local_ibf) | 325 | if (NULL != op->state->local_ibf) |
331 | { | 326 | { |
332 | ibf_destroy (op->state->local_ibf); | 327 | ibf_destroy(op->state->local_ibf); |
333 | op->state->local_ibf = NULL; | 328 | op->state->local_ibf = NULL; |
334 | } | 329 | } |
335 | if (NULL != op->state->se) | 330 | if (NULL != op->state->se) |
336 | { | 331 | { |
337 | strata_estimator_destroy (op->state->se); | 332 | strata_estimator_destroy(op->state->se); |
338 | op->state->se = NULL; | 333 | op->state->se = NULL; |
339 | } | 334 | } |
340 | if (NULL != op->state->key_to_element) | 335 | if (NULL != op->state->key_to_element) |
341 | { | 336 | { |
342 | GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, | 337 | GNUNET_CONTAINER_multihashmap32_iterate(op->state->key_to_element, |
343 | &destroy_key_to_element_iter, | 338 | &destroy_key_to_element_iter, |
344 | NULL); | 339 | NULL); |
345 | GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element); | 340 | GNUNET_CONTAINER_multihashmap32_destroy(op->state->key_to_element); |
346 | op->state->key_to_element = NULL; | 341 | op->state->key_to_element = NULL; |
347 | } | 342 | } |
348 | GNUNET_free (op->state); | 343 | GNUNET_free(op->state); |
349 | op->state = NULL; | 344 | op->state = NULL; |
350 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 345 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
351 | "destroying union op done\n"); | 346 | "destroying union op done\n"); |
352 | } | 347 | } |
353 | 348 | ||
354 | 349 | ||
@@ -359,20 +354,20 @@ union_op_cancel (struct Operation *op) | |||
359 | * @param op the union operation to fail | 354 | * @param op the union operation to fail |
360 | */ | 355 | */ |
361 | static void | 356 | static void |
362 | fail_union_operation (struct Operation *op) | 357 | fail_union_operation(struct Operation *op) |
363 | { | 358 | { |
364 | struct GNUNET_MQ_Envelope *ev; | 359 | struct GNUNET_MQ_Envelope *ev; |
365 | struct GNUNET_SET_ResultMessage *msg; | 360 | struct GNUNET_SET_ResultMessage *msg; |
366 | 361 | ||
367 | LOG (GNUNET_ERROR_TYPE_WARNING, | 362 | LOG(GNUNET_ERROR_TYPE_WARNING, |
368 | "union operation failed\n"); | 363 | "union operation failed\n"); |
369 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); | 364 | ev = GNUNET_MQ_msg(msg, GNUNET_MESSAGE_TYPE_SET_RESULT); |
370 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); | 365 | msg->result_status = htons(GNUNET_SET_STATUS_FAILURE); |
371 | msg->request_id = htonl (op->client_request_id); | 366 | msg->request_id = htonl(op->client_request_id); |
372 | msg->element_type = htons (0); | 367 | msg->element_type = htons(0); |
373 | GNUNET_MQ_send (op->set->cs->mq, | 368 | GNUNET_MQ_send(op->set->cs->mq, |
374 | ev); | 369 | ev); |
375 | _GSS_operation_destroy (op, GNUNET_YES); | 370 | _GSS_operation_destroy(op, GNUNET_YES); |
376 | } | 371 | } |
377 | 372 | ||
378 | 373 | ||
@@ -384,16 +379,16 @@ fail_union_operation (struct Operation *op) | |||
384 | * @return the derived IBF key | 379 | * @return the derived IBF key |
385 | */ | 380 | */ |
386 | static struct IBF_Key | 381 | static struct IBF_Key |
387 | get_ibf_key (const struct GNUNET_HashCode *src) | 382 | get_ibf_key(const struct GNUNET_HashCode *src) |
388 | { | 383 | { |
389 | struct IBF_Key key; | 384 | struct IBF_Key key; |
390 | uint16_t salt = 0; | 385 | uint16_t salt = 0; |
391 | 386 | ||
392 | GNUNET_assert (GNUNET_OK == | 387 | GNUNET_assert(GNUNET_OK == |
393 | GNUNET_CRYPTO_kdf (&key, sizeof (key), | 388 | GNUNET_CRYPTO_kdf(&key, sizeof(key), |
394 | src, sizeof *src, | 389 | src, sizeof *src, |
395 | &salt, sizeof (salt), | 390 | &salt, sizeof(salt), |
396 | NULL, 0)); | 391 | NULL, 0)); |
397 | return key; | 392 | return key; |
398 | } | 393 | } |
399 | 394 | ||
@@ -401,8 +396,7 @@ get_ibf_key (const struct GNUNET_HashCode *src) | |||
401 | /** | 396 | /** |
402 | * Context for #op_get_element_iterator | 397 | * Context for #op_get_element_iterator |
403 | */ | 398 | */ |
404 | struct GetElementContext | 399 | struct GetElementContext { |
405 | { | ||
406 | /** | 400 | /** |
407 | * FIXME. | 401 | * FIXME. |
408 | */ | 402 | */ |
@@ -426,20 +420,20 @@ struct GetElementContext | |||
426 | * #GNUNET_NO if we've found the element. | 420 | * #GNUNET_NO if we've found the element. |
427 | */ | 421 | */ |
428 | static int | 422 | static int |
429 | op_get_element_iterator (void *cls, | 423 | op_get_element_iterator(void *cls, |
430 | uint32_t key, | 424 | uint32_t key, |
431 | void *value) | 425 | void *value) |
432 | { | 426 | { |
433 | struct GetElementContext *ctx = cls; | 427 | struct GetElementContext *ctx = cls; |
434 | struct KeyEntry *k = value; | 428 | struct KeyEntry *k = value; |
435 | 429 | ||
436 | GNUNET_assert (NULL != k); | 430 | GNUNET_assert(NULL != k); |
437 | if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, | 431 | if (0 == GNUNET_CRYPTO_hash_cmp(&k->element->element_hash, |
438 | &ctx->hash)) | 432 | &ctx->hash)) |
439 | { | 433 | { |
440 | ctx->k = k; | 434 | ctx->k = k; |
441 | return GNUNET_NO; | 435 | return GNUNET_NO; |
442 | } | 436 | } |
443 | return GNUNET_YES; | 437 | return GNUNET_YES; |
444 | } | 438 | } |
445 | 439 | ||
@@ -453,27 +447,27 @@ op_get_element_iterator (void *cls, | |||
453 | * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise | 447 | * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise |
454 | */ | 448 | */ |
455 | static struct KeyEntry * | 449 | static struct KeyEntry * |
456 | op_get_element (struct Operation *op, | 450 | op_get_element(struct Operation *op, |
457 | const struct GNUNET_HashCode *element_hash) | 451 | const struct GNUNET_HashCode *element_hash) |
458 | { | 452 | { |
459 | int ret; | 453 | int ret; |
460 | struct IBF_Key ibf_key; | 454 | struct IBF_Key ibf_key; |
461 | struct GetElementContext ctx = {{{ 0 }} , 0}; | 455 | struct GetElementContext ctx = { { { 0 } }, 0 }; |
462 | 456 | ||
463 | ctx.hash = *element_hash; | 457 | ctx.hash = *element_hash; |
464 | 458 | ||
465 | ibf_key = get_ibf_key (element_hash); | 459 | ibf_key = get_ibf_key(element_hash); |
466 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, | 460 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple(op->state->key_to_element, |
467 | (uint32_t) ibf_key.key_val, | 461 | (uint32_t)ibf_key.key_val, |
468 | op_get_element_iterator, | 462 | op_get_element_iterator, |
469 | &ctx); | 463 | &ctx); |
470 | 464 | ||
471 | /* was the iteration aborted because we found the element? */ | 465 | /* was the iteration aborted because we found the element? */ |
472 | if (GNUNET_SYSERR == ret) | 466 | if (GNUNET_SYSERR == ret) |
473 | { | 467 | { |
474 | GNUNET_assert (NULL != ctx.k); | 468 | GNUNET_assert(NULL != ctx.k); |
475 | return ctx.k; | 469 | return ctx.k; |
476 | } | 470 | } |
477 | return NULL; | 471 | return NULL; |
478 | } | 472 | } |
479 | 473 | ||
@@ -493,23 +487,23 @@ op_get_element (struct Operation *op, | |||
493 | * @parem received was this element received from the remote peer? | 487 | * @parem received was this element received from the remote peer? |
494 | */ | 488 | */ |
495 | static void | 489 | static void |
496 | op_register_element (struct Operation *op, | 490 | op_register_element(struct Operation *op, |
497 | struct ElementEntry *ee, | 491 | struct ElementEntry *ee, |
498 | int received) | 492 | int received) |
499 | { | 493 | { |
500 | struct IBF_Key ibf_key; | 494 | struct IBF_Key ibf_key; |
501 | struct KeyEntry *k; | 495 | struct KeyEntry *k; |
502 | 496 | ||
503 | ibf_key = get_ibf_key (&ee->element_hash); | 497 | ibf_key = get_ibf_key(&ee->element_hash); |
504 | k = GNUNET_new (struct KeyEntry); | 498 | k = GNUNET_new(struct KeyEntry); |
505 | k->element = ee; | 499 | k->element = ee; |
506 | k->ibf_key = ibf_key; | 500 | k->ibf_key = ibf_key; |
507 | k->received = received; | 501 | k->received = received; |
508 | GNUNET_assert (GNUNET_OK == | 502 | GNUNET_assert(GNUNET_OK == |
509 | GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, | 503 | GNUNET_CONTAINER_multihashmap32_put(op->state->key_to_element, |
510 | (uint32_t) ibf_key.key_val, | 504 | (uint32_t)ibf_key.key_val, |
511 | k, | 505 | k, |
512 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); | 506 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); |
513 | } | 507 | } |
514 | 508 | ||
515 | 509 | ||
@@ -517,12 +511,13 @@ op_register_element (struct Operation *op, | |||
517 | * FIXME. | 511 | * FIXME. |
518 | */ | 512 | */ |
519 | static void | 513 | static void |
520 | salt_key (const struct IBF_Key *k_in, | 514 | salt_key(const struct IBF_Key *k_in, |
521 | uint32_t salt, | 515 | uint32_t salt, |
522 | struct IBF_Key *k_out) | 516 | struct IBF_Key *k_out) |
523 | { | 517 | { |
524 | int s = salt % 64; | 518 | int s = salt % 64; |
525 | uint64_t x = k_in->key_val; | 519 | uint64_t x = k_in->key_val; |
520 | |||
526 | /* rotate ibf key */ | 521 | /* rotate ibf key */ |
527 | x = (x >> s) | (x << (64 - s)); | 522 | x = (x >> s) | (x << (64 - s)); |
528 | k_out->key_val = x; | 523 | k_out->key_val = x; |
@@ -533,12 +528,13 @@ salt_key (const struct IBF_Key *k_in, | |||
533 | * FIXME. | 528 | * FIXME. |
534 | */ | 529 | */ |
535 | static void | 530 | static void |
536 | unsalt_key (const struct IBF_Key *k_in, | 531 | unsalt_key(const struct IBF_Key *k_in, |
537 | uint32_t salt, | 532 | uint32_t salt, |
538 | struct IBF_Key *k_out) | 533 | struct IBF_Key *k_out) |
539 | { | 534 | { |
540 | int s = salt % 64; | 535 | int s = salt % 64; |
541 | uint64_t x = k_in->key_val; | 536 | uint64_t x = k_in->key_val; |
537 | |||
542 | x = (x << s) | (x >> (64 - s)); | 538 | x = (x << s) | (x >> (64 - s)); |
543 | k_out->key_val = x; | 539 | k_out->key_val = x; |
544 | } | 540 | } |
@@ -552,23 +548,23 @@ unsalt_key (const struct IBF_Key *k_in, | |||
552 | * @param value the key entry to get the key from | 548 | * @param value the key entry to get the key from |
553 | */ | 549 | */ |
554 | static int | 550 | static int |
555 | prepare_ibf_iterator (void *cls, | 551 | prepare_ibf_iterator(void *cls, |
556 | uint32_t key, | 552 | uint32_t key, |
557 | void *value) | 553 | void *value) |
558 | { | 554 | { |
559 | struct Operation *op = cls; | 555 | struct Operation *op = cls; |
560 | struct KeyEntry *ke = value; | 556 | struct KeyEntry *ke = value; |
561 | struct IBF_Key salted_key; | 557 | struct IBF_Key salted_key; |
562 | 558 | ||
563 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 559 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
564 | "[OP %x] inserting %lx (hash %s) into ibf\n", | 560 | "[OP %x] inserting %lx (hash %s) into ibf\n", |
565 | (void *) op, | 561 | (void *)op, |
566 | (unsigned long) ke->ibf_key.key_val, | 562 | (unsigned long)ke->ibf_key.key_val, |
567 | GNUNET_h2s (&ke->element->element_hash)); | 563 | GNUNET_h2s(&ke->element->element_hash)); |
568 | salt_key (&ke->ibf_key, | 564 | salt_key(&ke->ibf_key, |
569 | op->state->salt_send, | 565 | op->state->salt_send, |
570 | &salted_key); | 566 | &salted_key); |
571 | ibf_insert (op->state->local_ibf, salted_key); | 567 | ibf_insert(op->state->local_ibf, salted_key); |
572 | return GNUNET_YES; | 568 | return GNUNET_YES; |
573 | } | 569 | } |
574 | 570 | ||
@@ -584,9 +580,9 @@ prepare_ibf_iterator (void *cls, | |||
584 | * @return #GNUNET_YES (to continue iterating) | 580 | * @return #GNUNET_YES (to continue iterating) |
585 | */ | 581 | */ |
586 | static int | 582 | static int |
587 | init_key_to_element_iterator (void *cls, | 583 | init_key_to_element_iterator(void *cls, |
588 | const struct GNUNET_HashCode *key, | 584 | const struct GNUNET_HashCode *key, |
589 | void *value) | 585 | void *value) |
590 | { | 586 | { |
591 | struct Operation *op = cls; | 587 | struct Operation *op = cls; |
592 | struct ElementEntry *ee = value; | 588 | struct ElementEntry *ee = value; |
@@ -594,13 +590,13 @@ init_key_to_element_iterator (void *cls, | |||
594 | /* make sure that the element belongs to the set at the time | 590 | /* make sure that the element belongs to the set at the time |
595 | * of creating the operation */ | 591 | * of creating the operation */ |
596 | if (GNUNET_NO == | 592 | if (GNUNET_NO == |
597 | _GSS_is_element_of_operation (ee, | 593 | _GSS_is_element_of_operation(ee, |
598 | op)) | 594 | op)) |
599 | return GNUNET_YES; | 595 | return GNUNET_YES; |
600 | GNUNET_assert (GNUNET_NO == ee->remote); | 596 | GNUNET_assert(GNUNET_NO == ee->remote); |
601 | op_register_element (op, | 597 | op_register_element(op, |
602 | ee, | 598 | ee, |
603 | GNUNET_NO); | 599 | GNUNET_NO); |
604 | return GNUNET_YES; | 600 | return GNUNET_YES; |
605 | } | 601 | } |
606 | 602 | ||
@@ -612,16 +608,16 @@ init_key_to_element_iterator (void *cls, | |||
612 | * @param op the set union operation | 608 | * @param op the set union operation |
613 | */ | 609 | */ |
614 | static void | 610 | static void |
615 | initialize_key_to_element (struct Operation *op) | 611 | initialize_key_to_element(struct Operation *op) |
616 | { | 612 | { |
617 | unsigned int len; | 613 | unsigned int len; |
618 | 614 | ||
619 | GNUNET_assert (NULL == op->state->key_to_element); | 615 | GNUNET_assert(NULL == op->state->key_to_element); |
620 | len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements); | 616 | len = GNUNET_CONTAINER_multihashmap_size(op->set->content->elements); |
621 | op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); | 617 | op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create(len + 1); |
622 | GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, | 618 | GNUNET_CONTAINER_multihashmap_iterate(op->set->content->elements, |
623 | &init_key_to_element_iterator, | 619 | &init_key_to_element_iterator, |
624 | op); | 620 | op); |
625 | } | 621 | } |
626 | 622 | ||
627 | 623 | ||
@@ -634,23 +630,23 @@ initialize_key_to_element (struct Operation *op) | |||
634 | * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure | 630 | * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure |
635 | */ | 631 | */ |
636 | static int | 632 | static int |
637 | prepare_ibf (struct Operation *op, | 633 | prepare_ibf(struct Operation *op, |
638 | uint32_t size) | 634 | uint32_t size) |
639 | { | 635 | { |
640 | GNUNET_assert (NULL != op->state->key_to_element); | 636 | GNUNET_assert(NULL != op->state->key_to_element); |
641 | 637 | ||
642 | if (NULL != op->state->local_ibf) | 638 | if (NULL != op->state->local_ibf) |
643 | ibf_destroy (op->state->local_ibf); | 639 | ibf_destroy(op->state->local_ibf); |
644 | op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); | 640 | op->state->local_ibf = ibf_create(size, SE_IBF_HASH_NUM); |
645 | if (NULL == op->state->local_ibf) | 641 | if (NULL == op->state->local_ibf) |
646 | { | 642 | { |
647 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 643 | GNUNET_log(GNUNET_ERROR_TYPE_ERROR, |
648 | "Failed to allocate local IBF\n"); | 644 | "Failed to allocate local IBF\n"); |
649 | return GNUNET_SYSERR; | 645 | return GNUNET_SYSERR; |
650 | } | 646 | } |
651 | GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, | 647 | GNUNET_CONTAINER_multihashmap32_iterate(op->state->key_to_element, |
652 | &prepare_ibf_iterator, | 648 | &prepare_ibf_iterator, |
653 | op); | 649 | op); |
654 | return GNUNET_OK; | 650 | return GNUNET_OK; |
655 | } | 651 | } |
656 | 652 | ||
@@ -665,60 +661,60 @@ prepare_ibf (struct Operation *op, | |||
665 | * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure | 661 | * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure |
666 | */ | 662 | */ |
667 | static int | 663 | static int |
668 | send_ibf (struct Operation *op, | 664 | send_ibf(struct Operation *op, |
669 | uint16_t ibf_order) | 665 | uint16_t ibf_order) |
670 | { | 666 | { |
671 | unsigned int buckets_sent = 0; | 667 | unsigned int buckets_sent = 0; |
672 | struct InvertibleBloomFilter *ibf; | 668 | struct InvertibleBloomFilter *ibf; |
673 | 669 | ||
674 | if (GNUNET_OK != | 670 | if (GNUNET_OK != |
675 | prepare_ibf (op, 1<<ibf_order)) | 671 | prepare_ibf(op, 1 << ibf_order)) |
676 | { | 672 | { |
677 | /* allocation failed */ | 673 | /* allocation failed */ |
678 | return GNUNET_SYSERR; | 674 | return GNUNET_SYSERR; |
679 | } | 675 | } |
680 | 676 | ||
681 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 677 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
682 | "sending ibf of size %u\n", | 678 | "sending ibf of size %u\n", |
683 | 1<<ibf_order); | 679 | 1 << ibf_order); |
684 | 680 | ||
685 | { | 681 | { |
686 | char name[64] = { 0 }; | 682 | char name[64] = { 0 }; |
687 | snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order); | 683 | snprintf(name, sizeof(name), "# sent IBF (order %u)", ibf_order); |
688 | GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO); | 684 | GNUNET_STATISTICS_update(_GSS_statistics, name, 1, GNUNET_NO); |
689 | } | 685 | } |
690 | 686 | ||
691 | ibf = op->state->local_ibf; | 687 | ibf = op->state->local_ibf; |
692 | 688 | ||
693 | while (buckets_sent < (1 << ibf_order)) | 689 | while (buckets_sent < (1 << ibf_order)) |
694 | { | 690 | { |
695 | unsigned int buckets_in_message; | 691 | unsigned int buckets_in_message; |
696 | struct GNUNET_MQ_Envelope *ev; | 692 | struct GNUNET_MQ_Envelope *ev; |
697 | struct IBFMessage *msg; | 693 | struct IBFMessage *msg; |
698 | 694 | ||
699 | buckets_in_message = (1 << ibf_order) - buckets_sent; | 695 | buckets_in_message = (1 << ibf_order) - buckets_sent; |
700 | /* limit to maximum */ | 696 | /* limit to maximum */ |
701 | if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) | 697 | if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) |
702 | buckets_in_message = MAX_BUCKETS_PER_MESSAGE; | 698 | buckets_in_message = MAX_BUCKETS_PER_MESSAGE; |
703 | 699 | ||
704 | ev = GNUNET_MQ_msg_extra (msg, | 700 | ev = GNUNET_MQ_msg_extra(msg, |
705 | buckets_in_message * IBF_BUCKET_SIZE, | 701 | buckets_in_message * IBF_BUCKET_SIZE, |
706 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF); | 702 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF); |
707 | msg->reserved1 = 0; | 703 | msg->reserved1 = 0; |
708 | msg->reserved2 = 0; | 704 | msg->reserved2 = 0; |
709 | msg->order = ibf_order; | 705 | msg->order = ibf_order; |
710 | msg->offset = htonl (buckets_sent); | 706 | msg->offset = htonl(buckets_sent); |
711 | msg->salt = htonl (op->state->salt_send); | 707 | msg->salt = htonl(op->state->salt_send); |
712 | ibf_write_slice (ibf, buckets_sent, | 708 | ibf_write_slice(ibf, buckets_sent, |
713 | buckets_in_message, &msg[1]); | 709 | buckets_in_message, &msg[1]); |
714 | buckets_sent += buckets_in_message; | 710 | buckets_sent += buckets_in_message; |
715 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 711 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
716 | "ibf chunk size %u, %u/%u sent\n", | 712 | "ibf chunk size %u, %u/%u sent\n", |
717 | buckets_in_message, | 713 | buckets_in_message, |
718 | buckets_sent, | 714 | buckets_sent, |
719 | 1<<ibf_order); | 715 | 1 << ibf_order); |
720 | GNUNET_MQ_send (op->mq, ev); | 716 | GNUNET_MQ_send(op->mq, ev); |
721 | } | 717 | } |
722 | 718 | ||
723 | /* The other peer must decode the IBF, so | 719 | /* The other peer must decode the IBF, so |
724 | * we're passive. */ | 720 | * we're passive. */ |
@@ -735,14 +731,14 @@ send_ibf (struct Operation *op, | |||
735 | * @return the required size of the ibf | 731 | * @return the required size of the ibf |
736 | */ | 732 | */ |
737 | static unsigned int | 733 | static unsigned int |
738 | get_order_from_difference (unsigned int diff) | 734 | get_order_from_difference(unsigned int diff) |
739 | { | 735 | { |
740 | unsigned int ibf_order; | 736 | unsigned int ibf_order; |
741 | 737 | ||
742 | ibf_order = 2; | 738 | ibf_order = 2; |
743 | while ( ( (1<<ibf_order) < (IBF_ALPHA * diff) || | 739 | while (((1 << ibf_order) < (IBF_ALPHA * diff) || |
744 | ((1<<ibf_order) < SE_IBF_HASH_NUM) ) && | 740 | ((1 << ibf_order) < SE_IBF_HASH_NUM)) && |
745 | (ibf_order < MAX_IBF_ORDER) ) | 741 | (ibf_order < MAX_IBF_ORDER)) |
746 | ibf_order++; | 742 | ibf_order++; |
747 | // add one for correction | 743 | // add one for correction |
748 | return ibf_order + 1; | 744 | return ibf_order + 1; |
@@ -759,9 +755,9 @@ get_order_from_difference (unsigned int diff) | |||
759 | * @return #GNUNET_YES (to continue iterating) | 755 | * @return #GNUNET_YES (to continue iterating) |
760 | */ | 756 | */ |
761 | static int | 757 | static int |
762 | send_full_element_iterator (void *cls, | 758 | send_full_element_iterator(void *cls, |
763 | const struct GNUNET_HashCode *key, | 759 | const struct GNUNET_HashCode *key, |
764 | void *value) | 760 | void *value) |
765 | { | 761 | { |
766 | struct Operation *op = cls; | 762 | struct Operation *op = cls; |
767 | struct GNUNET_SET_ElementMessage *emsg; | 763 | struct GNUNET_SET_ElementMessage *emsg; |
@@ -769,18 +765,18 @@ send_full_element_iterator (void *cls, | |||
769 | struct GNUNET_SET_Element *el = &ee->element; | 765 | struct GNUNET_SET_Element *el = &ee->element; |
770 | struct GNUNET_MQ_Envelope *ev; | 766 | struct GNUNET_MQ_Envelope *ev; |
771 | 767 | ||
772 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 768 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
773 | "Sending element %s\n", | 769 | "Sending element %s\n", |
774 | GNUNET_h2s (key)); | 770 | GNUNET_h2s(key)); |
775 | ev = GNUNET_MQ_msg_extra (emsg, | 771 | ev = GNUNET_MQ_msg_extra(emsg, |
776 | el->size, | 772 | el->size, |
777 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); | 773 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); |
778 | emsg->element_type = htons (el->element_type); | 774 | emsg->element_type = htons(el->element_type); |
779 | GNUNET_memcpy (&emsg[1], | 775 | GNUNET_memcpy(&emsg[1], |
780 | el->data, | 776 | el->data, |
781 | el->size); | 777 | el->size); |
782 | GNUNET_MQ_send (op->mq, | 778 | GNUNET_MQ_send(op->mq, |
783 | ev); | 779 | ev); |
784 | return GNUNET_YES; | 780 | return GNUNET_YES; |
785 | } | 781 | } |
786 | 782 | ||
@@ -791,21 +787,21 @@ send_full_element_iterator (void *cls, | |||
791 | * @param op operation to switch to full set transmission. | 787 | * @param op operation to switch to full set transmission. |
792 | */ | 788 | */ |
793 | static void | 789 | static void |
794 | send_full_set (struct Operation *op) | 790 | send_full_set(struct Operation *op) |
795 | { | 791 | { |
796 | struct GNUNET_MQ_Envelope *ev; | 792 | struct GNUNET_MQ_Envelope *ev; |
797 | 793 | ||
798 | op->state->phase = PHASE_FULL_SENDING; | 794 | op->state->phase = PHASE_FULL_SENDING; |
799 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 795 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
800 | "Dedicing to transmit the full set\n"); | 796 | "Dedicing to transmit the full set\n"); |
801 | /* FIXME: use a more memory-friendly way of doing this with an | 797 | /* FIXME: use a more memory-friendly way of doing this with an |
802 | iterator, just as we do in the non-full case! */ | 798 | iterator, just as we do in the non-full case! */ |
803 | (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, | 799 | (void)GNUNET_CONTAINER_multihashmap_iterate(op->set->content->elements, |
804 | &send_full_element_iterator, | 800 | &send_full_element_iterator, |
805 | op); | 801 | op); |
806 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); | 802 | ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); |
807 | GNUNET_MQ_send (op->mq, | 803 | GNUNET_MQ_send(op->mq, |
808 | ev); | 804 | ev); |
809 | } | 805 | } |
810 | 806 | ||
811 | 807 | ||
@@ -816,26 +812,26 @@ send_full_set (struct Operation *op) | |||
816 | * @param msg the message | 812 | * @param msg the message |
817 | */ | 813 | */ |
818 | int | 814 | int |
819 | check_union_p2p_strata_estimator (void *cls, | 815 | check_union_p2p_strata_estimator(void *cls, |
820 | const struct StrataEstimatorMessage *msg) | 816 | const struct StrataEstimatorMessage *msg) |
821 | { | 817 | { |
822 | struct Operation *op = cls; | 818 | struct Operation *op = cls; |
823 | int is_compressed; | 819 | int is_compressed; |
824 | size_t len; | 820 | size_t len; |
825 | 821 | ||
826 | if (op->state->phase != PHASE_EXPECT_SE) | 822 | if (op->state->phase != PHASE_EXPECT_SE) |
827 | { | 823 | { |
828 | GNUNET_break (0); | 824 | GNUNET_break(0); |
829 | return GNUNET_SYSERR; | 825 | return GNUNET_SYSERR; |
830 | } | 826 | } |
831 | is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type)); | 827 | is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons(msg->header.type)); |
832 | len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage); | 828 | len = ntohs(msg->header.size) - sizeof(struct StrataEstimatorMessage); |
833 | if ( (GNUNET_NO == is_compressed) && | 829 | if ((GNUNET_NO == is_compressed) && |
834 | (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) ) | 830 | (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE)) |
835 | { | 831 | { |
836 | GNUNET_break (0); | 832 | GNUNET_break(0); |
837 | return GNUNET_SYSERR; | 833 | return GNUNET_SYSERR; |
838 | } | 834 | } |
839 | return GNUNET_OK; | 835 | return GNUNET_OK; |
840 | } | 836 | } |
841 | 837 | ||
@@ -847,8 +843,8 @@ check_union_p2p_strata_estimator (void *cls, | |||
847 | * @param msg the message | 843 | * @param msg the message |
848 | */ | 844 | */ |
849 | void | 845 | void |
850 | handle_union_p2p_strata_estimator (void *cls, | 846 | handle_union_p2p_strata_estimator(void *cls, |
851 | const struct StrataEstimatorMessage *msg) | 847 | const struct StrataEstimatorMessage *msg) |
852 | { | 848 | { |
853 | struct Operation *op = cls; | 849 | struct Operation *op = cls; |
854 | struct StrataEstimator *remote_se; | 850 | struct StrataEstimator *remote_se; |
@@ -857,116 +853,116 @@ handle_union_p2p_strata_estimator (void *cls, | |||
857 | size_t len; | 853 | size_t len; |
858 | int is_compressed; | 854 | int is_compressed; |
859 | 855 | ||
860 | is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type)); | 856 | is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons(msg->header.type)); |
861 | GNUNET_STATISTICS_update (_GSS_statistics, | 857 | GNUNET_STATISTICS_update(_GSS_statistics, |
862 | "# bytes of SE received", | 858 | "# bytes of SE received", |
863 | ntohs (msg->header.size), | 859 | ntohs(msg->header.size), |
864 | GNUNET_NO); | 860 | GNUNET_NO); |
865 | len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage); | 861 | len = ntohs(msg->header.size) - sizeof(struct StrataEstimatorMessage); |
866 | other_size = GNUNET_ntohll (msg->set_size); | 862 | other_size = GNUNET_ntohll(msg->set_size); |
867 | remote_se = strata_estimator_create (SE_STRATA_COUNT, | 863 | remote_se = strata_estimator_create(SE_STRATA_COUNT, |
868 | SE_IBF_SIZE, | 864 | SE_IBF_SIZE, |
869 | SE_IBF_HASH_NUM); | 865 | SE_IBF_HASH_NUM); |
870 | if (NULL == remote_se) | 866 | if (NULL == remote_se) |
871 | { | 867 | { |
872 | /* insufficient resources, fail */ | 868 | /* insufficient resources, fail */ |
873 | fail_union_operation (op); | 869 | fail_union_operation(op); |
874 | return; | 870 | return; |
875 | } | 871 | } |
876 | if (GNUNET_OK != | 872 | if (GNUNET_OK != |
877 | strata_estimator_read (&msg[1], | 873 | strata_estimator_read(&msg[1], |
878 | len, | 874 | len, |
879 | is_compressed, | 875 | is_compressed, |
880 | remote_se)) | 876 | remote_se)) |
881 | { | 877 | { |
882 | /* decompression failed */ | 878 | /* decompression failed */ |
883 | strata_estimator_destroy (remote_se); | 879 | strata_estimator_destroy(remote_se); |
884 | fail_union_operation (op); | 880 | fail_union_operation(op); |
885 | return; | 881 | return; |
886 | } | 882 | } |
887 | GNUNET_assert (NULL != op->state->se); | 883 | GNUNET_assert(NULL != op->state->se); |
888 | diff = strata_estimator_difference (remote_se, | 884 | diff = strata_estimator_difference(remote_se, |
889 | op->state->se); | 885 | op->state->se); |
890 | 886 | ||
891 | if (diff > 200) | 887 | if (diff > 200) |
892 | diff = diff * 3 / 2; | 888 | diff = diff * 3 / 2; |
893 | 889 | ||
894 | strata_estimator_destroy (remote_se); | 890 | strata_estimator_destroy(remote_se); |
895 | strata_estimator_destroy (op->state->se); | 891 | strata_estimator_destroy(op->state->se); |
896 | op->state->se = NULL; | 892 | op->state->se = NULL; |
897 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 893 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
898 | "got se diff=%d, using ibf size %d\n", | 894 | "got se diff=%d, using ibf size %d\n", |
899 | diff, | 895 | diff, |
900 | 1U << get_order_from_difference (diff)); | 896 | 1U << get_order_from_difference(diff)); |
901 | 897 | ||
902 | { | 898 | { |
903 | char *set_debug; | 899 | char *set_debug; |
904 | 900 | ||
905 | set_debug = getenv ("GNUNET_SET_BENCHMARK"); | 901 | set_debug = getenv("GNUNET_SET_BENCHMARK"); |
906 | if ( (NULL != set_debug) && | 902 | if ((NULL != set_debug) && |
907 | (0 == strcmp (set_debug, "1")) ) | 903 | (0 == strcmp(set_debug, "1"))) |
908 | { | 904 | { |
909 | FILE *f = fopen ("set.log", "a"); | 905 | FILE *f = fopen("set.log", "a"); |
910 | fprintf (f, "%llu\n", (unsigned long long) diff); | 906 | fprintf(f, "%llu\n", (unsigned long long)diff); |
911 | fclose (f); | 907 | fclose(f); |
912 | } | 908 | } |
913 | } | ||
914 | |||
915 | if ( (GNUNET_YES == op->byzantine) && | ||
916 | (other_size < op->byzantine_lower_bound) ) | ||
917 | { | ||
918 | GNUNET_break (0); | ||
919 | fail_union_operation (op); | ||
920 | return; | ||
921 | } | 909 | } |
922 | 910 | ||
923 | if ( (GNUNET_YES == op->force_full) || | 911 | if ((GNUNET_YES == op->byzantine) && |
924 | (diff > op->state->initial_size / 4) || | 912 | (other_size < op->byzantine_lower_bound)) |
925 | (0 == other_size) ) | ||
926 | { | ||
927 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
928 | "Deciding to go for full set transmission (diff=%d, own set=%u)\n", | ||
929 | diff, | ||
930 | op->state->initial_size); | ||
931 | GNUNET_STATISTICS_update (_GSS_statistics, | ||
932 | "# of full sends", | ||
933 | 1, | ||
934 | GNUNET_NO); | ||
935 | if ( (op->state->initial_size <= other_size) || | ||
936 | (0 == other_size) ) | ||
937 | { | 913 | { |
938 | send_full_set (op); | 914 | GNUNET_break(0); |
915 | fail_union_operation(op); | ||
916 | return; | ||
939 | } | 917 | } |
940 | else | ||
941 | { | ||
942 | struct GNUNET_MQ_Envelope *ev; | ||
943 | 918 | ||
944 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 919 | if ((GNUNET_YES == op->force_full) || |
945 | "Telling other peer that we expect its full set\n"); | 920 | (diff > op->state->initial_size / 4) || |
946 | op->state->phase = PHASE_EXPECT_IBF; | 921 | (0 == other_size)) |
947 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL); | 922 | { |
948 | GNUNET_MQ_send (op->mq, | 923 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
949 | ev); | 924 | "Deciding to go for full set transmission (diff=%d, own set=%u)\n", |
925 | diff, | ||
926 | op->state->initial_size); | ||
927 | GNUNET_STATISTICS_update(_GSS_statistics, | ||
928 | "# of full sends", | ||
929 | 1, | ||
930 | GNUNET_NO); | ||
931 | if ((op->state->initial_size <= other_size) || | ||
932 | (0 == other_size)) | ||
933 | { | ||
934 | send_full_set(op); | ||
935 | } | ||
936 | else | ||
937 | { | ||
938 | struct GNUNET_MQ_Envelope *ev; | ||
939 | |||
940 | LOG(GNUNET_ERROR_TYPE_DEBUG, | ||
941 | "Telling other peer that we expect its full set\n"); | ||
942 | op->state->phase = PHASE_EXPECT_IBF; | ||
943 | ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL); | ||
944 | GNUNET_MQ_send(op->mq, | ||
945 | ev); | ||
946 | } | ||
950 | } | 947 | } |
951 | } | ||
952 | else | 948 | else |
953 | { | 949 | { |
954 | GNUNET_STATISTICS_update (_GSS_statistics, | 950 | GNUNET_STATISTICS_update(_GSS_statistics, |
955 | "# of ibf sends", | 951 | "# of ibf sends", |
956 | 1, | 952 | 1, |
957 | GNUNET_NO); | 953 | GNUNET_NO); |
958 | if (GNUNET_OK != | 954 | if (GNUNET_OK != |
959 | send_ibf (op, | 955 | send_ibf(op, |
960 | get_order_from_difference (diff))) | 956 | get_order_from_difference(diff))) |
961 | { | 957 | { |
962 | /* Internal error, best we can do is shut the connection */ | 958 | /* Internal error, best we can do is shut the connection */ |
963 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 959 | GNUNET_log(GNUNET_ERROR_TYPE_ERROR, |
964 | "Failed to send IBF, closing connection\n"); | 960 | "Failed to send IBF, closing connection\n"); |
965 | fail_union_operation (op); | 961 | fail_union_operation(op); |
966 | return; | 962 | return; |
963 | } | ||
967 | } | 964 | } |
968 | } | 965 | GNUNET_CADET_receive_done(op->channel); |
969 | GNUNET_CADET_receive_done (op->channel); | ||
970 | } | 966 | } |
971 | 967 | ||
972 | 968 | ||
@@ -978,9 +974,9 @@ handle_union_p2p_strata_estimator (void *cls, | |||
978 | * @param value the key entry | 974 | * @param value the key entry |
979 | */ | 975 | */ |
980 | static int | 976 | static int |
981 | send_offers_iterator (void *cls, | 977 | send_offers_iterator(void *cls, |
982 | uint32_t key, | 978 | uint32_t key, |
983 | void *value) | 979 | void *value) |
984 | { | 980 | { |
985 | struct SendElementClosure *sec = cls; | 981 | struct SendElementClosure *sec = cls; |
986 | struct Operation *op = sec->op; | 982 | struct Operation *op = sec->op; |
@@ -992,17 +988,17 @@ send_offers_iterator (void *cls, | |||
992 | if (ke->ibf_key.key_val != sec->ibf_key.key_val) | 988 | if (ke->ibf_key.key_val != sec->ibf_key.key_val) |
993 | return GNUNET_YES; | 989 | return GNUNET_YES; |
994 | 990 | ||
995 | ev = GNUNET_MQ_msg_header_extra (mh, | 991 | ev = GNUNET_MQ_msg_header_extra(mh, |
996 | sizeof (struct GNUNET_HashCode), | 992 | sizeof(struct GNUNET_HashCode), |
997 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER); | 993 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER); |
998 | 994 | ||
999 | GNUNET_assert (NULL != ev); | 995 | GNUNET_assert(NULL != ev); |
1000 | *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash; | 996 | *(struct GNUNET_HashCode *)&mh[1] = ke->element->element_hash; |
1001 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 997 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
1002 | "[OP %x] sending element offer (%s) to peer\n", | 998 | "[OP %x] sending element offer (%s) to peer\n", |
1003 | (void *) op, | 999 | (void *)op, |
1004 | GNUNET_h2s (&ke->element->element_hash)); | 1000 | GNUNET_h2s(&ke->element->element_hash)); |
1005 | GNUNET_MQ_send (op->mq, ev); | 1001 | GNUNET_MQ_send(op->mq, ev); |
1006 | return GNUNET_YES; | 1002 | return GNUNET_YES; |
1007 | } | 1003 | } |
1008 | 1004 | ||
@@ -1014,17 +1010,17 @@ send_offers_iterator (void *cls, | |||
1014 | * @param ibf_key IBF key of interest | 1010 | * @param ibf_key IBF key of interest |
1015 | */ | 1011 | */ |
1016 | static void | 1012 | static void |
1017 | send_offers_for_key (struct Operation *op, | 1013 | send_offers_for_key(struct Operation *op, |
1018 | struct IBF_Key ibf_key) | 1014 | struct IBF_Key ibf_key) |
1019 | { | 1015 | { |
1020 | struct SendElementClosure send_cls; | 1016 | struct SendElementClosure send_cls; |
1021 | 1017 | ||
1022 | send_cls.ibf_key = ibf_key; | 1018 | send_cls.ibf_key = ibf_key; |
1023 | send_cls.op = op; | 1019 | send_cls.op = op; |
1024 | (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, | 1020 | (void)GNUNET_CONTAINER_multihashmap32_get_multiple(op->state->key_to_element, |
1025 | (uint32_t) ibf_key.key_val, | 1021 | (uint32_t)ibf_key.key_val, |
1026 | &send_offers_iterator, | 1022 | &send_offers_iterator, |
1027 | &send_cls); | 1023 | &send_cls); |
1028 | } | 1024 | } |
1029 | 1025 | ||
1030 | 1026 | ||
@@ -1036,7 +1032,7 @@ send_offers_for_key (struct Operation *op, | |||
1036 | * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure | 1032 | * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure |
1037 | */ | 1033 | */ |
1038 | static int | 1034 | static int |
1039 | decode_and_send (struct Operation *op) | 1035 | decode_and_send(struct Operation *op) |
1040 | { | 1036 | { |
1041 | struct IBF_Key key; | 1037 | struct IBF_Key key; |
1042 | struct IBF_Key last_key; | 1038 | struct IBF_Key last_key; |
@@ -1044,147 +1040,147 @@ decode_and_send (struct Operation *op) | |||
1044 | unsigned int num_decoded; | 1040 | unsigned int num_decoded; |
1045 | struct InvertibleBloomFilter *diff_ibf; | 1041 | struct InvertibleBloomFilter *diff_ibf; |
1046 | 1042 | ||
1047 | GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase); | 1043 | GNUNET_assert(PHASE_INVENTORY_ACTIVE == op->state->phase); |
1048 | 1044 | ||
1049 | if (GNUNET_OK != | 1045 | if (GNUNET_OK != |
1050 | prepare_ibf (op, | 1046 | prepare_ibf(op, |
1051 | op->state->remote_ibf->size)) | 1047 | op->state->remote_ibf->size)) |
1052 | { | 1048 | { |
1053 | GNUNET_break (0); | 1049 | GNUNET_break(0); |
1054 | /* allocation failed */ | 1050 | /* allocation failed */ |
1055 | return GNUNET_SYSERR; | 1051 | return GNUNET_SYSERR; |
1056 | } | 1052 | } |
1057 | diff_ibf = ibf_dup (op->state->local_ibf); | 1053 | diff_ibf = ibf_dup(op->state->local_ibf); |
1058 | ibf_subtract (diff_ibf, | 1054 | ibf_subtract(diff_ibf, |
1059 | op->state->remote_ibf); | 1055 | op->state->remote_ibf); |
1060 | 1056 | ||
1061 | ibf_destroy (op->state->remote_ibf); | 1057 | ibf_destroy(op->state->remote_ibf); |
1062 | op->state->remote_ibf = NULL; | 1058 | op->state->remote_ibf = NULL; |
1063 | 1059 | ||
1064 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1060 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
1065 | "decoding IBF (size=%u)\n", | 1061 | "decoding IBF (size=%u)\n", |
1066 | diff_ibf->size); | 1062 | diff_ibf->size); |
1067 | 1063 | ||
1068 | num_decoded = 0; | 1064 | num_decoded = 0; |
1069 | key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */ | 1065 | key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */ |
1070 | 1066 | ||
1071 | while (1) | 1067 | while (1) |
1072 | { | 1068 | { |
1073 | int res; | 1069 | int res; |
1074 | int cycle_detected = GNUNET_NO; | 1070 | int cycle_detected = GNUNET_NO; |
1075 | 1071 | ||
1076 | last_key = key; | 1072 | last_key = key; |
1077 | 1073 | ||
1078 | res = ibf_decode (diff_ibf, &side, &key); | 1074 | res = ibf_decode(diff_ibf, &side, &key); |
1079 | if (res == GNUNET_OK) | 1075 | if (res == GNUNET_OK) |
1080 | { | ||
1081 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1082 | "decoded ibf key %lx\n", | ||
1083 | (unsigned long) key.key_val); | ||
1084 | num_decoded += 1; | ||
1085 | if ( (num_decoded > diff_ibf->size) || | ||
1086 | ( (num_decoded > 1) && | ||
1087 | (last_key.key_val == key.key_val) ) ) | ||
1088 | { | ||
1089 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1090 | "detected cyclic ibf (decoded %u/%u)\n", | ||
1091 | num_decoded, | ||
1092 | diff_ibf->size); | ||
1093 | cycle_detected = GNUNET_YES; | ||
1094 | } | ||
1095 | } | ||
1096 | if ( (GNUNET_SYSERR == res) || | ||
1097 | (GNUNET_YES == cycle_detected) ) | ||
1098 | { | ||
1099 | int next_order; | ||
1100 | next_order = 0; | ||
1101 | while (1<<next_order < diff_ibf->size) | ||
1102 | next_order++; | ||
1103 | next_order++; | ||
1104 | if (next_order <= MAX_IBF_ORDER) | ||
1105 | { | ||
1106 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1107 | "decoding failed, sending larger ibf (size %u)\n", | ||
1108 | 1<<next_order); | ||
1109 | GNUNET_STATISTICS_update (_GSS_statistics, | ||
1110 | "# of IBF retries", | ||
1111 | 1, | ||
1112 | GNUNET_NO); | ||
1113 | op->state->salt_send++; | ||
1114 | if (GNUNET_OK != | ||
1115 | send_ibf (op, next_order)) | ||
1116 | { | 1076 | { |
1117 | /* Internal error, best we can do is shut the connection */ | 1077 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
1118 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 1078 | "decoded ibf key %lx\n", |
1119 | "Failed to send IBF, closing connection\n"); | 1079 | (unsigned long)key.key_val); |
1120 | fail_union_operation (op); | 1080 | num_decoded += 1; |
1121 | ibf_destroy (diff_ibf); | 1081 | if ((num_decoded > diff_ibf->size) || |
1122 | return GNUNET_SYSERR; | 1082 | ((num_decoded > 1) && |
1083 | (last_key.key_val == key.key_val))) | ||
1084 | { | ||
1085 | LOG(GNUNET_ERROR_TYPE_DEBUG, | ||
1086 | "detected cyclic ibf (decoded %u/%u)\n", | ||
1087 | num_decoded, | ||
1088 | diff_ibf->size); | ||
1089 | cycle_detected = GNUNET_YES; | ||
1090 | } | ||
1123 | } | 1091 | } |
1124 | } | 1092 | if ((GNUNET_SYSERR == res) || |
1125 | else | 1093 | (GNUNET_YES == cycle_detected)) |
1126 | { | 1094 | { |
1127 | GNUNET_STATISTICS_update (_GSS_statistics, | 1095 | int next_order; |
1128 | "# of failed union operations (too large)", | 1096 | next_order = 0; |
1129 | 1, | 1097 | while (1 << next_order < diff_ibf->size) |
1130 | GNUNET_NO); | 1098 | next_order++; |
1131 | // XXX: Send the whole set, element-by-element | 1099 | next_order++; |
1132 | LOG (GNUNET_ERROR_TYPE_ERROR, | 1100 | if (next_order <= MAX_IBF_ORDER) |
1133 | "set union failed: reached ibf limit\n"); | 1101 | { |
1134 | fail_union_operation (op); | 1102 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
1135 | ibf_destroy (diff_ibf); | 1103 | "decoding failed, sending larger ibf (size %u)\n", |
1136 | return GNUNET_SYSERR; | 1104 | 1 << next_order); |
1137 | } | 1105 | GNUNET_STATISTICS_update(_GSS_statistics, |
1138 | break; | 1106 | "# of IBF retries", |
1139 | } | 1107 | 1, |
1140 | if (GNUNET_NO == res) | 1108 | GNUNET_NO); |
1141 | { | 1109 | op->state->salt_send++; |
1142 | struct GNUNET_MQ_Envelope *ev; | 1110 | if (GNUNET_OK != |
1143 | 1111 | send_ibf(op, next_order)) | |
1144 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1112 | { |
1145 | "transmitted all values, sending DONE\n"); | 1113 | /* Internal error, best we can do is shut the connection */ |
1146 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); | 1114 | GNUNET_log(GNUNET_ERROR_TYPE_ERROR, |
1147 | GNUNET_MQ_send (op->mq, ev); | 1115 | "Failed to send IBF, closing connection\n"); |
1148 | /* We now wait until we get a DONE message back | 1116 | fail_union_operation(op); |
1149 | * and then wait for our MQ to be flushed and all our | 1117 | ibf_destroy(diff_ibf); |
1150 | * demands be delivered. */ | 1118 | return GNUNET_SYSERR; |
1151 | break; | 1119 | } |
1152 | } | 1120 | } |
1153 | if (1 == side) | 1121 | else |
1154 | { | 1122 | { |
1155 | struct IBF_Key unsalted_key; | 1123 | GNUNET_STATISTICS_update(_GSS_statistics, |
1124 | "# of failed union operations (too large)", | ||
1125 | 1, | ||
1126 | GNUNET_NO); | ||
1127 | // XXX: Send the whole set, element-by-element | ||
1128 | LOG(GNUNET_ERROR_TYPE_ERROR, | ||
1129 | "set union failed: reached ibf limit\n"); | ||
1130 | fail_union_operation(op); | ||
1131 | ibf_destroy(diff_ibf); | ||
1132 | return GNUNET_SYSERR; | ||
1133 | } | ||
1134 | break; | ||
1135 | } | ||
1136 | if (GNUNET_NO == res) | ||
1137 | { | ||
1138 | struct GNUNET_MQ_Envelope *ev; | ||
1139 | |||
1140 | LOG(GNUNET_ERROR_TYPE_DEBUG, | ||
1141 | "transmitted all values, sending DONE\n"); | ||
1142 | ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); | ||
1143 | GNUNET_MQ_send(op->mq, ev); | ||
1144 | /* We now wait until we get a DONE message back | ||
1145 | * and then wait for our MQ to be flushed and all our | ||
1146 | * demands be delivered. */ | ||
1147 | break; | ||
1148 | } | ||
1149 | if (1 == side) | ||
1150 | { | ||
1151 | struct IBF_Key unsalted_key; | ||
1156 | 1152 | ||
1157 | unsalt_key (&key, | 1153 | unsalt_key(&key, |
1158 | op->state->salt_receive, | 1154 | op->state->salt_receive, |
1159 | &unsalted_key); | 1155 | &unsalted_key); |
1160 | send_offers_for_key (op, | 1156 | send_offers_for_key(op, |
1161 | unsalted_key); | 1157 | unsalted_key); |
1162 | } | 1158 | } |
1163 | else if (-1 == side) | 1159 | else if (-1 == side) |
1164 | { | 1160 | { |
1165 | struct GNUNET_MQ_Envelope *ev; | 1161 | struct GNUNET_MQ_Envelope *ev; |
1166 | struct InquiryMessage *msg; | 1162 | struct InquiryMessage *msg; |
1167 | 1163 | ||
1168 | /* It may be nice to merge multiple requests, but with CADET's corking it is not worth | 1164 | /* It may be nice to merge multiple requests, but with CADET's corking it is not worth |
1169 | * the effort additional complexity. */ | 1165 | * the effort additional complexity. */ |
1170 | ev = GNUNET_MQ_msg_extra (msg, | 1166 | ev = GNUNET_MQ_msg_extra(msg, |
1171 | sizeof (struct IBF_Key), | 1167 | sizeof(struct IBF_Key), |
1172 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY); | 1168 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY); |
1173 | msg->salt = htonl (op->state->salt_receive); | 1169 | msg->salt = htonl(op->state->salt_receive); |
1174 | GNUNET_memcpy (&msg[1], | 1170 | GNUNET_memcpy(&msg[1], |
1175 | &key, | 1171 | &key, |
1176 | sizeof (struct IBF_Key)); | 1172 | sizeof(struct IBF_Key)); |
1177 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1173 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
1178 | "sending element inquiry for IBF key %lx\n", | 1174 | "sending element inquiry for IBF key %lx\n", |
1179 | (unsigned long) key.key_val); | 1175 | (unsigned long)key.key_val); |
1180 | GNUNET_MQ_send (op->mq, ev); | 1176 | GNUNET_MQ_send(op->mq, ev); |
1181 | } | 1177 | } |
1182 | else | 1178 | else |
1183 | { | 1179 | { |
1184 | GNUNET_assert (0); | 1180 | GNUNET_assert(0); |
1181 | } | ||
1185 | } | 1182 | } |
1186 | } | 1183 | ibf_destroy(diff_ibf); |
1187 | ibf_destroy (diff_ibf); | ||
1188 | return GNUNET_OK; | 1184 | return GNUNET_OK; |
1189 | } | 1185 | } |
1190 | 1186 | ||
@@ -1200,52 +1196,52 @@ decode_and_send (struct Operation *op) | |||
1200 | * @return #GNUNET_OK if @a msg is well-formed | 1196 | * @return #GNUNET_OK if @a msg is well-formed |
1201 | */ | 1197 | */ |
1202 | int | 1198 | int |
1203 | check_union_p2p_ibf (void *cls, | 1199 | check_union_p2p_ibf(void *cls, |
1204 | const struct IBFMessage *msg) | 1200 | const struct IBFMessage *msg) |
1205 | { | 1201 | { |
1206 | struct Operation *op = cls; | 1202 | struct Operation *op = cls; |
1207 | unsigned int buckets_in_message; | 1203 | unsigned int buckets_in_message; |
1208 | 1204 | ||
1209 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) | 1205 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) |
1210 | { | 1206 | { |
1211 | GNUNET_break_op (0); | 1207 | GNUNET_break_op(0); |
1212 | return GNUNET_SYSERR; | 1208 | return GNUNET_SYSERR; |
1213 | } | 1209 | } |
1214 | buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; | 1210 | buckets_in_message = (ntohs(msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; |
1215 | if (0 == buckets_in_message) | 1211 | if (0 == buckets_in_message) |
1216 | { | ||
1217 | GNUNET_break_op (0); | ||
1218 | return GNUNET_SYSERR; | ||
1219 | } | ||
1220 | if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE) | ||
1221 | { | ||
1222 | GNUNET_break_op (0); | ||
1223 | return GNUNET_SYSERR; | ||
1224 | } | ||
1225 | if (op->state->phase == PHASE_EXPECT_IBF_CONT) | ||
1226 | { | ||
1227 | if (ntohl (msg->offset) != op->state->ibf_buckets_received) | ||
1228 | { | 1212 | { |
1229 | GNUNET_break_op (0); | 1213 | GNUNET_break_op(0); |
1230 | return GNUNET_SYSERR; | 1214 | return GNUNET_SYSERR; |
1231 | } | 1215 | } |
1232 | if (1<<msg->order != op->state->remote_ibf->size) | 1216 | if ((ntohs(msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE) |
1233 | { | 1217 | { |
1234 | GNUNET_break_op (0); | 1218 | GNUNET_break_op(0); |
1235 | return GNUNET_SYSERR; | 1219 | return GNUNET_SYSERR; |
1236 | } | 1220 | } |
1237 | if (ntohl (msg->salt) != op->state->salt_receive) | 1221 | if (op->state->phase == PHASE_EXPECT_IBF_CONT) |
1238 | { | 1222 | { |
1239 | GNUNET_break_op (0); | 1223 | if (ntohl(msg->offset) != op->state->ibf_buckets_received) |
1224 | { | ||
1225 | GNUNET_break_op(0); | ||
1226 | return GNUNET_SYSERR; | ||
1227 | } | ||
1228 | if (1 << msg->order != op->state->remote_ibf->size) | ||
1229 | { | ||
1230 | GNUNET_break_op(0); | ||
1231 | return GNUNET_SYSERR; | ||
1232 | } | ||
1233 | if (ntohl(msg->salt) != op->state->salt_receive) | ||
1234 | { | ||
1235 | GNUNET_break_op(0); | ||
1236 | return GNUNET_SYSERR; | ||
1237 | } | ||
1238 | } | ||
1239 | else if ((op->state->phase != PHASE_INVENTORY_PASSIVE) && | ||
1240 | (op->state->phase != PHASE_EXPECT_IBF)) | ||
1241 | { | ||
1242 | GNUNET_break_op(0); | ||
1240 | return GNUNET_SYSERR; | 1243 | return GNUNET_SYSERR; |
1241 | } | 1244 | } |
1242 | } | ||
1243 | else if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) && | ||
1244 | (op->state->phase != PHASE_EXPECT_IBF) ) | ||
1245 | { | ||
1246 | GNUNET_break_op (0); | ||
1247 | return GNUNET_SYSERR; | ||
1248 | } | ||
1249 | 1245 | ||
1250 | return GNUNET_OK; | 1246 | return GNUNET_OK; |
1251 | } | 1247 | } |
@@ -1261,71 +1257,71 @@ check_union_p2p_ibf (void *cls, | |||
1261 | * @param msg the header of the message | 1257 | * @param msg the header of the message |
1262 | */ | 1258 | */ |
1263 | void | 1259 | void |
1264 | handle_union_p2p_ibf (void *cls, | 1260 | handle_union_p2p_ibf(void *cls, |
1265 | const struct IBFMessage *msg) | 1261 | const struct IBFMessage *msg) |
1266 | { | 1262 | { |
1267 | struct Operation *op = cls; | 1263 | struct Operation *op = cls; |
1268 | unsigned int buckets_in_message; | 1264 | unsigned int buckets_in_message; |
1269 | 1265 | ||
1270 | buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; | 1266 | buckets_in_message = (ntohs(msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; |
1271 | if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) || | 1267 | if ((op->state->phase == PHASE_INVENTORY_PASSIVE) || |
1272 | (op->state->phase == PHASE_EXPECT_IBF) ) | 1268 | (op->state->phase == PHASE_EXPECT_IBF)) |
1273 | { | ||
1274 | op->state->phase = PHASE_EXPECT_IBF_CONT; | ||
1275 | GNUNET_assert (NULL == op->state->remote_ibf); | ||
1276 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1277 | "Creating new ibf of size %u\n", | ||
1278 | 1 << msg->order); | ||
1279 | op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); | ||
1280 | op->state->salt_receive = ntohl (msg->salt); | ||
1281 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1282 | "Receiving new IBF with salt %u\n", | ||
1283 | op->state->salt_receive); | ||
1284 | if (NULL == op->state->remote_ibf) | ||
1285 | { | ||
1286 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
1287 | "Failed to parse remote IBF, closing connection\n"); | ||
1288 | fail_union_operation (op); | ||
1289 | return; | ||
1290 | } | ||
1291 | op->state->ibf_buckets_received = 0; | ||
1292 | if (0 != ntohl (msg->offset)) | ||
1293 | { | 1269 | { |
1294 | GNUNET_break_op (0); | 1270 | op->state->phase = PHASE_EXPECT_IBF_CONT; |
1295 | fail_union_operation (op); | 1271 | GNUNET_assert(NULL == op->state->remote_ibf); |
1296 | return; | 1272 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
1273 | "Creating new ibf of size %u\n", | ||
1274 | 1 << msg->order); | ||
1275 | op->state->remote_ibf = ibf_create(1 << msg->order, SE_IBF_HASH_NUM); | ||
1276 | op->state->salt_receive = ntohl(msg->salt); | ||
1277 | LOG(GNUNET_ERROR_TYPE_DEBUG, | ||
1278 | "Receiving new IBF with salt %u\n", | ||
1279 | op->state->salt_receive); | ||
1280 | if (NULL == op->state->remote_ibf) | ||
1281 | { | ||
1282 | GNUNET_log(GNUNET_ERROR_TYPE_ERROR, | ||
1283 | "Failed to parse remote IBF, closing connection\n"); | ||
1284 | fail_union_operation(op); | ||
1285 | return; | ||
1286 | } | ||
1287 | op->state->ibf_buckets_received = 0; | ||
1288 | if (0 != ntohl(msg->offset)) | ||
1289 | { | ||
1290 | GNUNET_break_op(0); | ||
1291 | fail_union_operation(op); | ||
1292 | return; | ||
1293 | } | ||
1297 | } | 1294 | } |
1298 | } | ||
1299 | else | 1295 | else |
1300 | { | 1296 | { |
1301 | GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT); | 1297 | GNUNET_assert(op->state->phase == PHASE_EXPECT_IBF_CONT); |
1302 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1298 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
1303 | "Received more of IBF\n"); | 1299 | "Received more of IBF\n"); |
1304 | } | 1300 | } |
1305 | GNUNET_assert (NULL != op->state->remote_ibf); | 1301 | GNUNET_assert(NULL != op->state->remote_ibf); |
1306 | 1302 | ||
1307 | ibf_read_slice (&msg[1], | 1303 | ibf_read_slice(&msg[1], |
1308 | op->state->ibf_buckets_received, | 1304 | op->state->ibf_buckets_received, |
1309 | buckets_in_message, | 1305 | buckets_in_message, |
1310 | op->state->remote_ibf); | 1306 | op->state->remote_ibf); |
1311 | op->state->ibf_buckets_received += buckets_in_message; | 1307 | op->state->ibf_buckets_received += buckets_in_message; |
1312 | 1308 | ||
1313 | if (op->state->ibf_buckets_received == op->state->remote_ibf->size) | 1309 | if (op->state->ibf_buckets_received == op->state->remote_ibf->size) |
1314 | { | 1310 | { |
1315 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1311 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
1316 | "received full ibf\n"); | 1312 | "received full ibf\n"); |
1317 | op->state->phase = PHASE_INVENTORY_ACTIVE; | 1313 | op->state->phase = PHASE_INVENTORY_ACTIVE; |
1318 | if (GNUNET_OK != | 1314 | if (GNUNET_OK != |
1319 | decode_and_send (op)) | 1315 | decode_and_send(op)) |
1320 | { | 1316 | { |
1321 | /* Internal error, best we can do is shut down */ | 1317 | /* Internal error, best we can do is shut down */ |
1322 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 1318 | GNUNET_log(GNUNET_ERROR_TYPE_ERROR, |
1323 | "Failed to decode IBF, closing connection\n"); | 1319 | "Failed to decode IBF, closing connection\n"); |
1324 | fail_union_operation (op); | 1320 | fail_union_operation(op); |
1325 | return; | 1321 | return; |
1322 | } | ||
1326 | } | 1323 | } |
1327 | } | 1324 | GNUNET_CADET_receive_done(op->channel); |
1328 | GNUNET_CADET_receive_done (op->channel); | ||
1329 | } | 1325 | } |
1330 | 1326 | ||
1331 | 1327 | ||
@@ -1338,33 +1334,33 @@ handle_union_p2p_ibf (void *cls, | |||
1338 | * @param status status to send with the new element | 1334 | * @param status status to send with the new element |
1339 | */ | 1335 | */ |
1340 | static void | 1336 | static void |
1341 | send_client_element (struct Operation *op, | 1337 | send_client_element(struct Operation *op, |
1342 | struct GNUNET_SET_Element *element, | 1338 | struct GNUNET_SET_Element *element, |
1343 | int status) | 1339 | int status) |
1344 | { | 1340 | { |
1345 | struct GNUNET_MQ_Envelope *ev; | 1341 | struct GNUNET_MQ_Envelope *ev; |
1346 | struct GNUNET_SET_ResultMessage *rm; | 1342 | struct GNUNET_SET_ResultMessage *rm; |
1347 | 1343 | ||
1348 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1344 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
1349 | "sending element (size %u) to client\n", | 1345 | "sending element (size %u) to client\n", |
1350 | element->size); | 1346 | element->size); |
1351 | GNUNET_assert (0 != op->client_request_id); | 1347 | GNUNET_assert(0 != op->client_request_id); |
1352 | ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); | 1348 | ev = GNUNET_MQ_msg_extra(rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); |
1353 | if (NULL == ev) | 1349 | if (NULL == ev) |
1354 | { | 1350 | { |
1355 | GNUNET_MQ_discard (ev); | 1351 | GNUNET_MQ_discard(ev); |
1356 | GNUNET_break (0); | 1352 | GNUNET_break(0); |
1357 | return; | 1353 | return; |
1358 | } | 1354 | } |
1359 | rm->result_status = htons (status); | 1355 | rm->result_status = htons(status); |
1360 | rm->request_id = htonl (op->client_request_id); | 1356 | rm->request_id = htonl(op->client_request_id); |
1361 | rm->element_type = htons (element->element_type); | 1357 | rm->element_type = htons(element->element_type); |
1362 | rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element)); | 1358 | rm->current_size = GNUNET_htonll(GNUNET_CONTAINER_multihashmap32_size(op->state->key_to_element)); |
1363 | GNUNET_memcpy (&rm[1], | 1359 | GNUNET_memcpy(&rm[1], |
1364 | element->data, | 1360 | element->data, |
1365 | element->size); | 1361 | element->size); |
1366 | GNUNET_MQ_send (op->set->cs->mq, | 1362 | GNUNET_MQ_send(op->set->cs->mq, |
1367 | ev); | 1363 | ev); |
1368 | } | 1364 | } |
1369 | 1365 | ||
1370 | 1366 | ||
@@ -1375,49 +1371,50 @@ send_client_element (struct Operation *op, | |||
1375 | * @param cls operation to destroy | 1371 | * @param cls operation to destroy |
1376 | */ | 1372 | */ |
1377 | static void | 1373 | static void |
1378 | send_client_done (void *cls) | 1374 | send_client_done(void *cls) |
1379 | { | 1375 | { |
1380 | struct Operation *op = cls; | 1376 | struct Operation *op = cls; |
1381 | struct GNUNET_MQ_Envelope *ev; | 1377 | struct GNUNET_MQ_Envelope *ev; |
1382 | struct GNUNET_SET_ResultMessage *rm; | 1378 | struct GNUNET_SET_ResultMessage *rm; |
1383 | 1379 | ||
1384 | if (GNUNET_YES == op->state->client_done_sent) | 1380 | if (GNUNET_YES == op->state->client_done_sent) |
1385 | { | 1381 | { |
1386 | return; | 1382 | return; |
1387 | } | 1383 | } |
1388 | 1384 | ||
1389 | if (PHASE_DONE != op->state->phase) { | 1385 | if (PHASE_DONE != op->state->phase) |
1390 | LOG (GNUNET_ERROR_TYPE_WARNING, | 1386 | { |
1391 | "Union operation failed\n"); | 1387 | LOG(GNUNET_ERROR_TYPE_WARNING, |
1392 | GNUNET_STATISTICS_update (_GSS_statistics, | 1388 | "Union operation failed\n"); |
1393 | "# Union operations failed", | 1389 | GNUNET_STATISTICS_update(_GSS_statistics, |
1394 | 1, | 1390 | "# Union operations failed", |
1395 | GNUNET_NO); | 1391 | 1, |
1396 | ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); | 1392 | GNUNET_NO); |
1397 | rm->result_status = htons (GNUNET_SET_STATUS_FAILURE); | 1393 | ev = GNUNET_MQ_msg(rm, GNUNET_MESSAGE_TYPE_SET_RESULT); |
1398 | rm->request_id = htonl (op->client_request_id); | 1394 | rm->result_status = htons(GNUNET_SET_STATUS_FAILURE); |
1399 | rm->element_type = htons (0); | 1395 | rm->request_id = htonl(op->client_request_id); |
1400 | GNUNET_MQ_send (op->set->cs->mq, | 1396 | rm->element_type = htons(0); |
1401 | ev); | 1397 | GNUNET_MQ_send(op->set->cs->mq, |
1402 | return; | 1398 | ev); |
1403 | } | 1399 | return; |
1400 | } | ||
1404 | 1401 | ||
1405 | op->state->client_done_sent = GNUNET_YES; | 1402 | op->state->client_done_sent = GNUNET_YES; |
1406 | 1403 | ||
1407 | GNUNET_STATISTICS_update (_GSS_statistics, | 1404 | GNUNET_STATISTICS_update(_GSS_statistics, |
1408 | "# Union operations succeeded", | 1405 | "# Union operations succeeded", |
1409 | 1, | 1406 | 1, |
1410 | GNUNET_NO); | 1407 | GNUNET_NO); |
1411 | LOG (GNUNET_ERROR_TYPE_INFO, | 1408 | LOG(GNUNET_ERROR_TYPE_INFO, |
1412 | "Signalling client that union operation is done\n"); | 1409 | "Signalling client that union operation is done\n"); |
1413 | ev = GNUNET_MQ_msg (rm, | 1410 | ev = GNUNET_MQ_msg(rm, |
1414 | GNUNET_MESSAGE_TYPE_SET_RESULT); | 1411 | GNUNET_MESSAGE_TYPE_SET_RESULT); |
1415 | rm->request_id = htonl (op->client_request_id); | 1412 | rm->request_id = htonl(op->client_request_id); |
1416 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); | 1413 | rm->result_status = htons(GNUNET_SET_STATUS_DONE); |
1417 | rm->element_type = htons (0); | 1414 | rm->element_type = htons(0); |
1418 | rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element)); | 1415 | rm->current_size = GNUNET_htonll(GNUNET_CONTAINER_multihashmap32_size(op->state->key_to_element)); |
1419 | GNUNET_MQ_send (op->set->cs->mq, | 1416 | GNUNET_MQ_send(op->set->cs->mq, |
1420 | ev); | 1417 | ev); |
1421 | } | 1418 | } |
1422 | 1419 | ||
1423 | 1420 | ||
@@ -1427,41 +1424,41 @@ send_client_done (void *cls) | |||
1427 | * @param op operation to check | 1424 | * @param op operation to check |
1428 | */ | 1425 | */ |
1429 | static void | 1426 | static void |
1430 | maybe_finish (struct Operation *op) | 1427 | maybe_finish(struct Operation *op) |
1431 | { | 1428 | { |
1432 | unsigned int num_demanded; | 1429 | unsigned int num_demanded; |
1433 | 1430 | ||
1434 | num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes); | 1431 | num_demanded = GNUNET_CONTAINER_multihashmap_size(op->state->demanded_hashes); |
1435 | 1432 | ||
1436 | if (PHASE_FINISH_WAITING == op->state->phase) | 1433 | if (PHASE_FINISH_WAITING == op->state->phase) |
1437 | { | ||
1438 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1439 | "In PHASE_FINISH_WAITING, pending %u demands\n", | ||
1440 | num_demanded); | ||
1441 | if (0 == num_demanded) | ||
1442 | { | 1434 | { |
1443 | struct GNUNET_MQ_Envelope *ev; | 1435 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
1444 | 1436 | "In PHASE_FINISH_WAITING, pending %u demands\n", | |
1445 | op->state->phase = PHASE_DONE; | 1437 | num_demanded); |
1446 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); | 1438 | if (0 == num_demanded) |
1447 | GNUNET_MQ_send (op->mq, | 1439 | { |
1448 | ev); | 1440 | struct GNUNET_MQ_Envelope *ev; |
1449 | /* We now wait until the other peer sends P2P_OVER | 1441 | |
1450 | * after it got all elements from us. */ | 1442 | op->state->phase = PHASE_DONE; |
1443 | ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); | ||
1444 | GNUNET_MQ_send(op->mq, | ||
1445 | ev); | ||
1446 | /* We now wait until the other peer sends P2P_OVER | ||
1447 | * after it got all elements from us. */ | ||
1448 | } | ||
1451 | } | 1449 | } |
1452 | } | ||
1453 | if (PHASE_FINISH_CLOSING == op->state->phase) | 1450 | if (PHASE_FINISH_CLOSING == op->state->phase) |
1454 | { | ||
1455 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1456 | "In PHASE_FINISH_CLOSING, pending %u demands\n", | ||
1457 | num_demanded); | ||
1458 | if (0 == num_demanded) | ||
1459 | { | 1451 | { |
1460 | op->state->phase = PHASE_DONE; | 1452 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
1461 | send_client_done (op); | 1453 | "In PHASE_FINISH_CLOSING, pending %u demands\n", |
1462 | _GSS_operation_destroy2 (op); | 1454 | num_demanded); |
1455 | if (0 == num_demanded) | ||
1456 | { | ||
1457 | op->state->phase = PHASE_DONE; | ||
1458 | send_client_done(op); | ||
1459 | _GSS_operation_destroy2(op); | ||
1460 | } | ||
1463 | } | 1461 | } |
1464 | } | ||
1465 | } | 1462 | } |
1466 | 1463 | ||
1467 | 1464 | ||
@@ -1472,21 +1469,21 @@ maybe_finish (struct Operation *op) | |||
1472 | * @param emsg the message | 1469 | * @param emsg the message |
1473 | */ | 1470 | */ |
1474 | int | 1471 | int |
1475 | check_union_p2p_elements (void *cls, | 1472 | check_union_p2p_elements(void *cls, |
1476 | const struct GNUNET_SET_ElementMessage *emsg) | 1473 | const struct GNUNET_SET_ElementMessage *emsg) |
1477 | { | 1474 | { |
1478 | struct Operation *op = cls; | 1475 | struct Operation *op = cls; |
1479 | 1476 | ||
1480 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) | 1477 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) |
1481 | { | 1478 | { |
1482 | GNUNET_break_op (0); | 1479 | GNUNET_break_op(0); |
1483 | return GNUNET_SYSERR; | 1480 | return GNUNET_SYSERR; |
1484 | } | 1481 | } |
1485 | if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes)) | 1482 | if (0 == GNUNET_CONTAINER_multihashmap_size(op->state->demanded_hashes)) |
1486 | { | 1483 | { |
1487 | GNUNET_break_op (0); | 1484 | GNUNET_break_op(0); |
1488 | return GNUNET_SYSERR; | 1485 | return GNUNET_SYSERR; |
1489 | } | 1486 | } |
1490 | return GNUNET_OK; | 1487 | return GNUNET_OK; |
1491 | } | 1488 | } |
1492 | 1489 | ||
@@ -1500,96 +1497,98 @@ check_union_p2p_elements (void *cls, | |||
1500 | * @param emsg the message | 1497 | * @param emsg the message |
1501 | */ | 1498 | */ |
1502 | void | 1499 | void |
1503 | handle_union_p2p_elements (void *cls, | 1500 | handle_union_p2p_elements(void *cls, |
1504 | const struct GNUNET_SET_ElementMessage *emsg) | 1501 | const struct GNUNET_SET_ElementMessage *emsg) |
1505 | { | 1502 | { |
1506 | struct Operation *op = cls; | 1503 | struct Operation *op = cls; |
1507 | struct ElementEntry *ee; | 1504 | struct ElementEntry *ee; |
1508 | struct KeyEntry *ke; | 1505 | struct KeyEntry *ke; |
1509 | uint16_t element_size; | 1506 | uint16_t element_size; |
1510 | 1507 | ||
1511 | element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage); | 1508 | element_size = ntohs(emsg->header.size) - sizeof(struct GNUNET_SET_ElementMessage); |
1512 | ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); | 1509 | ee = GNUNET_malloc(sizeof(struct ElementEntry) + element_size); |
1513 | GNUNET_memcpy (&ee[1], | 1510 | GNUNET_memcpy(&ee[1], |
1514 | &emsg[1], | 1511 | &emsg[1], |
1515 | element_size); | 1512 | element_size); |
1516 | ee->element.size = element_size; | 1513 | ee->element.size = element_size; |
1517 | ee->element.data = &ee[1]; | 1514 | ee->element.data = &ee[1]; |
1518 | ee->element.element_type = ntohs (emsg->element_type); | 1515 | ee->element.element_type = ntohs(emsg->element_type); |
1519 | ee->remote = GNUNET_YES; | 1516 | ee->remote = GNUNET_YES; |
1520 | GNUNET_SET_element_hash (&ee->element, | 1517 | GNUNET_SET_element_hash(&ee->element, |
1521 | &ee->element_hash); | 1518 | &ee->element_hash); |
1522 | if (GNUNET_NO == | 1519 | if (GNUNET_NO == |
1523 | GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes, | 1520 | GNUNET_CONTAINER_multihashmap_remove(op->state->demanded_hashes, |
1524 | &ee->element_hash, | 1521 | &ee->element_hash, |
1525 | NULL)) | 1522 | NULL)) |
1526 | { | 1523 | { |
1527 | /* We got something we didn't demand, since it's not in our map. */ | 1524 | /* We got something we didn't demand, since it's not in our map. */ |
1528 | GNUNET_break_op (0); | 1525 | GNUNET_break_op(0); |
1529 | fail_union_operation (op); | 1526 | fail_union_operation(op); |
1530 | return; | 1527 | return; |
1531 | } | 1528 | } |
1532 | 1529 | ||
1533 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1530 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
1534 | "Got element (size %u, hash %s) from peer\n", | 1531 | "Got element (size %u, hash %s) from peer\n", |
1535 | (unsigned int) element_size, | 1532 | (unsigned int)element_size, |
1536 | GNUNET_h2s (&ee->element_hash)); | 1533 | GNUNET_h2s(&ee->element_hash)); |
1537 | 1534 | ||
1538 | GNUNET_STATISTICS_update (_GSS_statistics, | 1535 | GNUNET_STATISTICS_update(_GSS_statistics, |
1539 | "# received elements", | 1536 | "# received elements", |
1540 | 1, | 1537 | 1, |
1541 | GNUNET_NO); | 1538 | GNUNET_NO); |
1542 | GNUNET_STATISTICS_update (_GSS_statistics, | 1539 | GNUNET_STATISTICS_update(_GSS_statistics, |
1543 | "# exchanged elements", | 1540 | "# exchanged elements", |
1544 | 1, | 1541 | 1, |
1545 | GNUNET_NO); | 1542 | GNUNET_NO); |
1546 | 1543 | ||
1547 | op->state->received_total++; | 1544 | op->state->received_total++; |
1548 | 1545 | ||
1549 | ke = op_get_element (op, &ee->element_hash); | 1546 | ke = op_get_element(op, &ee->element_hash); |
1550 | if (NULL != ke) | 1547 | if (NULL != ke) |
1551 | { | 1548 | { |
1552 | /* Got repeated element. Should not happen since | 1549 | /* Got repeated element. Should not happen since |
1553 | * we track demands. */ | 1550 | * we track demands. */ |
1554 | GNUNET_STATISTICS_update (_GSS_statistics, | 1551 | GNUNET_STATISTICS_update(_GSS_statistics, |
1555 | "# repeated elements", | 1552 | "# repeated elements", |
1556 | 1, | 1553 | 1, |
1557 | GNUNET_NO); | 1554 | GNUNET_NO); |
1558 | ke->received = GNUNET_YES; | 1555 | ke->received = GNUNET_YES; |
1559 | GNUNET_free (ee); | 1556 | GNUNET_free(ee); |
1560 | } | 1557 | } |
1561 | else | 1558 | else |
1562 | { | 1559 | { |
1563 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1560 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
1564 | "Registering new element from remote peer\n"); | 1561 | "Registering new element from remote peer\n"); |
1565 | op->state->received_fresh++; | 1562 | op->state->received_fresh++; |
1566 | op_register_element (op, ee, GNUNET_YES); | 1563 | op_register_element(op, ee, GNUNET_YES); |
1567 | /* only send results immediately if the client wants it */ | 1564 | /* only send results immediately if the client wants it */ |
1568 | switch (op->result_mode) | 1565 | switch (op->result_mode) |
1569 | { | 1566 | { |
1570 | case GNUNET_SET_RESULT_ADDED: | 1567 | case GNUNET_SET_RESULT_ADDED: |
1571 | send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK); | 1568 | send_client_element(op, &ee->element, GNUNET_SET_STATUS_OK); |
1572 | break; | 1569 | break; |
1573 | case GNUNET_SET_RESULT_SYMMETRIC: | 1570 | |
1574 | send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL); | 1571 | case GNUNET_SET_RESULT_SYMMETRIC: |
1575 | break; | 1572 | send_client_element(op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL); |
1576 | default: | 1573 | break; |
1577 | /* Result mode not supported, should have been caught earlier. */ | 1574 | |
1578 | GNUNET_break (0); | 1575 | default: |
1579 | break; | 1576 | /* Result mode not supported, should have been caught earlier. */ |
1577 | GNUNET_break(0); | ||
1578 | break; | ||
1579 | } | ||
1580 | } | 1580 | } |
1581 | } | ||
1582 | 1581 | ||
1583 | if ( (op->state->received_total > 8) && | 1582 | if ((op->state->received_total > 8) && |
1584 | (op->state->received_fresh < op->state->received_total / 3) ) | 1583 | (op->state->received_fresh < op->state->received_total / 3)) |
1585 | { | 1584 | { |
1586 | /* The other peer gave us lots of old elements, there's something wrong. */ | 1585 | /* The other peer gave us lots of old elements, there's something wrong. */ |
1587 | GNUNET_break_op (0); | 1586 | GNUNET_break_op(0); |
1588 | fail_union_operation (op); | 1587 | fail_union_operation(op); |
1589 | return; | 1588 | return; |
1590 | } | 1589 | } |
1591 | GNUNET_CADET_receive_done (op->channel); | 1590 | GNUNET_CADET_receive_done(op->channel); |
1592 | maybe_finish (op); | 1591 | maybe_finish(op); |
1593 | } | 1592 | } |
1594 | 1593 | ||
1595 | 1594 | ||
@@ -1600,16 +1599,16 @@ handle_union_p2p_elements (void *cls, | |||
1600 | * @param emsg the message | 1599 | * @param emsg the message |
1601 | */ | 1600 | */ |
1602 | int | 1601 | int |
1603 | check_union_p2p_full_element (void *cls, | 1602 | check_union_p2p_full_element(void *cls, |
1604 | const struct GNUNET_SET_ElementMessage *emsg) | 1603 | const struct GNUNET_SET_ElementMessage *emsg) |
1605 | { | 1604 | { |
1606 | struct Operation *op = cls; | 1605 | struct Operation *op = cls; |
1607 | 1606 | ||
1608 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) | 1607 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) |
1609 | { | 1608 | { |
1610 | GNUNET_break_op (0); | 1609 | GNUNET_break_op(0); |
1611 | return GNUNET_SYSERR; | 1610 | return GNUNET_SYSERR; |
1612 | } | 1611 | } |
1613 | // FIXME: check that we expect full elements here? | 1612 | // FIXME: check that we expect full elements here? |
1614 | return GNUNET_OK; | 1613 | return GNUNET_OK; |
1615 | } | 1614 | } |
@@ -1622,87 +1621,89 @@ check_union_p2p_full_element (void *cls, | |||
1622 | * @param emsg the message | 1621 | * @param emsg the message |
1623 | */ | 1622 | */ |
1624 | void | 1623 | void |
1625 | handle_union_p2p_full_element (void *cls, | 1624 | handle_union_p2p_full_element(void *cls, |
1626 | const struct GNUNET_SET_ElementMessage *emsg) | 1625 | const struct GNUNET_SET_ElementMessage *emsg) |
1627 | { | 1626 | { |
1628 | struct Operation *op = cls; | 1627 | struct Operation *op = cls; |
1629 | struct ElementEntry *ee; | 1628 | struct ElementEntry *ee; |
1630 | struct KeyEntry *ke; | 1629 | struct KeyEntry *ke; |
1631 | uint16_t element_size; | 1630 | uint16_t element_size; |
1632 | 1631 | ||
1633 | element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage); | 1632 | element_size = ntohs(emsg->header.size) - sizeof(struct GNUNET_SET_ElementMessage); |
1634 | ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); | 1633 | ee = GNUNET_malloc(sizeof(struct ElementEntry) + element_size); |
1635 | GNUNET_memcpy (&ee[1], &emsg[1], element_size); | 1634 | GNUNET_memcpy(&ee[1], &emsg[1], element_size); |
1636 | ee->element.size = element_size; | 1635 | ee->element.size = element_size; |
1637 | ee->element.data = &ee[1]; | 1636 | ee->element.data = &ee[1]; |
1638 | ee->element.element_type = ntohs (emsg->element_type); | 1637 | ee->element.element_type = ntohs(emsg->element_type); |
1639 | ee->remote = GNUNET_YES; | 1638 | ee->remote = GNUNET_YES; |
1640 | GNUNET_SET_element_hash (&ee->element, &ee->element_hash); | 1639 | GNUNET_SET_element_hash(&ee->element, &ee->element_hash); |
1641 | 1640 | ||
1642 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1641 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
1643 | "Got element (full diff, size %u, hash %s) from peer\n", | 1642 | "Got element (full diff, size %u, hash %s) from peer\n", |
1644 | (unsigned int) element_size, | 1643 | (unsigned int)element_size, |
1645 | GNUNET_h2s (&ee->element_hash)); | 1644 | GNUNET_h2s(&ee->element_hash)); |
1646 | 1645 | ||
1647 | GNUNET_STATISTICS_update (_GSS_statistics, | 1646 | GNUNET_STATISTICS_update(_GSS_statistics, |
1648 | "# received elements", | 1647 | "# received elements", |
1649 | 1, | 1648 | 1, |
1650 | GNUNET_NO); | 1649 | GNUNET_NO); |
1651 | GNUNET_STATISTICS_update (_GSS_statistics, | 1650 | GNUNET_STATISTICS_update(_GSS_statistics, |
1652 | "# exchanged elements", | 1651 | "# exchanged elements", |
1653 | 1, | 1652 | 1, |
1654 | GNUNET_NO); | 1653 | GNUNET_NO); |
1655 | 1654 | ||
1656 | op->state->received_total++; | 1655 | op->state->received_total++; |
1657 | 1656 | ||
1658 | ke = op_get_element (op, &ee->element_hash); | 1657 | ke = op_get_element(op, &ee->element_hash); |
1659 | if (NULL != ke) | 1658 | if (NULL != ke) |
1660 | { | 1659 | { |
1661 | /* Got repeated element. Should not happen since | 1660 | /* Got repeated element. Should not happen since |
1662 | * we track demands. */ | 1661 | * we track demands. */ |
1663 | GNUNET_STATISTICS_update (_GSS_statistics, | 1662 | GNUNET_STATISTICS_update(_GSS_statistics, |
1664 | "# repeated elements", | 1663 | "# repeated elements", |
1665 | 1, | 1664 | 1, |
1666 | GNUNET_NO); | 1665 | GNUNET_NO); |
1667 | ke->received = GNUNET_YES; | 1666 | ke->received = GNUNET_YES; |
1668 | GNUNET_free (ee); | 1667 | GNUNET_free(ee); |
1669 | } | 1668 | } |
1670 | else | 1669 | else |
1671 | { | 1670 | { |
1672 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1671 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
1673 | "Registering new element from remote peer\n"); | 1672 | "Registering new element from remote peer\n"); |
1674 | op->state->received_fresh++; | 1673 | op->state->received_fresh++; |
1675 | op_register_element (op, ee, GNUNET_YES); | 1674 | op_register_element(op, ee, GNUNET_YES); |
1676 | /* only send results immediately if the client wants it */ | 1675 | /* only send results immediately if the client wants it */ |
1677 | switch (op->result_mode) | 1676 | switch (op->result_mode) |
1678 | { | 1677 | { |
1679 | case GNUNET_SET_RESULT_ADDED: | 1678 | case GNUNET_SET_RESULT_ADDED: |
1680 | send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK); | 1679 | send_client_element(op, &ee->element, GNUNET_SET_STATUS_OK); |
1681 | break; | 1680 | break; |
1682 | case GNUNET_SET_RESULT_SYMMETRIC: | 1681 | |
1683 | send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL); | 1682 | case GNUNET_SET_RESULT_SYMMETRIC: |
1684 | break; | 1683 | send_client_element(op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL); |
1685 | default: | 1684 | break; |
1686 | /* Result mode not supported, should have been caught earlier. */ | 1685 | |
1687 | GNUNET_break (0); | 1686 | default: |
1688 | break; | 1687 | /* Result mode not supported, should have been caught earlier. */ |
1688 | GNUNET_break(0); | ||
1689 | break; | ||
1690 | } | ||
1689 | } | 1691 | } |
1690 | } | ||
1691 | 1692 | ||
1692 | if ( (GNUNET_YES == op->byzantine) && | 1693 | if ((GNUNET_YES == op->byzantine) && |
1693 | (op->state->received_total > 384 + op->state->received_fresh * 4) && | 1694 | (op->state->received_total > 384 + op->state->received_fresh * 4) && |
1694 | (op->state->received_fresh < op->state->received_total / 6) ) | 1695 | (op->state->received_fresh < op->state->received_total / 6)) |
1695 | { | 1696 | { |
1696 | /* The other peer gave us lots of old elements, there's something wrong. */ | 1697 | /* The other peer gave us lots of old elements, there's something wrong. */ |
1697 | LOG (GNUNET_ERROR_TYPE_ERROR, | 1698 | LOG(GNUNET_ERROR_TYPE_ERROR, |
1698 | "Other peer sent only %llu/%llu fresh elements, failing operation\n", | 1699 | "Other peer sent only %llu/%llu fresh elements, failing operation\n", |
1699 | (unsigned long long) op->state->received_fresh, | 1700 | (unsigned long long)op->state->received_fresh, |
1700 | (unsigned long long) op->state->received_total); | 1701 | (unsigned long long)op->state->received_total); |
1701 | GNUNET_break_op (0); | 1702 | GNUNET_break_op(0); |
1702 | fail_union_operation (op); | 1703 | fail_union_operation(op); |
1703 | return; | 1704 | return; |
1704 | } | 1705 | } |
1705 | GNUNET_CADET_receive_done (op->channel); | 1706 | GNUNET_CADET_receive_done(op->channel); |
1706 | } | 1707 | } |
1707 | 1708 | ||
1708 | 1709 | ||
@@ -1714,30 +1715,30 @@ handle_union_p2p_full_element (void *cls, | |||
1714 | * @param msg the message | 1715 | * @param msg the message |
1715 | */ | 1716 | */ |
1716 | int | 1717 | int |
1717 | check_union_p2p_inquiry (void *cls, | 1718 | check_union_p2p_inquiry(void *cls, |
1718 | const struct InquiryMessage *msg) | 1719 | const struct InquiryMessage *msg) |
1719 | { | 1720 | { |
1720 | struct Operation *op = cls; | 1721 | struct Operation *op = cls; |
1721 | unsigned int num_keys; | 1722 | unsigned int num_keys; |
1722 | 1723 | ||
1723 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) | 1724 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) |
1724 | { | 1725 | { |
1725 | GNUNET_break_op (0); | 1726 | GNUNET_break_op(0); |
1726 | return GNUNET_SYSERR; | 1727 | return GNUNET_SYSERR; |
1727 | } | 1728 | } |
1728 | if (op->state->phase != PHASE_INVENTORY_PASSIVE) | 1729 | if (op->state->phase != PHASE_INVENTORY_PASSIVE) |
1729 | { | 1730 | { |
1730 | GNUNET_break_op (0); | 1731 | GNUNET_break_op(0); |
1731 | return GNUNET_SYSERR; | 1732 | return GNUNET_SYSERR; |
1732 | } | 1733 | } |
1733 | num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage)) | 1734 | num_keys = (ntohs(msg->header.size) - sizeof(struct InquiryMessage)) |
1734 | / sizeof (struct IBF_Key); | 1735 | / sizeof(struct IBF_Key); |
1735 | if ((ntohs (msg->header.size) - sizeof (struct InquiryMessage)) | 1736 | if ((ntohs(msg->header.size) - sizeof(struct InquiryMessage)) |
1736 | != num_keys * sizeof (struct IBF_Key)) | 1737 | != num_keys * sizeof(struct IBF_Key)) |
1737 | { | 1738 | { |
1738 | GNUNET_break_op (0); | 1739 | GNUNET_break_op(0); |
1739 | return GNUNET_SYSERR; | 1740 | return GNUNET_SYSERR; |
1740 | } | 1741 | } |
1741 | return GNUNET_OK; | 1742 | return GNUNET_OK; |
1742 | } | 1743 | } |
1743 | 1744 | ||
@@ -1750,30 +1751,30 @@ check_union_p2p_inquiry (void *cls, | |||
1750 | * @param msg the message | 1751 | * @param msg the message |
1751 | */ | 1752 | */ |
1752 | void | 1753 | void |
1753 | handle_union_p2p_inquiry (void *cls, | 1754 | handle_union_p2p_inquiry(void *cls, |
1754 | const struct InquiryMessage *msg) | 1755 | const struct InquiryMessage *msg) |
1755 | { | 1756 | { |
1756 | struct Operation *op = cls; | 1757 | struct Operation *op = cls; |
1757 | const struct IBF_Key *ibf_key; | 1758 | const struct IBF_Key *ibf_key; |
1758 | unsigned int num_keys; | 1759 | unsigned int num_keys; |
1759 | 1760 | ||
1760 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1761 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
1761 | "Received union inquiry\n"); | 1762 | "Received union inquiry\n"); |
1762 | num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage)) | 1763 | num_keys = (ntohs(msg->header.size) - sizeof(struct InquiryMessage)) |
1763 | / sizeof (struct IBF_Key); | 1764 | / sizeof(struct IBF_Key); |
1764 | ibf_key = (const struct IBF_Key *) &msg[1]; | 1765 | ibf_key = (const struct IBF_Key *)&msg[1]; |
1765 | while (0 != num_keys--) | 1766 | while (0 != num_keys--) |
1766 | { | 1767 | { |
1767 | struct IBF_Key unsalted_key; | 1768 | struct IBF_Key unsalted_key; |
1768 | 1769 | ||
1769 | unsalt_key (ibf_key, | 1770 | unsalt_key(ibf_key, |
1770 | ntohl (msg->salt), | 1771 | ntohl(msg->salt), |
1771 | &unsalted_key); | 1772 | &unsalted_key); |
1772 | send_offers_for_key (op, | 1773 | send_offers_for_key(op, |
1773 | unsalted_key); | 1774 | unsalted_key); |
1774 | ibf_key++; | 1775 | ibf_key++; |
1775 | } | 1776 | } |
1776 | GNUNET_CADET_receive_done (op->channel); | 1777 | GNUNET_CADET_receive_done(op->channel); |
1777 | } | 1778 | } |
1778 | 1779 | ||
1779 | 1780 | ||
@@ -1788,9 +1789,9 @@ handle_union_p2p_inquiry (void *cls, | |||
1788 | * #GNUNET_NO if not. | 1789 | * #GNUNET_NO if not. |
1789 | */ | 1790 | */ |
1790 | static int | 1791 | static int |
1791 | send_missing_full_elements_iter (void *cls, | 1792 | send_missing_full_elements_iter(void *cls, |
1792 | uint32_t key, | 1793 | uint32_t key, |
1793 | void *value) | 1794 | void *value) |
1794 | { | 1795 | { |
1795 | struct Operation *op = cls; | 1796 | struct Operation *op = cls; |
1796 | struct KeyEntry *ke = value; | 1797 | struct KeyEntry *ke = value; |
@@ -1800,15 +1801,15 @@ send_missing_full_elements_iter (void *cls, | |||
1800 | 1801 | ||
1801 | if (GNUNET_YES == ke->received) | 1802 | if (GNUNET_YES == ke->received) |
1802 | return GNUNET_YES; | 1803 | return GNUNET_YES; |
1803 | ev = GNUNET_MQ_msg_extra (emsg, | 1804 | ev = GNUNET_MQ_msg_extra(emsg, |
1804 | ee->element.size, | 1805 | ee->element.size, |
1805 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); | 1806 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); |
1806 | GNUNET_memcpy (&emsg[1], | 1807 | GNUNET_memcpy(&emsg[1], |
1807 | ee->element.data, | 1808 | ee->element.data, |
1808 | ee->element.size); | 1809 | ee->element.size); |
1809 | emsg->element_type = htons (ee->element.element_type); | 1810 | emsg->element_type = htons(ee->element.element_type); |
1810 | GNUNET_MQ_send (op->mq, | 1811 | GNUNET_MQ_send(op->mq, |
1811 | ev); | 1812 | ev); |
1812 | return GNUNET_YES; | 1813 | return GNUNET_YES; |
1813 | } | 1814 | } |
1814 | 1815 | ||
@@ -1820,30 +1821,30 @@ send_missing_full_elements_iter (void *cls, | |||
1820 | * @param mh the demand message | 1821 | * @param mh the demand message |
1821 | */ | 1822 | */ |
1822 | void | 1823 | void |
1823 | handle_union_p2p_request_full (void *cls, | 1824 | handle_union_p2p_request_full(void *cls, |
1824 | const struct GNUNET_MessageHeader *mh) | 1825 | const struct GNUNET_MessageHeader *mh) |
1825 | { | 1826 | { |
1826 | struct Operation *op = cls; | 1827 | struct Operation *op = cls; |
1827 | 1828 | ||
1828 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1829 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
1829 | "Received request for full set transmission\n"); | 1830 | "Received request for full set transmission\n"); |
1830 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) | 1831 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) |
1831 | { | 1832 | { |
1832 | GNUNET_break_op (0); | 1833 | GNUNET_break_op(0); |
1833 | fail_union_operation (op); | 1834 | fail_union_operation(op); |
1834 | return; | 1835 | return; |
1835 | } | 1836 | } |
1836 | if (PHASE_EXPECT_IBF != op->state->phase) | 1837 | if (PHASE_EXPECT_IBF != op->state->phase) |
1837 | { | 1838 | { |
1838 | GNUNET_break_op (0); | 1839 | GNUNET_break_op(0); |
1839 | fail_union_operation (op); | 1840 | fail_union_operation(op); |
1840 | return; | 1841 | return; |
1841 | } | 1842 | } |
1842 | 1843 | ||
1843 | // FIXME: we need to check that our set is larger than the | 1844 | // FIXME: we need to check that our set is larger than the |
1844 | // byzantine_lower_bound by some threshold | 1845 | // byzantine_lower_bound by some threshold |
1845 | send_full_set (op); | 1846 | send_full_set(op); |
1846 | GNUNET_CADET_receive_done (op->channel); | 1847 | GNUNET_CADET_receive_done(op->channel); |
1847 | } | 1848 | } |
1848 | 1849 | ||
1849 | 1850 | ||
@@ -1854,53 +1855,55 @@ handle_union_p2p_request_full (void *cls, | |||
1854 | * @param mh the demand message | 1855 | * @param mh the demand message |
1855 | */ | 1856 | */ |
1856 | void | 1857 | void |
1857 | handle_union_p2p_full_done (void *cls, | 1858 | handle_union_p2p_full_done(void *cls, |
1858 | const struct GNUNET_MessageHeader *mh) | 1859 | const struct GNUNET_MessageHeader *mh) |
1859 | { | 1860 | { |
1860 | struct Operation *op = cls; | 1861 | struct Operation *op = cls; |
1861 | 1862 | ||
1862 | switch (op->state->phase) | 1863 | switch (op->state->phase) |
1863 | { | 1864 | { |
1864 | case PHASE_EXPECT_IBF: | 1865 | case PHASE_EXPECT_IBF: |
1865 | { | 1866 | { |
1866 | struct GNUNET_MQ_Envelope *ev; | 1867 | struct GNUNET_MQ_Envelope *ev; |
1867 | 1868 | ||
1868 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1869 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
1869 | "got FULL DONE, sending elements that other peer is missing\n"); | 1870 | "got FULL DONE, sending elements that other peer is missing\n"); |
1870 | 1871 | ||
1871 | /* send all the elements that did not come from the remote peer */ | 1872 | /* send all the elements that did not come from the remote peer */ |
1872 | GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, | 1873 | GNUNET_CONTAINER_multihashmap32_iterate(op->state->key_to_element, |
1873 | &send_missing_full_elements_iter, | 1874 | &send_missing_full_elements_iter, |
1874 | op); | 1875 | op); |
1875 | 1876 | ||
1876 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); | 1877 | ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); |
1877 | GNUNET_MQ_send (op->mq, | 1878 | GNUNET_MQ_send(op->mq, |
1878 | ev); | 1879 | ev); |
1879 | op->state->phase = PHASE_DONE; | 1880 | op->state->phase = PHASE_DONE; |
1880 | /* we now wait until the other peer sends us the OVER message*/ | 1881 | /* we now wait until the other peer sends us the OVER message*/ |
1881 | } | 1882 | } |
1882 | break; | 1883 | break; |
1883 | case PHASE_FULL_SENDING: | 1884 | |
1885 | case PHASE_FULL_SENDING: | ||
1884 | { | 1886 | { |
1885 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1887 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
1886 | "got FULL DONE, finishing\n"); | 1888 | "got FULL DONE, finishing\n"); |
1887 | /* We sent the full set, and got the response for that. We're done. */ | 1889 | /* We sent the full set, and got the response for that. We're done. */ |
1888 | op->state->phase = PHASE_DONE; | 1890 | op->state->phase = PHASE_DONE; |
1889 | GNUNET_CADET_receive_done (op->channel); | 1891 | GNUNET_CADET_receive_done(op->channel); |
1890 | send_client_done (op); | 1892 | send_client_done(op); |
1891 | _GSS_operation_destroy2 (op); | 1893 | _GSS_operation_destroy2(op); |
1892 | return; | 1894 | return; |
1893 | } | 1895 | } |
1894 | break; | 1896 | break; |
1895 | default: | 1897 | |
1896 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1898 | default: |
1897 | "Handle full done phase is %u\n", | 1899 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, |
1898 | (unsigned) op->state->phase); | 1900 | "Handle full done phase is %u\n", |
1899 | GNUNET_break_op (0); | 1901 | (unsigned)op->state->phase); |
1900 | fail_union_operation (op); | 1902 | GNUNET_break_op(0); |
1901 | return; | 1903 | fail_union_operation(op); |
1902 | } | 1904 | return; |
1903 | GNUNET_CADET_receive_done (op->channel); | 1905 | } |
1906 | GNUNET_CADET_receive_done(op->channel); | ||
1904 | } | 1907 | } |
1905 | 1908 | ||
1906 | 1909 | ||
@@ -1913,25 +1916,25 @@ handle_union_p2p_full_done (void *cls, | |||
1913 | * @return #GNUNET_OK if @a mh is well-formed | 1916 | * @return #GNUNET_OK if @a mh is well-formed |
1914 | */ | 1917 | */ |
1915 | int | 1918 | int |
1916 | check_union_p2p_demand (void *cls, | 1919 | check_union_p2p_demand(void *cls, |
1917 | const struct GNUNET_MessageHeader *mh) | 1920 | const struct GNUNET_MessageHeader *mh) |
1918 | { | 1921 | { |
1919 | struct Operation *op = cls; | 1922 | struct Operation *op = cls; |
1920 | unsigned int num_hashes; | 1923 | unsigned int num_hashes; |
1921 | 1924 | ||
1922 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) | 1925 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) |
1923 | { | 1926 | { |
1924 | GNUNET_break_op (0); | 1927 | GNUNET_break_op(0); |
1925 | return GNUNET_SYSERR; | 1928 | return GNUNET_SYSERR; |
1926 | } | 1929 | } |
1927 | num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) | 1930 | num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader)) |
1928 | / sizeof (struct GNUNET_HashCode); | 1931 | / sizeof(struct GNUNET_HashCode); |
1929 | if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) | 1932 | if ((ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader)) |
1930 | != num_hashes * sizeof (struct GNUNET_HashCode)) | 1933 | != num_hashes * sizeof(struct GNUNET_HashCode)) |
1931 | { | 1934 | { |
1932 | GNUNET_break_op (0); | 1935 | GNUNET_break_op(0); |
1933 | return GNUNET_SYSERR; | 1936 | return GNUNET_SYSERR; |
1934 | } | 1937 | } |
1935 | return GNUNET_OK; | 1938 | return GNUNET_OK; |
1936 | } | 1939 | } |
1937 | 1940 | ||
@@ -1944,8 +1947,8 @@ check_union_p2p_demand (void *cls, | |||
1944 | * @param mh the demand message | 1947 | * @param mh the demand message |
1945 | */ | 1948 | */ |
1946 | void | 1949 | void |
1947 | handle_union_p2p_demand (void *cls, | 1950 | handle_union_p2p_demand(void *cls, |
1948 | const struct GNUNET_MessageHeader *mh) | 1951 | const struct GNUNET_MessageHeader *mh) |
1949 | { | 1952 | { |
1950 | struct Operation *op = cls; | 1953 | struct Operation *op = cls; |
1951 | struct ElementEntry *ee; | 1954 | struct ElementEntry *ee; |
@@ -1954,58 +1957,60 @@ handle_union_p2p_demand (void *cls, | |||
1954 | unsigned int num_hashes; | 1957 | unsigned int num_hashes; |
1955 | struct GNUNET_MQ_Envelope *ev; | 1958 | struct GNUNET_MQ_Envelope *ev; |
1956 | 1959 | ||
1957 | num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) | 1960 | num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader)) |
1958 | / sizeof (struct GNUNET_HashCode); | 1961 | / sizeof(struct GNUNET_HashCode); |
1959 | for (hash = (const struct GNUNET_HashCode *) &mh[1]; | 1962 | for (hash = (const struct GNUNET_HashCode *)&mh[1]; |
1960 | num_hashes > 0; | 1963 | num_hashes > 0; |
1961 | hash++, num_hashes--) | 1964 | hash++, num_hashes--) |
1962 | { | ||
1963 | ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements, | ||
1964 | hash); | ||
1965 | if (NULL == ee) | ||
1966 | { | 1965 | { |
1967 | /* Demand for non-existing element. */ | 1966 | ee = GNUNET_CONTAINER_multihashmap_get(op->set->content->elements, |
1968 | GNUNET_break_op (0); | 1967 | hash); |
1969 | fail_union_operation (op); | 1968 | if (NULL == ee) |
1970 | return; | 1969 | { |
1971 | } | 1970 | /* Demand for non-existing element. */ |
1972 | if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) | 1971 | GNUNET_break_op(0); |
1973 | { | 1972 | fail_union_operation(op); |
1974 | /* Probably confused lazily copied sets. */ | 1973 | return; |
1975 | GNUNET_break_op (0); | 1974 | } |
1976 | fail_union_operation (op); | 1975 | if (GNUNET_NO == _GSS_is_element_of_operation(ee, op)) |
1977 | return; | 1976 | { |
1978 | } | 1977 | /* Probably confused lazily copied sets. */ |
1979 | ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); | 1978 | GNUNET_break_op(0); |
1980 | GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size); | 1979 | fail_union_operation(op); |
1981 | emsg->reserved = htons (0); | 1980 | return; |
1982 | emsg->element_type = htons (ee->element.element_type); | 1981 | } |
1983 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1982 | ev = GNUNET_MQ_msg_extra(emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); |
1984 | "[OP %x] Sending demanded element (size %u, hash %s) to peer\n", | 1983 | GNUNET_memcpy(&emsg[1], ee->element.data, ee->element.size); |
1985 | (void *) op, | 1984 | emsg->reserved = htons(0); |
1986 | (unsigned int) ee->element.size, | 1985 | emsg->element_type = htons(ee->element.element_type); |
1987 | GNUNET_h2s (&ee->element_hash)); | 1986 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
1988 | GNUNET_MQ_send (op->mq, ev); | 1987 | "[OP %x] Sending demanded element (size %u, hash %s) to peer\n", |
1989 | GNUNET_STATISTICS_update (_GSS_statistics, | 1988 | (void *)op, |
1990 | "# exchanged elements", | 1989 | (unsigned int)ee->element.size, |
1991 | 1, | 1990 | GNUNET_h2s(&ee->element_hash)); |
1992 | GNUNET_NO); | 1991 | GNUNET_MQ_send(op->mq, ev); |
1993 | 1992 | GNUNET_STATISTICS_update(_GSS_statistics, | |
1994 | switch (op->result_mode) | 1993 | "# exchanged elements", |
1995 | { | 1994 | 1, |
1996 | case GNUNET_SET_RESULT_ADDED: | 1995 | GNUNET_NO); |
1997 | /* Nothing to do. */ | 1996 | |
1998 | break; | 1997 | switch (op->result_mode) |
1999 | case GNUNET_SET_RESULT_SYMMETRIC: | 1998 | { |
2000 | send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE); | 1999 | case GNUNET_SET_RESULT_ADDED: |
2001 | break; | 2000 | /* Nothing to do. */ |
2002 | default: | 2001 | break; |
2003 | /* Result mode not supported, should have been caught earlier. */ | 2002 | |
2004 | GNUNET_break (0); | 2003 | case GNUNET_SET_RESULT_SYMMETRIC: |
2005 | break; | 2004 | send_client_element(op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE); |
2005 | break; | ||
2006 | |||
2007 | default: | ||
2008 | /* Result mode not supported, should have been caught earlier. */ | ||
2009 | GNUNET_break(0); | ||
2010 | break; | ||
2011 | } | ||
2006 | } | 2012 | } |
2007 | } | 2013 | GNUNET_CADET_receive_done(op->channel); |
2008 | GNUNET_CADET_receive_done (op->channel); | ||
2009 | } | 2014 | } |
2010 | 2015 | ||
2011 | 2016 | ||
@@ -2017,32 +2022,32 @@ handle_union_p2p_demand (void *cls, | |||
2017 | * @return #GNUNET_OK if @a mh is well-formed | 2022 | * @return #GNUNET_OK if @a mh is well-formed |
2018 | */ | 2023 | */ |
2019 | int | 2024 | int |
2020 | check_union_p2p_offer (void *cls, | 2025 | check_union_p2p_offer(void *cls, |
2021 | const struct GNUNET_MessageHeader *mh) | 2026 | const struct GNUNET_MessageHeader *mh) |
2022 | { | 2027 | { |
2023 | struct Operation *op = cls; | 2028 | struct Operation *op = cls; |
2024 | unsigned int num_hashes; | 2029 | unsigned int num_hashes; |
2025 | 2030 | ||
2026 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) | 2031 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) |
2027 | { | 2032 | { |
2028 | GNUNET_break_op (0); | 2033 | GNUNET_break_op(0); |
2029 | return GNUNET_SYSERR; | 2034 | return GNUNET_SYSERR; |
2030 | } | 2035 | } |
2031 | /* look up elements and send them */ | 2036 | /* look up elements and send them */ |
2032 | if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) && | 2037 | if ((op->state->phase != PHASE_INVENTORY_PASSIVE) && |
2033 | (op->state->phase != PHASE_INVENTORY_ACTIVE)) | 2038 | (op->state->phase != PHASE_INVENTORY_ACTIVE)) |
2034 | { | 2039 | { |
2035 | GNUNET_break_op (0); | 2040 | GNUNET_break_op(0); |
2036 | return GNUNET_SYSERR; | 2041 | return GNUNET_SYSERR; |
2037 | } | 2042 | } |
2038 | num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) | 2043 | num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader)) |
2039 | / sizeof (struct GNUNET_HashCode); | 2044 | / sizeof(struct GNUNET_HashCode); |
2040 | if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) != | 2045 | if ((ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader)) != |
2041 | num_hashes * sizeof (struct GNUNET_HashCode)) | 2046 | num_hashes * sizeof(struct GNUNET_HashCode)) |
2042 | { | 2047 | { |
2043 | GNUNET_break_op (0); | 2048 | GNUNET_break_op(0); |
2044 | return GNUNET_SYSERR; | 2049 | return GNUNET_SYSERR; |
2045 | } | 2050 | } |
2046 | return GNUNET_OK; | 2051 | return GNUNET_OK; |
2047 | } | 2052 | } |
2048 | 2053 | ||
@@ -2055,56 +2060,56 @@ check_union_p2p_offer (void *cls, | |||
2055 | * @param mh the message | 2060 | * @param mh the message |
2056 | */ | 2061 | */ |
2057 | void | 2062 | void |
2058 | handle_union_p2p_offer (void *cls, | 2063 | handle_union_p2p_offer(void *cls, |
2059 | const struct GNUNET_MessageHeader *mh) | 2064 | const struct GNUNET_MessageHeader *mh) |
2060 | { | 2065 | { |
2061 | struct Operation *op = cls; | 2066 | struct Operation *op = cls; |
2062 | const struct GNUNET_HashCode *hash; | 2067 | const struct GNUNET_HashCode *hash; |
2063 | unsigned int num_hashes; | 2068 | unsigned int num_hashes; |
2064 | 2069 | ||
2065 | num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) | 2070 | num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader)) |
2066 | / sizeof (struct GNUNET_HashCode); | 2071 | / sizeof(struct GNUNET_HashCode); |
2067 | for (hash = (const struct GNUNET_HashCode *) &mh[1]; | 2072 | for (hash = (const struct GNUNET_HashCode *)&mh[1]; |
2068 | num_hashes > 0; | 2073 | num_hashes > 0; |
2069 | hash++, num_hashes--) | 2074 | hash++, num_hashes--) |
2070 | { | 2075 | { |
2071 | struct ElementEntry *ee; | 2076 | struct ElementEntry *ee; |
2072 | struct GNUNET_MessageHeader *demands; | 2077 | struct GNUNET_MessageHeader *demands; |
2073 | struct GNUNET_MQ_Envelope *ev; | 2078 | struct GNUNET_MQ_Envelope *ev; |
2074 | 2079 | ||
2075 | ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements, | 2080 | ee = GNUNET_CONTAINER_multihashmap_get(op->set->content->elements, |
2076 | hash); | 2081 | hash); |
2077 | if (NULL != ee) | 2082 | if (NULL != ee) |
2078 | if (GNUNET_YES == _GSS_is_element_of_operation (ee, op)) | 2083 | if (GNUNET_YES == _GSS_is_element_of_operation(ee, op)) |
2079 | continue; | 2084 | continue; |
2080 | 2085 | ||
2081 | if (GNUNET_YES == | 2086 | if (GNUNET_YES == |
2082 | GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes, | 2087 | GNUNET_CONTAINER_multihashmap_contains(op->state->demanded_hashes, |
2083 | hash)) | 2088 | hash)) |
2084 | { | 2089 | { |
2085 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2090 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
2086 | "Skipped sending duplicate demand\n"); | 2091 | "Skipped sending duplicate demand\n"); |
2087 | continue; | 2092 | continue; |
2088 | } | 2093 | } |
2089 | 2094 | ||
2090 | GNUNET_assert (GNUNET_OK == | 2095 | GNUNET_assert(GNUNET_OK == |
2091 | GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes, | 2096 | GNUNET_CONTAINER_multihashmap_put(op->state->demanded_hashes, |
2092 | hash, | 2097 | hash, |
2093 | NULL, | 2098 | NULL, |
2094 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); | 2099 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); |
2095 | 2100 | ||
2096 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2101 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
2097 | "[OP %x] Requesting element (hash %s)\n", | 2102 | "[OP %x] Requesting element (hash %s)\n", |
2098 | (void *) op, GNUNET_h2s (hash)); | 2103 | (void *)op, GNUNET_h2s(hash)); |
2099 | ev = GNUNET_MQ_msg_header_extra (demands, | 2104 | ev = GNUNET_MQ_msg_header_extra(demands, |
2100 | sizeof (struct GNUNET_HashCode), | 2105 | sizeof(struct GNUNET_HashCode), |
2101 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND); | 2106 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND); |
2102 | GNUNET_memcpy (&demands[1], | 2107 | GNUNET_memcpy(&demands[1], |
2103 | hash, | 2108 | hash, |
2104 | sizeof (struct GNUNET_HashCode)); | 2109 | sizeof(struct GNUNET_HashCode)); |
2105 | GNUNET_MQ_send (op->mq, ev); | 2110 | GNUNET_MQ_send(op->mq, ev); |
2106 | } | 2111 | } |
2107 | GNUNET_CADET_receive_done (op->channel); | 2112 | GNUNET_CADET_receive_done(op->channel); |
2108 | } | 2113 | } |
2109 | 2114 | ||
2110 | 2115 | ||
@@ -2115,56 +2120,58 @@ handle_union_p2p_offer (void *cls, | |||
2115 | * @param mh the message | 2120 | * @param mh the message |
2116 | */ | 2121 | */ |
2117 | void | 2122 | void |
2118 | handle_union_p2p_done (void *cls, | 2123 | handle_union_p2p_done(void *cls, |
2119 | const struct GNUNET_MessageHeader *mh) | 2124 | const struct GNUNET_MessageHeader *mh) |
2120 | { | 2125 | { |
2121 | struct Operation *op = cls; | 2126 | struct Operation *op = cls; |
2122 | 2127 | ||
2123 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) | 2128 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) |
2124 | { | 2129 | { |
2125 | GNUNET_break_op (0); | 2130 | GNUNET_break_op(0); |
2126 | fail_union_operation (op); | 2131 | fail_union_operation(op); |
2127 | return; | 2132 | return; |
2128 | } | 2133 | } |
2129 | switch (op->state->phase) | 2134 | switch (op->state->phase) |
2130 | { | 2135 | { |
2131 | case PHASE_INVENTORY_PASSIVE: | 2136 | case PHASE_INVENTORY_PASSIVE: |
2132 | /* We got all requests, but still have to send our elements in response. */ | 2137 | /* We got all requests, but still have to send our elements in response. */ |
2133 | op->state->phase = PHASE_FINISH_WAITING; | 2138 | op->state->phase = PHASE_FINISH_WAITING; |
2134 | 2139 | ||
2135 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2140 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
2136 | "got DONE (as passive partner), waiting for our demands to be satisfied\n"); | 2141 | "got DONE (as passive partner), waiting for our demands to be satisfied\n"); |
2137 | /* The active peer is done sending offers | 2142 | /* The active peer is done sending offers |
2138 | * and inquiries. This means that all | 2143 | * and inquiries. This means that all |
2139 | * our responses to that (demands and offers) | 2144 | * our responses to that (demands and offers) |
2140 | * must be in flight (queued or in mesh). | 2145 | * must be in flight (queued or in mesh). |
2141 | * | 2146 | * |
2142 | * We should notify the active peer once | 2147 | * We should notify the active peer once |
2143 | * all our demands are satisfied, so that the active | 2148 | * all our demands are satisfied, so that the active |
2144 | * peer can quit if we gave it everything. | 2149 | * peer can quit if we gave it everything. |
2145 | */ | 2150 | */ |
2146 | GNUNET_CADET_receive_done (op->channel); | 2151 | GNUNET_CADET_receive_done(op->channel); |
2147 | maybe_finish (op); | 2152 | maybe_finish(op); |
2148 | return; | 2153 | return; |
2149 | case PHASE_INVENTORY_ACTIVE: | 2154 | |
2150 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2155 | case PHASE_INVENTORY_ACTIVE: |
2151 | "got DONE (as active partner), waiting to finish\n"); | 2156 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
2152 | /* All demands of the other peer are satisfied, | 2157 | "got DONE (as active partner), waiting to finish\n"); |
2153 | * and we processed all offers, thus we know | 2158 | /* All demands of the other peer are satisfied, |
2154 | * exactly what our demands must be. | 2159 | * and we processed all offers, thus we know |
2155 | * | 2160 | * exactly what our demands must be. |
2156 | * We'll close the channel | 2161 | * |
2157 | * to the other peer once our demands are met. | 2162 | * We'll close the channel |
2158 | */ | 2163 | * to the other peer once our demands are met. |
2159 | op->state->phase = PHASE_FINISH_CLOSING; | 2164 | */ |
2160 | GNUNET_CADET_receive_done (op->channel); | 2165 | op->state->phase = PHASE_FINISH_CLOSING; |
2161 | maybe_finish (op); | 2166 | GNUNET_CADET_receive_done(op->channel); |
2162 | return; | 2167 | maybe_finish(op); |
2163 | default: | 2168 | return; |
2164 | GNUNET_break_op (0); | 2169 | |
2165 | fail_union_operation (op); | 2170 | default: |
2166 | return; | 2171 | GNUNET_break_op(0); |
2167 | } | 2172 | fail_union_operation(op); |
2173 | return; | ||
2174 | } | ||
2168 | } | 2175 | } |
2169 | 2176 | ||
2170 | /** | 2177 | /** |
@@ -2174,10 +2181,10 @@ handle_union_p2p_done (void *cls, | |||
2174 | * @param mh the message | 2181 | * @param mh the message |
2175 | */ | 2182 | */ |
2176 | void | 2183 | void |
2177 | handle_union_p2p_over (void *cls, | 2184 | handle_union_p2p_over(void *cls, |
2178 | const struct GNUNET_MessageHeader *mh) | 2185 | const struct GNUNET_MessageHeader *mh) |
2179 | { | 2186 | { |
2180 | send_client_done (cls); | 2187 | send_client_done(cls); |
2181 | } | 2188 | } |
2182 | 2189 | ||
2183 | 2190 | ||
@@ -2189,54 +2196,54 @@ handle_union_p2p_over (void *cls, | |||
2189 | * to convince it to accept, may be NULL | 2196 | * to convince it to accept, may be NULL |
2190 | */ | 2197 | */ |
2191 | static struct OperationState * | 2198 | static struct OperationState * |
2192 | union_evaluate (struct Operation *op, | 2199 | union_evaluate(struct Operation *op, |
2193 | const struct GNUNET_MessageHeader *opaque_context) | 2200 | const struct GNUNET_MessageHeader *opaque_context) |
2194 | { | 2201 | { |
2195 | struct OperationState *state; | 2202 | struct OperationState *state; |
2196 | struct GNUNET_MQ_Envelope *ev; | 2203 | struct GNUNET_MQ_Envelope *ev; |
2197 | struct OperationRequestMessage *msg; | 2204 | struct OperationRequestMessage *msg; |
2198 | 2205 | ||
2199 | ev = GNUNET_MQ_msg_nested_mh (msg, | 2206 | ev = GNUNET_MQ_msg_nested_mh(msg, |
2200 | GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, | 2207 | GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, |
2201 | opaque_context); | 2208 | opaque_context); |
2202 | if (NULL == ev) | 2209 | if (NULL == ev) |
2203 | { | 2210 | { |
2204 | /* the context message is too large */ | 2211 | /* the context message is too large */ |
2205 | GNUNET_break (0); | 2212 | GNUNET_break(0); |
2206 | return NULL; | 2213 | return NULL; |
2207 | } | 2214 | } |
2208 | state = GNUNET_new (struct OperationState); | 2215 | state = GNUNET_new(struct OperationState); |
2209 | state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, | 2216 | state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create(32, |
2210 | GNUNET_NO); | 2217 | GNUNET_NO); |
2211 | /* copy the current generation's strata estimator for this operation */ | 2218 | /* copy the current generation's strata estimator for this operation */ |
2212 | state->se = strata_estimator_dup (op->set->state->se); | 2219 | state->se = strata_estimator_dup(op->set->state->se); |
2213 | /* we started the operation, thus we have to send the operation request */ | 2220 | /* we started the operation, thus we have to send the operation request */ |
2214 | state->phase = PHASE_EXPECT_SE; | 2221 | state->phase = PHASE_EXPECT_SE; |
2215 | state->salt_receive = state->salt_send = 42; // FIXME????? | 2222 | state->salt_receive = state->salt_send = 42; // FIXME????? |
2216 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2223 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
2217 | "Initiating union operation evaluation\n"); | 2224 | "Initiating union operation evaluation\n"); |
2218 | GNUNET_STATISTICS_update (_GSS_statistics, | 2225 | GNUNET_STATISTICS_update(_GSS_statistics, |
2219 | "# of total union operations", | 2226 | "# of total union operations", |
2220 | 1, | 2227 | 1, |
2221 | GNUNET_NO); | 2228 | GNUNET_NO); |
2222 | GNUNET_STATISTICS_update (_GSS_statistics, | 2229 | GNUNET_STATISTICS_update(_GSS_statistics, |
2223 | "# of initiated union operations", | 2230 | "# of initiated union operations", |
2224 | 1, | 2231 | 1, |
2225 | GNUNET_NO); | 2232 | GNUNET_NO); |
2226 | msg->operation = htonl (GNUNET_SET_OPERATION_UNION); | 2233 | msg->operation = htonl(GNUNET_SET_OPERATION_UNION); |
2227 | GNUNET_MQ_send (op->mq, | 2234 | GNUNET_MQ_send(op->mq, |
2228 | ev); | 2235 | ev); |
2229 | 2236 | ||
2230 | if (NULL != opaque_context) | 2237 | if (NULL != opaque_context) |
2231 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2238 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
2232 | "sent op request with context message\n"); | 2239 | "sent op request with context message\n"); |
2233 | else | 2240 | else |
2234 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2241 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
2235 | "sent op request without context message\n"); | 2242 | "sent op request without context message\n"); |
2236 | 2243 | ||
2237 | op->state = state; | 2244 | op->state = state; |
2238 | initialize_key_to_element (op); | 2245 | initialize_key_to_element(op); |
2239 | state->initial_size = GNUNET_CONTAINER_multihashmap32_size (state->key_to_element); | 2246 | state->initial_size = GNUNET_CONTAINER_multihashmap32_size(state->key_to_element); |
2240 | return state; | 2247 | return state; |
2241 | } | 2248 | } |
2242 | 2249 | ||
@@ -2248,7 +2255,7 @@ union_evaluate (struct Operation *op, | |||
2248 | * @param op operation that will be accepted as a union operation | 2255 | * @param op operation that will be accepted as a union operation |
2249 | */ | 2256 | */ |
2250 | static struct OperationState * | 2257 | static struct OperationState * |
2251 | union_accept (struct Operation *op) | 2258 | union_accept(struct Operation *op) |
2252 | { | 2259 | { |
2253 | struct OperationState *state; | 2260 | struct OperationState *state; |
2254 | const struct StrataEstimator *se; | 2261 | const struct StrataEstimator *se; |
@@ -2258,46 +2265,46 @@ union_accept (struct Operation *op) | |||
2258 | size_t len; | 2265 | size_t len; |
2259 | uint16_t type; | 2266 | uint16_t type; |
2260 | 2267 | ||
2261 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2268 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
2262 | "accepting set union operation\n"); | 2269 | "accepting set union operation\n"); |
2263 | GNUNET_STATISTICS_update (_GSS_statistics, | 2270 | GNUNET_STATISTICS_update(_GSS_statistics, |
2264 | "# of accepted union operations", | 2271 | "# of accepted union operations", |
2265 | 1, | 2272 | 1, |
2266 | GNUNET_NO); | 2273 | GNUNET_NO); |
2267 | GNUNET_STATISTICS_update (_GSS_statistics, | 2274 | GNUNET_STATISTICS_update(_GSS_statistics, |
2268 | "# of total union operations", | 2275 | "# of total union operations", |
2269 | 1, | 2276 | 1, |
2270 | GNUNET_NO); | 2277 | GNUNET_NO); |
2271 | 2278 | ||
2272 | state = GNUNET_new (struct OperationState); | 2279 | state = GNUNET_new(struct OperationState); |
2273 | state->se = strata_estimator_dup (op->set->state->se); | 2280 | state->se = strata_estimator_dup(op->set->state->se); |
2274 | state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, | 2281 | state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create(32, |
2275 | GNUNET_NO); | 2282 | GNUNET_NO); |
2276 | state->salt_receive = state->salt_send = 42; // FIXME????? | 2283 | state->salt_receive = state->salt_send = 42; // FIXME????? |
2277 | op->state = state; | 2284 | op->state = state; |
2278 | initialize_key_to_element (op); | 2285 | initialize_key_to_element(op); |
2279 | state->initial_size = GNUNET_CONTAINER_multihashmap32_size (state->key_to_element); | 2286 | state->initial_size = GNUNET_CONTAINER_multihashmap32_size(state->key_to_element); |
2280 | 2287 | ||
2281 | /* kick off the operation */ | 2288 | /* kick off the operation */ |
2282 | se = state->se; | 2289 | se = state->se; |
2283 | buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size); | 2290 | buf = GNUNET_malloc(se->strata_count * IBF_BUCKET_SIZE * se->ibf_size); |
2284 | len = strata_estimator_write (se, | 2291 | len = strata_estimator_write(se, |
2285 | buf); | 2292 | buf); |
2286 | if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size) | 2293 | if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size) |
2287 | type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC; | 2294 | type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC; |
2288 | else | 2295 | else |
2289 | type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE; | 2296 | type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE; |
2290 | ev = GNUNET_MQ_msg_extra (strata_msg, | 2297 | ev = GNUNET_MQ_msg_extra(strata_msg, |
2291 | len, | 2298 | len, |
2292 | type); | 2299 | type); |
2293 | GNUNET_memcpy (&strata_msg[1], | 2300 | GNUNET_memcpy(&strata_msg[1], |
2294 | buf, | 2301 | buf, |
2295 | len); | 2302 | len); |
2296 | GNUNET_free (buf); | 2303 | GNUNET_free(buf); |
2297 | strata_msg->set_size | 2304 | strata_msg->set_size |
2298 | = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->set->content->elements)); | 2305 | = GNUNET_htonll(GNUNET_CONTAINER_multihashmap_size(op->set->content->elements)); |
2299 | GNUNET_MQ_send (op->mq, | 2306 | GNUNET_MQ_send(op->mq, |
2300 | ev); | 2307 | ev); |
2301 | state->phase = PHASE_EXPECT_IBF; | 2308 | state->phase = PHASE_EXPECT_IBF; |
2302 | return state; | 2309 | return state; |
2303 | } | 2310 | } |
@@ -2312,22 +2319,22 @@ union_accept (struct Operation *op) | |||
2312 | * @return the newly created set, NULL on error | 2319 | * @return the newly created set, NULL on error |
2313 | */ | 2320 | */ |
2314 | static struct SetState * | 2321 | static struct SetState * |
2315 | union_set_create (void) | 2322 | union_set_create(void) |
2316 | { | 2323 | { |
2317 | struct SetState *set_state; | 2324 | struct SetState *set_state; |
2318 | 2325 | ||
2319 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2326 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
2320 | "union set created\n"); | 2327 | "union set created\n"); |
2321 | set_state = GNUNET_new (struct SetState); | 2328 | set_state = GNUNET_new(struct SetState); |
2322 | set_state->se = strata_estimator_create (SE_STRATA_COUNT, | 2329 | set_state->se = strata_estimator_create(SE_STRATA_COUNT, |
2323 | SE_IBF_SIZE, SE_IBF_HASH_NUM); | 2330 | SE_IBF_SIZE, SE_IBF_HASH_NUM); |
2324 | if (NULL == set_state->se) | 2331 | if (NULL == set_state->se) |
2325 | { | 2332 | { |
2326 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 2333 | GNUNET_log(GNUNET_ERROR_TYPE_ERROR, |
2327 | "Failed to allocate strata estimator\n"); | 2334 | "Failed to allocate strata estimator\n"); |
2328 | GNUNET_free (set_state); | 2335 | GNUNET_free(set_state); |
2329 | return NULL; | 2336 | return NULL; |
2330 | } | 2337 | } |
2331 | return set_state; | 2338 | return set_state; |
2332 | } | 2339 | } |
2333 | 2340 | ||
@@ -2339,11 +2346,11 @@ union_set_create (void) | |||
2339 | * @param ee the element to add to the set | 2346 | * @param ee the element to add to the set |
2340 | */ | 2347 | */ |
2341 | static void | 2348 | static void |
2342 | union_add (struct SetState *set_state, | 2349 | union_add(struct SetState *set_state, |
2343 | struct ElementEntry *ee) | 2350 | struct ElementEntry *ee) |
2344 | { | 2351 | { |
2345 | strata_estimator_insert (set_state->se, | 2352 | strata_estimator_insert(set_state->se, |
2346 | get_ibf_key (&ee->element_hash)); | 2353 | get_ibf_key(&ee->element_hash)); |
2347 | } | 2354 | } |
2348 | 2355 | ||
2349 | 2356 | ||
@@ -2355,11 +2362,11 @@ union_add (struct SetState *set_state, | |||
2355 | * @param ee set element to remove | 2362 | * @param ee set element to remove |
2356 | */ | 2363 | */ |
2357 | static void | 2364 | static void |
2358 | union_remove (struct SetState *set_state, | 2365 | union_remove(struct SetState *set_state, |
2359 | struct ElementEntry *ee) | 2366 | struct ElementEntry *ee) |
2360 | { | 2367 | { |
2361 | strata_estimator_remove (set_state->se, | 2368 | strata_estimator_remove(set_state->se, |
2362 | get_ibf_key (&ee->element_hash)); | 2369 | get_ibf_key(&ee->element_hash)); |
2363 | } | 2370 | } |
2364 | 2371 | ||
2365 | 2372 | ||
@@ -2369,14 +2376,14 @@ union_remove (struct SetState *set_state, | |||
2369 | * @param set_state the set to destroy | 2376 | * @param set_state the set to destroy |
2370 | */ | 2377 | */ |
2371 | static void | 2378 | static void |
2372 | union_set_destroy (struct SetState *set_state) | 2379 | union_set_destroy(struct SetState *set_state) |
2373 | { | 2380 | { |
2374 | if (NULL != set_state->se) | 2381 | if (NULL != set_state->se) |
2375 | { | 2382 | { |
2376 | strata_estimator_destroy (set_state->se); | 2383 | strata_estimator_destroy(set_state->se); |
2377 | set_state->se = NULL; | 2384 | set_state->se = NULL; |
2378 | } | 2385 | } |
2379 | GNUNET_free (set_state); | 2386 | GNUNET_free(set_state); |
2380 | } | 2387 | } |
2381 | 2388 | ||
2382 | 2389 | ||
@@ -2387,14 +2394,14 @@ union_set_destroy (struct SetState *set_state) | |||
2387 | * @return a copy of the union-specific set state | 2394 | * @return a copy of the union-specific set state |
2388 | */ | 2395 | */ |
2389 | static struct SetState * | 2396 | static struct SetState * |
2390 | union_copy_state (struct SetState *state) | 2397 | union_copy_state(struct SetState *state) |
2391 | { | 2398 | { |
2392 | struct SetState *new_state; | 2399 | struct SetState *new_state; |
2393 | 2400 | ||
2394 | GNUNET_assert ( (NULL != state) && | 2401 | GNUNET_assert((NULL != state) && |
2395 | (NULL != state->se) ); | 2402 | (NULL != state->se)); |
2396 | new_state = GNUNET_new (struct SetState); | 2403 | new_state = GNUNET_new(struct SetState); |
2397 | new_state->se = strata_estimator_dup (state->se); | 2404 | new_state->se = strata_estimator_dup(state->se); |
2398 | 2405 | ||
2399 | return new_state; | 2406 | return new_state; |
2400 | } | 2407 | } |
@@ -2406,11 +2413,11 @@ union_copy_state (struct SetState *state) | |||
2406 | * @param op operation that lost the channel | 2413 | * @param op operation that lost the channel |
2407 | */ | 2414 | */ |
2408 | static void | 2415 | static void |
2409 | union_channel_death (struct Operation *op) | 2416 | union_channel_death(struct Operation *op) |
2410 | { | 2417 | { |
2411 | send_client_done (op); | 2418 | send_client_done(op); |
2412 | _GSS_operation_destroy (op, | 2419 | _GSS_operation_destroy(op, |
2413 | GNUNET_YES); | 2420 | GNUNET_YES); |
2414 | } | 2421 | } |
2415 | 2422 | ||
2416 | 2423 | ||
@@ -2421,7 +2428,7 @@ union_channel_death (struct Operation *op) | |||
2421 | * @return the operation specific VTable | 2428 | * @return the operation specific VTable |
2422 | */ | 2429 | */ |
2423 | const struct SetVT * | 2430 | const struct SetVT * |
2424 | _GSS_union_vt () | 2431 | _GSS_union_vt() |
2425 | { | 2432 | { |
2426 | static const struct SetVT union_vt = { | 2433 | static const struct SetVT union_vt = { |
2427 | .create = &union_set_create, | 2434 | .create = &union_set_create, |