diff options
Diffstat (limited to 'src/set/gnunet-service-set_union.c')
-rw-r--r-- | src/set/gnunet-service-set_union.c | 2266 |
1 files changed, 1145 insertions, 1121 deletions
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index fd7bc24d4..ca4ef2092 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c | |||
@@ -34,7 +34,7 @@ | |||
34 | #include <gcrypt.h> | 34 | #include <gcrypt.h> |
35 | 35 | ||
36 | 36 | ||
37 | #define LOG(kind, ...) GNUNET_log_from(kind, "set-union", __VA_ARGS__) | 37 | #define LOG(kind, ...) GNUNET_log_from (kind, "set-union", __VA_ARGS__) |
38 | 38 | ||
39 | 39 | ||
40 | /** | 40 | /** |
@@ -74,7 +74,8 @@ | |||
74 | /** | 74 | /** |
75 | * Current phase we are in for a union operation. | 75 | * Current phase we are in for a union operation. |
76 | */ | 76 | */ |
77 | enum UnionOperationPhase { | 77 | enum UnionOperationPhase |
78 | { | ||
78 | /** | 79 | /** |
79 | * We sent the request message, and expect a strata estimator. | 80 | * We sent the request message, and expect a strata estimator. |
80 | */ | 81 | */ |
@@ -138,7 +139,8 @@ enum UnionOperationPhase { | |||
138 | /** | 139 | /** |
139 | * State of an evaluate operation with another peer. | 140 | * State of an evaluate operation with another peer. |
140 | */ | 141 | */ |
141 | struct OperationState { | 142 | struct OperationState |
143 | { | ||
142 | /** | 144 | /** |
143 | * Copy of the set's strata estimator at the time of | 145 | * Copy of the set's strata estimator at the time of |
144 | * creation of this operation. | 146 | * creation of this operation. |
@@ -214,7 +216,8 @@ struct OperationState { | |||
214 | /** | 216 | /** |
215 | * The key entry is used to associate an ibf key with an element. | 217 | * The key entry is used to associate an ibf key with an element. |
216 | */ | 218 | */ |
217 | struct KeyEntry { | 219 | struct KeyEntry |
220 | { | ||
218 | /** | 221 | /** |
219 | * IBF key for the entry, derived from the current salt. | 222 | * IBF key for the entry, derived from the current salt. |
220 | */ | 223 | */ |
@@ -242,7 +245,8 @@ struct KeyEntry { | |||
242 | * Used as a closure for sending elements | 245 | * Used as a closure for sending elements |
243 | * with a specific IBF key. | 246 | * with a specific IBF key. |
244 | */ | 247 | */ |
245 | struct SendElementClosure { | 248 | struct SendElementClosure |
249 | { | ||
246 | /** | 250 | /** |
247 | * The IBF key whose matching elements should be | 251 | * The IBF key whose matching elements should be |
248 | * sent. | 252 | * sent. |
@@ -260,7 +264,8 @@ struct SendElementClosure { | |||
260 | /** | 264 | /** |
261 | * Extra state required for efficient set union. | 265 | * Extra state required for efficient set union. |
262 | */ | 266 | */ |
263 | struct SetState { | 267 | struct SetState |
268 | { | ||
264 | /** | 269 | /** |
265 | * The strata estimator is only generated once for | 270 | * The strata estimator is only generated once for |
266 | * each set. | 271 | * each set. |
@@ -282,19 +287,19 @@ struct SetState { | |||
282 | * #GNUNET_NO if not. | 287 | * #GNUNET_NO if not. |
283 | */ | 288 | */ |
284 | static int | 289 | static int |
285 | destroy_key_to_element_iter(void *cls, | 290 | destroy_key_to_element_iter (void *cls, |
286 | uint32_t key, | 291 | uint32_t key, |
287 | void *value) | 292 | void *value) |
288 | { | 293 | { |
289 | struct KeyEntry *k = value; | 294 | struct KeyEntry *k = value; |
290 | 295 | ||
291 | GNUNET_assert(NULL != k); | 296 | GNUNET_assert (NULL != k); |
292 | if (GNUNET_YES == k->element->remote) | 297 | if (GNUNET_YES == k->element->remote) |
293 | { | 298 | { |
294 | GNUNET_free(k->element); | 299 | GNUNET_free (k->element); |
295 | k->element = NULL; | 300 | k->element = NULL; |
296 | } | 301 | } |
297 | GNUNET_free(k); | 302 | GNUNET_free (k); |
298 | return GNUNET_YES; | 303 | return GNUNET_YES; |
299 | } | 304 | } |
300 | 305 | ||
@@ -306,44 +311,44 @@ destroy_key_to_element_iter(void *cls, | |||
306 | * @param op union operation to destroy | 311 | * @param op union operation to destroy |
307 | */ | 312 | */ |
308 | static void | 313 | static void |
309 | union_op_cancel(struct Operation *op) | 314 | union_op_cancel (struct Operation *op) |
310 | { | 315 | { |
311 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 316 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
312 | "destroying union op\n"); | 317 | "destroying union op\n"); |
313 | /* check if the op was canceled twice */ | 318 | /* check if the op was canceled twice */ |
314 | GNUNET_assert(NULL != op->state); | 319 | GNUNET_assert (NULL != op->state); |
315 | if (NULL != op->state->remote_ibf) | 320 | if (NULL != op->state->remote_ibf) |
316 | { | 321 | { |
317 | ibf_destroy(op->state->remote_ibf); | 322 | ibf_destroy (op->state->remote_ibf); |
318 | op->state->remote_ibf = NULL; | 323 | op->state->remote_ibf = NULL; |
319 | } | 324 | } |
320 | if (NULL != op->state->demanded_hashes) | 325 | if (NULL != op->state->demanded_hashes) |
321 | { | 326 | { |
322 | GNUNET_CONTAINER_multihashmap_destroy(op->state->demanded_hashes); | 327 | GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes); |
323 | op->state->demanded_hashes = NULL; | 328 | op->state->demanded_hashes = NULL; |
324 | } | 329 | } |
325 | if (NULL != op->state->local_ibf) | 330 | if (NULL != op->state->local_ibf) |
326 | { | 331 | { |
327 | ibf_destroy(op->state->local_ibf); | 332 | ibf_destroy (op->state->local_ibf); |
328 | op->state->local_ibf = NULL; | 333 | op->state->local_ibf = NULL; |
329 | } | 334 | } |
330 | if (NULL != op->state->se) | 335 | if (NULL != op->state->se) |
331 | { | 336 | { |
332 | strata_estimator_destroy(op->state->se); | 337 | strata_estimator_destroy (op->state->se); |
333 | op->state->se = NULL; | 338 | op->state->se = NULL; |
334 | } | 339 | } |
335 | if (NULL != op->state->key_to_element) | 340 | if (NULL != op->state->key_to_element) |
336 | { | 341 | { |
337 | GNUNET_CONTAINER_multihashmap32_iterate(op->state->key_to_element, | 342 | GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, |
338 | &destroy_key_to_element_iter, | 343 | &destroy_key_to_element_iter, |
339 | NULL); | 344 | NULL); |
340 | GNUNET_CONTAINER_multihashmap32_destroy(op->state->key_to_element); | 345 | GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element); |
341 | op->state->key_to_element = NULL; | 346 | op->state->key_to_element = NULL; |
342 | } | 347 | } |
343 | GNUNET_free(op->state); | 348 | GNUNET_free (op->state); |
344 | op->state = NULL; | 349 | op->state = NULL; |
345 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 350 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
346 | "destroying union op done\n"); | 351 | "destroying union op done\n"); |
347 | } | 352 | } |
348 | 353 | ||
349 | 354 | ||
@@ -354,20 +359,20 @@ union_op_cancel(struct Operation *op) | |||
354 | * @param op the union operation to fail | 359 | * @param op the union operation to fail |
355 | */ | 360 | */ |
356 | static void | 361 | static void |
357 | fail_union_operation(struct Operation *op) | 362 | fail_union_operation (struct Operation *op) |
358 | { | 363 | { |
359 | struct GNUNET_MQ_Envelope *ev; | 364 | struct GNUNET_MQ_Envelope *ev; |
360 | struct GNUNET_SET_ResultMessage *msg; | 365 | struct GNUNET_SET_ResultMessage *msg; |
361 | 366 | ||
362 | LOG(GNUNET_ERROR_TYPE_WARNING, | 367 | LOG (GNUNET_ERROR_TYPE_WARNING, |
363 | "union operation failed\n"); | 368 | "union operation failed\n"); |
364 | ev = GNUNET_MQ_msg(msg, GNUNET_MESSAGE_TYPE_SET_RESULT); | 369 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); |
365 | msg->result_status = htons(GNUNET_SET_STATUS_FAILURE); | 370 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); |
366 | msg->request_id = htonl(op->client_request_id); | 371 | msg->request_id = htonl (op->client_request_id); |
367 | msg->element_type = htons(0); | 372 | msg->element_type = htons (0); |
368 | GNUNET_MQ_send(op->set->cs->mq, | 373 | GNUNET_MQ_send (op->set->cs->mq, |
369 | ev); | 374 | ev); |
370 | _GSS_operation_destroy(op, GNUNET_YES); | 375 | _GSS_operation_destroy (op, GNUNET_YES); |
371 | } | 376 | } |
372 | 377 | ||
373 | 378 | ||
@@ -379,16 +384,16 @@ fail_union_operation(struct Operation *op) | |||
379 | * @return the derived IBF key | 384 | * @return the derived IBF key |
380 | */ | 385 | */ |
381 | static struct IBF_Key | 386 | static struct IBF_Key |
382 | get_ibf_key(const struct GNUNET_HashCode *src) | 387 | get_ibf_key (const struct GNUNET_HashCode *src) |
383 | { | 388 | { |
384 | struct IBF_Key key; | 389 | struct IBF_Key key; |
385 | uint16_t salt = 0; | 390 | uint16_t salt = 0; |
386 | 391 | ||
387 | GNUNET_assert(GNUNET_OK == | 392 | GNUNET_assert (GNUNET_OK == |
388 | GNUNET_CRYPTO_kdf(&key, sizeof(key), | 393 | GNUNET_CRYPTO_kdf (&key, sizeof(key), |
389 | src, sizeof *src, | 394 | src, sizeof *src, |
390 | &salt, sizeof(salt), | 395 | &salt, sizeof(salt), |
391 | NULL, 0)); | 396 | NULL, 0)); |
392 | return key; | 397 | return key; |
393 | } | 398 | } |
394 | 399 | ||
@@ -396,7 +401,8 @@ get_ibf_key(const struct GNUNET_HashCode *src) | |||
396 | /** | 401 | /** |
397 | * Context for #op_get_element_iterator | 402 | * Context for #op_get_element_iterator |
398 | */ | 403 | */ |
399 | struct GetElementContext { | 404 | struct GetElementContext |
405 | { | ||
400 | /** | 406 | /** |
401 | * FIXME. | 407 | * FIXME. |
402 | */ | 408 | */ |
@@ -420,20 +426,20 @@ struct GetElementContext { | |||
420 | * #GNUNET_NO if we've found the element. | 426 | * #GNUNET_NO if we've found the element. |
421 | */ | 427 | */ |
422 | static int | 428 | static int |
423 | op_get_element_iterator(void *cls, | 429 | op_get_element_iterator (void *cls, |
424 | uint32_t key, | 430 | uint32_t key, |
425 | void *value) | 431 | void *value) |
426 | { | 432 | { |
427 | struct GetElementContext *ctx = cls; | 433 | struct GetElementContext *ctx = cls; |
428 | struct KeyEntry *k = value; | 434 | struct KeyEntry *k = value; |
429 | 435 | ||
430 | GNUNET_assert(NULL != k); | 436 | GNUNET_assert (NULL != k); |
431 | if (0 == GNUNET_CRYPTO_hash_cmp(&k->element->element_hash, | 437 | if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, |
432 | &ctx->hash)) | 438 | &ctx->hash)) |
433 | { | 439 | { |
434 | ctx->k = k; | 440 | ctx->k = k; |
435 | return GNUNET_NO; | 441 | return GNUNET_NO; |
436 | } | 442 | } |
437 | return GNUNET_YES; | 443 | return GNUNET_YES; |
438 | } | 444 | } |
439 | 445 | ||
@@ -447,8 +453,8 @@ op_get_element_iterator(void *cls, | |||
447 | * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise | 453 | * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise |
448 | */ | 454 | */ |
449 | static struct KeyEntry * | 455 | static struct KeyEntry * |
450 | op_get_element(struct Operation *op, | 456 | op_get_element (struct Operation *op, |
451 | const struct GNUNET_HashCode *element_hash) | 457 | const struct GNUNET_HashCode *element_hash) |
452 | { | 458 | { |
453 | int ret; | 459 | int ret; |
454 | struct IBF_Key ibf_key; | 460 | struct IBF_Key ibf_key; |
@@ -456,18 +462,18 @@ op_get_element(struct Operation *op, | |||
456 | 462 | ||
457 | ctx.hash = *element_hash; | 463 | ctx.hash = *element_hash; |
458 | 464 | ||
459 | ibf_key = get_ibf_key(element_hash); | 465 | ibf_key = get_ibf_key (element_hash); |
460 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple(op->state->key_to_element, | 466 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, |
461 | (uint32_t)ibf_key.key_val, | 467 | (uint32_t) ibf_key.key_val, |
462 | op_get_element_iterator, | 468 | op_get_element_iterator, |
463 | &ctx); | 469 | &ctx); |
464 | 470 | ||
465 | /* was the iteration aborted because we found the element? */ | 471 | /* was the iteration aborted because we found the element? */ |
466 | if (GNUNET_SYSERR == ret) | 472 | if (GNUNET_SYSERR == ret) |
467 | { | 473 | { |
468 | GNUNET_assert(NULL != ctx.k); | 474 | GNUNET_assert (NULL != ctx.k); |
469 | return ctx.k; | 475 | return ctx.k; |
470 | } | 476 | } |
471 | return NULL; | 477 | return NULL; |
472 | } | 478 | } |
473 | 479 | ||
@@ -487,23 +493,23 @@ op_get_element(struct Operation *op, | |||
487 | * @parem received was this element received from the remote peer? | 493 | * @parem received was this element received from the remote peer? |
488 | */ | 494 | */ |
489 | static void | 495 | static void |
490 | op_register_element(struct Operation *op, | 496 | op_register_element (struct Operation *op, |
491 | struct ElementEntry *ee, | 497 | struct ElementEntry *ee, |
492 | int received) | 498 | int received) |
493 | { | 499 | { |
494 | struct IBF_Key ibf_key; | 500 | struct IBF_Key ibf_key; |
495 | struct KeyEntry *k; | 501 | struct KeyEntry *k; |
496 | 502 | ||
497 | ibf_key = get_ibf_key(&ee->element_hash); | 503 | ibf_key = get_ibf_key (&ee->element_hash); |
498 | k = GNUNET_new(struct KeyEntry); | 504 | k = GNUNET_new (struct KeyEntry); |
499 | k->element = ee; | 505 | k->element = ee; |
500 | k->ibf_key = ibf_key; | 506 | k->ibf_key = ibf_key; |
501 | k->received = received; | 507 | k->received = received; |
502 | GNUNET_assert(GNUNET_OK == | 508 | GNUNET_assert (GNUNET_OK == |
503 | GNUNET_CONTAINER_multihashmap32_put(op->state->key_to_element, | 509 | GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, |
504 | (uint32_t)ibf_key.key_val, | 510 | (uint32_t) ibf_key.key_val, |
505 | k, | 511 | k, |
506 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); | 512 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); |
507 | } | 513 | } |
508 | 514 | ||
509 | 515 | ||
@@ -511,9 +517,9 @@ op_register_element(struct Operation *op, | |||
511 | * FIXME. | 517 | * FIXME. |
512 | */ | 518 | */ |
513 | static void | 519 | static void |
514 | salt_key(const struct IBF_Key *k_in, | 520 | salt_key (const struct IBF_Key *k_in, |
515 | uint32_t salt, | 521 | uint32_t salt, |
516 | struct IBF_Key *k_out) | 522 | struct IBF_Key *k_out) |
517 | { | 523 | { |
518 | int s = salt % 64; | 524 | int s = salt % 64; |
519 | uint64_t x = k_in->key_val; | 525 | uint64_t x = k_in->key_val; |
@@ -528,9 +534,9 @@ salt_key(const struct IBF_Key *k_in, | |||
528 | * FIXME. | 534 | * FIXME. |
529 | */ | 535 | */ |
530 | static void | 536 | static void |
531 | unsalt_key(const struct IBF_Key *k_in, | 537 | unsalt_key (const struct IBF_Key *k_in, |
532 | uint32_t salt, | 538 | uint32_t salt, |
533 | struct IBF_Key *k_out) | 539 | struct IBF_Key *k_out) |
534 | { | 540 | { |
535 | int s = salt % 64; | 541 | int s = salt % 64; |
536 | uint64_t x = k_in->key_val; | 542 | uint64_t x = k_in->key_val; |
@@ -548,23 +554,23 @@ unsalt_key(const struct IBF_Key *k_in, | |||
548 | * @param value the key entry to get the key from | 554 | * @param value the key entry to get the key from |
549 | */ | 555 | */ |
550 | static int | 556 | static int |
551 | prepare_ibf_iterator(void *cls, | 557 | prepare_ibf_iterator (void *cls, |
552 | uint32_t key, | 558 | uint32_t key, |
553 | void *value) | 559 | void *value) |
554 | { | 560 | { |
555 | struct Operation *op = cls; | 561 | struct Operation *op = cls; |
556 | struct KeyEntry *ke = value; | 562 | struct KeyEntry *ke = value; |
557 | struct IBF_Key salted_key; | 563 | struct IBF_Key salted_key; |
558 | 564 | ||
559 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 565 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
560 | "[OP %x] inserting %lx (hash %s) into ibf\n", | 566 | "[OP %x] inserting %lx (hash %s) into ibf\n", |
561 | (void *)op, | 567 | (void *) op, |
562 | (unsigned long)ke->ibf_key.key_val, | 568 | (unsigned long) ke->ibf_key.key_val, |
563 | GNUNET_h2s(&ke->element->element_hash)); | 569 | GNUNET_h2s (&ke->element->element_hash)); |
564 | salt_key(&ke->ibf_key, | 570 | salt_key (&ke->ibf_key, |
565 | op->state->salt_send, | 571 | op->state->salt_send, |
566 | &salted_key); | 572 | &salted_key); |
567 | ibf_insert(op->state->local_ibf, salted_key); | 573 | ibf_insert (op->state->local_ibf, salted_key); |
568 | return GNUNET_YES; | 574 | return GNUNET_YES; |
569 | } | 575 | } |
570 | 576 | ||
@@ -580,9 +586,9 @@ prepare_ibf_iterator(void *cls, | |||
580 | * @return #GNUNET_YES (to continue iterating) | 586 | * @return #GNUNET_YES (to continue iterating) |
581 | */ | 587 | */ |
582 | static int | 588 | static int |
583 | init_key_to_element_iterator(void *cls, | 589 | init_key_to_element_iterator (void *cls, |
584 | const struct GNUNET_HashCode *key, | 590 | const struct GNUNET_HashCode *key, |
585 | void *value) | 591 | void *value) |
586 | { | 592 | { |
587 | struct Operation *op = cls; | 593 | struct Operation *op = cls; |
588 | struct ElementEntry *ee = value; | 594 | struct ElementEntry *ee = value; |
@@ -590,13 +596,13 @@ init_key_to_element_iterator(void *cls, | |||
590 | /* make sure that the element belongs to the set at the time | 596 | /* make sure that the element belongs to the set at the time |
591 | * of creating the operation */ | 597 | * of creating the operation */ |
592 | if (GNUNET_NO == | 598 | if (GNUNET_NO == |
593 | _GSS_is_element_of_operation(ee, | 599 | _GSS_is_element_of_operation (ee, |
594 | op)) | 600 | op)) |
595 | return GNUNET_YES; | 601 | return GNUNET_YES; |
596 | GNUNET_assert(GNUNET_NO == ee->remote); | 602 | GNUNET_assert (GNUNET_NO == ee->remote); |
597 | op_register_element(op, | 603 | op_register_element (op, |
598 | ee, | 604 | ee, |
599 | GNUNET_NO); | 605 | GNUNET_NO); |
600 | return GNUNET_YES; | 606 | return GNUNET_YES; |
601 | } | 607 | } |
602 | 608 | ||
@@ -608,16 +614,16 @@ init_key_to_element_iterator(void *cls, | |||
608 | * @param op the set union operation | 614 | * @param op the set union operation |
609 | */ | 615 | */ |
610 | static void | 616 | static void |
611 | initialize_key_to_element(struct Operation *op) | 617 | initialize_key_to_element (struct Operation *op) |
612 | { | 618 | { |
613 | unsigned int len; | 619 | unsigned int len; |
614 | 620 | ||
615 | GNUNET_assert(NULL == op->state->key_to_element); | 621 | GNUNET_assert (NULL == op->state->key_to_element); |
616 | len = GNUNET_CONTAINER_multihashmap_size(op->set->content->elements); | 622 | len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements); |
617 | op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create(len + 1); | 623 | op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); |
618 | GNUNET_CONTAINER_multihashmap_iterate(op->set->content->elements, | 624 | GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, |
619 | &init_key_to_element_iterator, | 625 | &init_key_to_element_iterator, |
620 | op); | 626 | op); |
621 | } | 627 | } |
622 | 628 | ||
623 | 629 | ||
@@ -630,23 +636,23 @@ initialize_key_to_element(struct Operation *op) | |||
630 | * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure | 636 | * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure |
631 | */ | 637 | */ |
632 | static int | 638 | static int |
633 | prepare_ibf(struct Operation *op, | 639 | prepare_ibf (struct Operation *op, |
634 | uint32_t size) | 640 | uint32_t size) |
635 | { | 641 | { |
636 | GNUNET_assert(NULL != op->state->key_to_element); | 642 | GNUNET_assert (NULL != op->state->key_to_element); |
637 | 643 | ||
638 | if (NULL != op->state->local_ibf) | 644 | if (NULL != op->state->local_ibf) |
639 | ibf_destroy(op->state->local_ibf); | 645 | ibf_destroy (op->state->local_ibf); |
640 | op->state->local_ibf = ibf_create(size, SE_IBF_HASH_NUM); | 646 | op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); |
641 | if (NULL == op->state->local_ibf) | 647 | if (NULL == op->state->local_ibf) |
642 | { | 648 | { |
643 | GNUNET_log(GNUNET_ERROR_TYPE_ERROR, | 649 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
644 | "Failed to allocate local IBF\n"); | 650 | "Failed to allocate local IBF\n"); |
645 | return GNUNET_SYSERR; | 651 | return GNUNET_SYSERR; |
646 | } | 652 | } |
647 | GNUNET_CONTAINER_multihashmap32_iterate(op->state->key_to_element, | 653 | GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, |
648 | &prepare_ibf_iterator, | 654 | &prepare_ibf_iterator, |
649 | op); | 655 | op); |
650 | return GNUNET_OK; | 656 | return GNUNET_OK; |
651 | } | 657 | } |
652 | 658 | ||
@@ -661,60 +667,60 @@ prepare_ibf(struct Operation *op, | |||
661 | * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure | 667 | * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure |
662 | */ | 668 | */ |
663 | static int | 669 | static int |
664 | send_ibf(struct Operation *op, | 670 | send_ibf (struct Operation *op, |
665 | uint16_t ibf_order) | 671 | uint16_t ibf_order) |
666 | { | 672 | { |
667 | unsigned int buckets_sent = 0; | 673 | unsigned int buckets_sent = 0; |
668 | struct InvertibleBloomFilter *ibf; | 674 | struct InvertibleBloomFilter *ibf; |
669 | 675 | ||
670 | if (GNUNET_OK != | 676 | if (GNUNET_OK != |
671 | prepare_ibf(op, 1 << ibf_order)) | 677 | prepare_ibf (op, 1 << ibf_order)) |
672 | { | 678 | { |
673 | /* allocation failed */ | 679 | /* allocation failed */ |
674 | return GNUNET_SYSERR; | 680 | return GNUNET_SYSERR; |
675 | } | 681 | } |
676 | 682 | ||
677 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 683 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
678 | "sending ibf of size %u\n", | 684 | "sending ibf of size %u\n", |
679 | 1 << ibf_order); | 685 | 1 << ibf_order); |
680 | 686 | ||
681 | { | 687 | { |
682 | char name[64] = { 0 }; | 688 | char name[64] = { 0 }; |
683 | snprintf(name, sizeof(name), "# sent IBF (order %u)", ibf_order); | 689 | snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_order); |
684 | GNUNET_STATISTICS_update(_GSS_statistics, name, 1, GNUNET_NO); | 690 | GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO); |
685 | } | 691 | } |
686 | 692 | ||
687 | ibf = op->state->local_ibf; | 693 | ibf = op->state->local_ibf; |
688 | 694 | ||
689 | while (buckets_sent < (1 << ibf_order)) | 695 | while (buckets_sent < (1 << ibf_order)) |
690 | { | 696 | { |
691 | unsigned int buckets_in_message; | 697 | unsigned int buckets_in_message; |
692 | struct GNUNET_MQ_Envelope *ev; | 698 | struct GNUNET_MQ_Envelope *ev; |
693 | struct IBFMessage *msg; | 699 | struct IBFMessage *msg; |
694 | 700 | ||
695 | buckets_in_message = (1 << ibf_order) - buckets_sent; | 701 | buckets_in_message = (1 << ibf_order) - buckets_sent; |
696 | /* limit to maximum */ | 702 | /* limit to maximum */ |
697 | if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) | 703 | if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) |
698 | buckets_in_message = MAX_BUCKETS_PER_MESSAGE; | 704 | buckets_in_message = MAX_BUCKETS_PER_MESSAGE; |
699 | 705 | ||
700 | ev = GNUNET_MQ_msg_extra(msg, | 706 | ev = GNUNET_MQ_msg_extra (msg, |
701 | buckets_in_message * IBF_BUCKET_SIZE, | 707 | buckets_in_message * IBF_BUCKET_SIZE, |
702 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF); | 708 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF); |
703 | msg->reserved1 = 0; | 709 | msg->reserved1 = 0; |
704 | msg->reserved2 = 0; | 710 | msg->reserved2 = 0; |
705 | msg->order = ibf_order; | 711 | msg->order = ibf_order; |
706 | msg->offset = htonl(buckets_sent); | 712 | msg->offset = htonl (buckets_sent); |
707 | msg->salt = htonl(op->state->salt_send); | 713 | msg->salt = htonl (op->state->salt_send); |
708 | ibf_write_slice(ibf, buckets_sent, | 714 | ibf_write_slice (ibf, buckets_sent, |
709 | buckets_in_message, &msg[1]); | 715 | buckets_in_message, &msg[1]); |
710 | buckets_sent += buckets_in_message; | 716 | buckets_sent += buckets_in_message; |
711 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 717 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
712 | "ibf chunk size %u, %u/%u sent\n", | 718 | "ibf chunk size %u, %u/%u sent\n", |
713 | buckets_in_message, | 719 | buckets_in_message, |
714 | buckets_sent, | 720 | buckets_sent, |
715 | 1 << ibf_order); | 721 | 1 << ibf_order); |
716 | GNUNET_MQ_send(op->mq, ev); | 722 | GNUNET_MQ_send (op->mq, ev); |
717 | } | 723 | } |
718 | 724 | ||
719 | /* The other peer must decode the IBF, so | 725 | /* The other peer must decode the IBF, so |
720 | * we're passive. */ | 726 | * we're passive. */ |
@@ -731,7 +737,7 @@ send_ibf(struct Operation *op, | |||
731 | * @return the required size of the ibf | 737 | * @return the required size of the ibf |
732 | */ | 738 | */ |
733 | static unsigned int | 739 | static unsigned int |
734 | get_order_from_difference(unsigned int diff) | 740 | get_order_from_difference (unsigned int diff) |
735 | { | 741 | { |
736 | unsigned int ibf_order; | 742 | unsigned int ibf_order; |
737 | 743 | ||
@@ -755,9 +761,9 @@ get_order_from_difference(unsigned int diff) | |||
755 | * @return #GNUNET_YES (to continue iterating) | 761 | * @return #GNUNET_YES (to continue iterating) |
756 | */ | 762 | */ |
757 | static int | 763 | static int |
758 | send_full_element_iterator(void *cls, | 764 | send_full_element_iterator (void *cls, |
759 | const struct GNUNET_HashCode *key, | 765 | const struct GNUNET_HashCode *key, |
760 | void *value) | 766 | void *value) |
761 | { | 767 | { |
762 | struct Operation *op = cls; | 768 | struct Operation *op = cls; |
763 | struct GNUNET_SET_ElementMessage *emsg; | 769 | struct GNUNET_SET_ElementMessage *emsg; |
@@ -765,18 +771,18 @@ send_full_element_iterator(void *cls, | |||
765 | struct GNUNET_SET_Element *el = &ee->element; | 771 | struct GNUNET_SET_Element *el = &ee->element; |
766 | struct GNUNET_MQ_Envelope *ev; | 772 | struct GNUNET_MQ_Envelope *ev; |
767 | 773 | ||
768 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 774 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
769 | "Sending element %s\n", | 775 | "Sending element %s\n", |
770 | GNUNET_h2s(key)); | 776 | GNUNET_h2s (key)); |
771 | ev = GNUNET_MQ_msg_extra(emsg, | 777 | ev = GNUNET_MQ_msg_extra (emsg, |
772 | el->size, | 778 | el->size, |
773 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); | 779 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); |
774 | emsg->element_type = htons(el->element_type); | 780 | emsg->element_type = htons (el->element_type); |
775 | GNUNET_memcpy(&emsg[1], | 781 | GNUNET_memcpy (&emsg[1], |
776 | el->data, | 782 | el->data, |
777 | el->size); | 783 | el->size); |
778 | GNUNET_MQ_send(op->mq, | 784 | GNUNET_MQ_send (op->mq, |
779 | ev); | 785 | ev); |
780 | return GNUNET_YES; | 786 | return GNUNET_YES; |
781 | } | 787 | } |
782 | 788 | ||
@@ -787,21 +793,21 @@ send_full_element_iterator(void *cls, | |||
787 | * @param op operation to switch to full set transmission. | 793 | * @param op operation to switch to full set transmission. |
788 | */ | 794 | */ |
789 | static void | 795 | static void |
790 | send_full_set(struct Operation *op) | 796 | send_full_set (struct Operation *op) |
791 | { | 797 | { |
792 | struct GNUNET_MQ_Envelope *ev; | 798 | struct GNUNET_MQ_Envelope *ev; |
793 | 799 | ||
794 | op->state->phase = PHASE_FULL_SENDING; | 800 | op->state->phase = PHASE_FULL_SENDING; |
795 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 801 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
796 | "Dedicing to transmit the full set\n"); | 802 | "Dedicing to transmit the full set\n"); |
797 | /* FIXME: use a more memory-friendly way of doing this with an | 803 | /* FIXME: use a more memory-friendly way of doing this with an |
798 | iterator, just as we do in the non-full case! */ | 804 | iterator, just as we do in the non-full case! */ |
799 | (void)GNUNET_CONTAINER_multihashmap_iterate(op->set->content->elements, | 805 | (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, |
800 | &send_full_element_iterator, | 806 | &send_full_element_iterator, |
801 | op); | 807 | op); |
802 | ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); | 808 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); |
803 | GNUNET_MQ_send(op->mq, | 809 | GNUNET_MQ_send (op->mq, |
804 | ev); | 810 | ev); |
805 | } | 811 | } |
806 | 812 | ||
807 | 813 | ||
@@ -812,26 +818,27 @@ send_full_set(struct Operation *op) | |||
812 | * @param msg the message | 818 | * @param msg the message |
813 | */ | 819 | */ |
814 | int | 820 | int |
815 | check_union_p2p_strata_estimator(void *cls, | 821 | check_union_p2p_strata_estimator (void *cls, |
816 | const struct StrataEstimatorMessage *msg) | 822 | const struct StrataEstimatorMessage *msg) |
817 | { | 823 | { |
818 | struct Operation *op = cls; | 824 | struct Operation *op = cls; |
819 | int is_compressed; | 825 | int is_compressed; |
820 | size_t len; | 826 | size_t len; |
821 | 827 | ||
822 | if (op->state->phase != PHASE_EXPECT_SE) | 828 | if (op->state->phase != PHASE_EXPECT_SE) |
823 | { | 829 | { |
824 | GNUNET_break(0); | 830 | GNUNET_break (0); |
825 | return GNUNET_SYSERR; | 831 | return GNUNET_SYSERR; |
826 | } | 832 | } |
827 | is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons(msg->header.type)); | 833 | is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons ( |
828 | len = ntohs(msg->header.size) - sizeof(struct StrataEstimatorMessage); | 834 | msg->header.type)); |
835 | len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); | ||
829 | if ((GNUNET_NO == is_compressed) && | 836 | if ((GNUNET_NO == is_compressed) && |
830 | (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE)) | 837 | (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE)) |
831 | { | 838 | { |
832 | GNUNET_break(0); | 839 | GNUNET_break (0); |
833 | return GNUNET_SYSERR; | 840 | return GNUNET_SYSERR; |
834 | } | 841 | } |
835 | return GNUNET_OK; | 842 | return GNUNET_OK; |
836 | } | 843 | } |
837 | 844 | ||
@@ -843,8 +850,8 @@ check_union_p2p_strata_estimator(void *cls, | |||
843 | * @param msg the message | 850 | * @param msg the message |
844 | */ | 851 | */ |
845 | void | 852 | void |
846 | handle_union_p2p_strata_estimator(void *cls, | 853 | handle_union_p2p_strata_estimator (void *cls, |
847 | const struct StrataEstimatorMessage *msg) | 854 | const struct StrataEstimatorMessage *msg) |
848 | { | 855 | { |
849 | struct Operation *op = cls; | 856 | struct Operation *op = cls; |
850 | struct StrataEstimator *remote_se; | 857 | struct StrataEstimator *remote_se; |
@@ -853,116 +860,118 @@ handle_union_p2p_strata_estimator(void *cls, | |||
853 | size_t len; | 860 | size_t len; |
854 | int is_compressed; | 861 | int is_compressed; |
855 | 862 | ||
856 | is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons(msg->header.type)); | 863 | is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons ( |
857 | GNUNET_STATISTICS_update(_GSS_statistics, | 864 | msg->header.type)); |
858 | "# bytes of SE received", | 865 | GNUNET_STATISTICS_update (_GSS_statistics, |
859 | ntohs(msg->header.size), | 866 | "# bytes of SE received", |
860 | GNUNET_NO); | 867 | ntohs (msg->header.size), |
861 | len = ntohs(msg->header.size) - sizeof(struct StrataEstimatorMessage); | 868 | GNUNET_NO); |
862 | other_size = GNUNET_ntohll(msg->set_size); | 869 | len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); |
863 | remote_se = strata_estimator_create(SE_STRATA_COUNT, | 870 | other_size = GNUNET_ntohll (msg->set_size); |
864 | SE_IBF_SIZE, | 871 | remote_se = strata_estimator_create (SE_STRATA_COUNT, |
865 | SE_IBF_HASH_NUM); | 872 | SE_IBF_SIZE, |
873 | SE_IBF_HASH_NUM); | ||
866 | if (NULL == remote_se) | 874 | if (NULL == remote_se) |
867 | { | 875 | { |
868 | /* insufficient resources, fail */ | 876 | /* insufficient resources, fail */ |
869 | fail_union_operation(op); | 877 | fail_union_operation (op); |
870 | return; | 878 | return; |
871 | } | 879 | } |
872 | if (GNUNET_OK != | 880 | if (GNUNET_OK != |
873 | strata_estimator_read(&msg[1], | 881 | strata_estimator_read (&msg[1], |
874 | len, | 882 | len, |
875 | is_compressed, | 883 | is_compressed, |
876 | remote_se)) | 884 | remote_se)) |
877 | { | 885 | { |
878 | /* decompression failed */ | 886 | /* decompression failed */ |
879 | strata_estimator_destroy(remote_se); | 887 | strata_estimator_destroy (remote_se); |
880 | fail_union_operation(op); | 888 | fail_union_operation (op); |
881 | return; | 889 | return; |
882 | } | 890 | } |
883 | GNUNET_assert(NULL != op->state->se); | 891 | GNUNET_assert (NULL != op->state->se); |
884 | diff = strata_estimator_difference(remote_se, | 892 | diff = strata_estimator_difference (remote_se, |
885 | op->state->se); | 893 | op->state->se); |
886 | 894 | ||
887 | if (diff > 200) | 895 | if (diff > 200) |
888 | diff = diff * 3 / 2; | 896 | diff = diff * 3 / 2; |
889 | 897 | ||
890 | strata_estimator_destroy(remote_se); | 898 | strata_estimator_destroy (remote_se); |
891 | strata_estimator_destroy(op->state->se); | 899 | strata_estimator_destroy (op->state->se); |
892 | op->state->se = NULL; | 900 | op->state->se = NULL; |
893 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 901 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
894 | "got se diff=%d, using ibf size %d\n", | 902 | "got se diff=%d, using ibf size %d\n", |
895 | diff, | 903 | diff, |
896 | 1U << get_order_from_difference(diff)); | 904 | 1U << get_order_from_difference (diff)); |
897 | 905 | ||
898 | { | 906 | { |
899 | char *set_debug; | 907 | char *set_debug; |
900 | 908 | ||
901 | set_debug = getenv("GNUNET_SET_BENCHMARK"); | 909 | set_debug = getenv ("GNUNET_SET_BENCHMARK"); |
902 | if ((NULL != set_debug) && | 910 | if ((NULL != set_debug) && |
903 | (0 == strcmp(set_debug, "1"))) | 911 | (0 == strcmp (set_debug, "1"))) |
904 | { | 912 | { |
905 | FILE *f = fopen("set.log", "a"); | 913 | FILE *f = fopen ("set.log", "a"); |
906 | fprintf(f, "%llu\n", (unsigned long long)diff); | 914 | fprintf (f, "%llu\n", (unsigned long long) diff); |
907 | fclose(f); | 915 | fclose (f); |
908 | } | 916 | } |
909 | } | 917 | } |
910 | 918 | ||
911 | if ((GNUNET_YES == op->byzantine) && | 919 | if ((GNUNET_YES == op->byzantine) && |
912 | (other_size < op->byzantine_lower_bound)) | 920 | (other_size < op->byzantine_lower_bound)) |
913 | { | 921 | { |
914 | GNUNET_break(0); | 922 | GNUNET_break (0); |
915 | fail_union_operation(op); | 923 | fail_union_operation (op); |
916 | return; | 924 | return; |
917 | } | 925 | } |
918 | 926 | ||
919 | if ((GNUNET_YES == op->force_full) || | 927 | if ((GNUNET_YES == op->force_full) || |
920 | (diff > op->state->initial_size / 4) || | 928 | (diff > op->state->initial_size / 4) || |
921 | (0 == other_size)) | 929 | (0 == other_size)) |
930 | { | ||
931 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
932 | "Deciding to go for full set transmission (diff=%d, own set=%u)\n", | ||
933 | diff, | ||
934 | op->state->initial_size); | ||
935 | GNUNET_STATISTICS_update (_GSS_statistics, | ||
936 | "# of full sends", | ||
937 | 1, | ||
938 | GNUNET_NO); | ||
939 | if ((op->state->initial_size <= other_size) || | ||
940 | (0 == other_size)) | ||
922 | { | 941 | { |
923 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 942 | send_full_set (op); |
924 | "Deciding to go for full set transmission (diff=%d, own set=%u)\n", | ||
925 | diff, | ||
926 | op->state->initial_size); | ||
927 | GNUNET_STATISTICS_update(_GSS_statistics, | ||
928 | "# of full sends", | ||
929 | 1, | ||
930 | GNUNET_NO); | ||
931 | if ((op->state->initial_size <= other_size) || | ||
932 | (0 == other_size)) | ||
933 | { | ||
934 | send_full_set(op); | ||
935 | } | ||
936 | else | ||
937 | { | ||
938 | struct GNUNET_MQ_Envelope *ev; | ||
939 | |||
940 | LOG(GNUNET_ERROR_TYPE_DEBUG, | ||
941 | "Telling other peer that we expect its full set\n"); | ||
942 | op->state->phase = PHASE_EXPECT_IBF; | ||
943 | ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL); | ||
944 | GNUNET_MQ_send(op->mq, | ||
945 | ev); | ||
946 | } | ||
947 | } | 943 | } |
948 | else | 944 | else |
949 | { | 945 | { |
950 | GNUNET_STATISTICS_update(_GSS_statistics, | 946 | struct GNUNET_MQ_Envelope *ev; |
951 | "# of ibf sends", | 947 | |
952 | 1, | 948 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
953 | GNUNET_NO); | 949 | "Telling other peer that we expect its full set\n"); |
954 | if (GNUNET_OK != | 950 | op->state->phase = PHASE_EXPECT_IBF; |
955 | send_ibf(op, | 951 | ev = GNUNET_MQ_msg_header ( |
956 | get_order_from_difference(diff))) | 952 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL); |
957 | { | 953 | GNUNET_MQ_send (op->mq, |
958 | /* Internal error, best we can do is shut the connection */ | 954 | ev); |
959 | GNUNET_log(GNUNET_ERROR_TYPE_ERROR, | ||
960 | "Failed to send IBF, closing connection\n"); | ||
961 | fail_union_operation(op); | ||
962 | return; | ||
963 | } | ||
964 | } | 955 | } |
965 | GNUNET_CADET_receive_done(op->channel); | 956 | } |
957 | else | ||
958 | { | ||
959 | GNUNET_STATISTICS_update (_GSS_statistics, | ||
960 | "# of ibf sends", | ||
961 | 1, | ||
962 | GNUNET_NO); | ||
963 | if (GNUNET_OK != | ||
964 | send_ibf (op, | ||
965 | get_order_from_difference (diff))) | ||
966 | { | ||
967 | /* Internal error, best we can do is shut the connection */ | ||
968 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
969 | "Failed to send IBF, closing connection\n"); | ||
970 | fail_union_operation (op); | ||
971 | return; | ||
972 | } | ||
973 | } | ||
974 | GNUNET_CADET_receive_done (op->channel); | ||
966 | } | 975 | } |
967 | 976 | ||
968 | 977 | ||
@@ -974,9 +983,9 @@ handle_union_p2p_strata_estimator(void *cls, | |||
974 | * @param value the key entry | 983 | * @param value the key entry |
975 | */ | 984 | */ |
976 | static int | 985 | static int |
977 | send_offers_iterator(void *cls, | 986 | send_offers_iterator (void *cls, |
978 | uint32_t key, | 987 | uint32_t key, |
979 | void *value) | 988 | void *value) |
980 | { | 989 | { |
981 | struct SendElementClosure *sec = cls; | 990 | struct SendElementClosure *sec = cls; |
982 | struct Operation *op = sec->op; | 991 | struct Operation *op = sec->op; |
@@ -988,17 +997,17 @@ send_offers_iterator(void *cls, | |||
988 | if (ke->ibf_key.key_val != sec->ibf_key.key_val) | 997 | if (ke->ibf_key.key_val != sec->ibf_key.key_val) |
989 | return GNUNET_YES; | 998 | return GNUNET_YES; |
990 | 999 | ||
991 | ev = GNUNET_MQ_msg_header_extra(mh, | 1000 | ev = GNUNET_MQ_msg_header_extra (mh, |
992 | sizeof(struct GNUNET_HashCode), | 1001 | sizeof(struct GNUNET_HashCode), |
993 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER); | 1002 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER); |
994 | 1003 | ||
995 | GNUNET_assert(NULL != ev); | 1004 | GNUNET_assert (NULL != ev); |
996 | *(struct GNUNET_HashCode *)&mh[1] = ke->element->element_hash; | 1005 | *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash; |
997 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1006 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
998 | "[OP %x] sending element offer (%s) to peer\n", | 1007 | "[OP %x] sending element offer (%s) to peer\n", |
999 | (void *)op, | 1008 | (void *) op, |
1000 | GNUNET_h2s(&ke->element->element_hash)); | 1009 | GNUNET_h2s (&ke->element->element_hash)); |
1001 | GNUNET_MQ_send(op->mq, ev); | 1010 | GNUNET_MQ_send (op->mq, ev); |
1002 | return GNUNET_YES; | 1011 | return GNUNET_YES; |
1003 | } | 1012 | } |
1004 | 1013 | ||
@@ -1010,17 +1019,19 @@ send_offers_iterator(void *cls, | |||
1010 | * @param ibf_key IBF key of interest | 1019 | * @param ibf_key IBF key of interest |
1011 | */ | 1020 | */ |
1012 | static void | 1021 | static void |
1013 | send_offers_for_key(struct Operation *op, | 1022 | send_offers_for_key (struct Operation *op, |
1014 | struct IBF_Key ibf_key) | 1023 | struct IBF_Key ibf_key) |
1015 | { | 1024 | { |
1016 | struct SendElementClosure send_cls; | 1025 | struct SendElementClosure send_cls; |
1017 | 1026 | ||
1018 | send_cls.ibf_key = ibf_key; | 1027 | send_cls.ibf_key = ibf_key; |
1019 | send_cls.op = op; | 1028 | send_cls.op = op; |
1020 | (void)GNUNET_CONTAINER_multihashmap32_get_multiple(op->state->key_to_element, | 1029 | (void) GNUNET_CONTAINER_multihashmap32_get_multiple ( |
1021 | (uint32_t)ibf_key.key_val, | 1030 | op->state->key_to_element, |
1022 | &send_offers_iterator, | 1031 | (uint32_t) ibf_key. |
1023 | &send_cls); | 1032 | key_val, |
1033 | &send_offers_iterator, | ||
1034 | &send_cls); | ||
1024 | } | 1035 | } |
1025 | 1036 | ||
1026 | 1037 | ||
@@ -1032,7 +1043,7 @@ send_offers_for_key(struct Operation *op, | |||
1032 | * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure | 1043 | * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure |
1033 | */ | 1044 | */ |
1034 | static int | 1045 | static int |
1035 | decode_and_send(struct Operation *op) | 1046 | decode_and_send (struct Operation *op) |
1036 | { | 1047 | { |
1037 | struct IBF_Key key; | 1048 | struct IBF_Key key; |
1038 | struct IBF_Key last_key; | 1049 | struct IBF_Key last_key; |
@@ -1040,147 +1051,147 @@ decode_and_send(struct Operation *op) | |||
1040 | unsigned int num_decoded; | 1051 | unsigned int num_decoded; |
1041 | struct InvertibleBloomFilter *diff_ibf; | 1052 | struct InvertibleBloomFilter *diff_ibf; |
1042 | 1053 | ||
1043 | GNUNET_assert(PHASE_INVENTORY_ACTIVE == op->state->phase); | 1054 | GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase); |
1044 | 1055 | ||
1045 | if (GNUNET_OK != | 1056 | if (GNUNET_OK != |
1046 | prepare_ibf(op, | 1057 | prepare_ibf (op, |
1047 | op->state->remote_ibf->size)) | 1058 | op->state->remote_ibf->size)) |
1048 | { | 1059 | { |
1049 | GNUNET_break(0); | 1060 | GNUNET_break (0); |
1050 | /* allocation failed */ | 1061 | /* allocation failed */ |
1051 | return GNUNET_SYSERR; | 1062 | return GNUNET_SYSERR; |
1052 | } | 1063 | } |
1053 | diff_ibf = ibf_dup(op->state->local_ibf); | 1064 | diff_ibf = ibf_dup (op->state->local_ibf); |
1054 | ibf_subtract(diff_ibf, | 1065 | ibf_subtract (diff_ibf, |
1055 | op->state->remote_ibf); | 1066 | op->state->remote_ibf); |
1056 | 1067 | ||
1057 | ibf_destroy(op->state->remote_ibf); | 1068 | ibf_destroy (op->state->remote_ibf); |
1058 | op->state->remote_ibf = NULL; | 1069 | op->state->remote_ibf = NULL; |
1059 | 1070 | ||
1060 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1071 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1061 | "decoding IBF (size=%u)\n", | 1072 | "decoding IBF (size=%u)\n", |
1062 | diff_ibf->size); | 1073 | diff_ibf->size); |
1063 | 1074 | ||
1064 | num_decoded = 0; | 1075 | num_decoded = 0; |
1065 | key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */ | 1076 | key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */ |
1066 | 1077 | ||
1067 | while (1) | 1078 | while (1) |
1068 | { | 1079 | { |
1069 | int res; | 1080 | int res; |
1070 | int cycle_detected = GNUNET_NO; | 1081 | int cycle_detected = GNUNET_NO; |
1071 | |||
1072 | last_key = key; | ||
1073 | 1082 | ||
1074 | res = ibf_decode(diff_ibf, &side, &key); | 1083 | last_key = key; |
1075 | if (res == GNUNET_OK) | ||
1076 | { | ||
1077 | LOG(GNUNET_ERROR_TYPE_DEBUG, | ||
1078 | "decoded ibf key %lx\n", | ||
1079 | (unsigned long)key.key_val); | ||
1080 | num_decoded += 1; | ||
1081 | if ((num_decoded > diff_ibf->size) || | ||
1082 | ((num_decoded > 1) && | ||
1083 | (last_key.key_val == key.key_val))) | ||
1084 | { | ||
1085 | LOG(GNUNET_ERROR_TYPE_DEBUG, | ||
1086 | "detected cyclic ibf (decoded %u/%u)\n", | ||
1087 | num_decoded, | ||
1088 | diff_ibf->size); | ||
1089 | cycle_detected = GNUNET_YES; | ||
1090 | } | ||
1091 | } | ||
1092 | if ((GNUNET_SYSERR == res) || | ||
1093 | (GNUNET_YES == cycle_detected)) | ||
1094 | { | ||
1095 | int next_order; | ||
1096 | next_order = 0; | ||
1097 | while (1 << next_order < diff_ibf->size) | ||
1098 | next_order++; | ||
1099 | next_order++; | ||
1100 | if (next_order <= MAX_IBF_ORDER) | ||
1101 | { | ||
1102 | LOG(GNUNET_ERROR_TYPE_DEBUG, | ||
1103 | "decoding failed, sending larger ibf (size %u)\n", | ||
1104 | 1 << next_order); | ||
1105 | GNUNET_STATISTICS_update(_GSS_statistics, | ||
1106 | "# of IBF retries", | ||
1107 | 1, | ||
1108 | GNUNET_NO); | ||
1109 | op->state->salt_send++; | ||
1110 | if (GNUNET_OK != | ||
1111 | send_ibf(op, next_order)) | ||
1112 | { | ||
1113 | /* Internal error, best we can do is shut the connection */ | ||
1114 | GNUNET_log(GNUNET_ERROR_TYPE_ERROR, | ||
1115 | "Failed to send IBF, closing connection\n"); | ||
1116 | fail_union_operation(op); | ||
1117 | ibf_destroy(diff_ibf); | ||
1118 | return GNUNET_SYSERR; | ||
1119 | } | ||
1120 | } | ||
1121 | else | ||
1122 | { | ||
1123 | GNUNET_STATISTICS_update(_GSS_statistics, | ||
1124 | "# of failed union operations (too large)", | ||
1125 | 1, | ||
1126 | GNUNET_NO); | ||
1127 | // XXX: Send the whole set, element-by-element | ||
1128 | LOG(GNUNET_ERROR_TYPE_ERROR, | ||
1129 | "set union failed: reached ibf limit\n"); | ||
1130 | fail_union_operation(op); | ||
1131 | ibf_destroy(diff_ibf); | ||
1132 | return GNUNET_SYSERR; | ||
1133 | } | ||
1134 | break; | ||
1135 | } | ||
1136 | if (GNUNET_NO == res) | ||
1137 | { | ||
1138 | struct GNUNET_MQ_Envelope *ev; | ||
1139 | |||
1140 | LOG(GNUNET_ERROR_TYPE_DEBUG, | ||
1141 | "transmitted all values, sending DONE\n"); | ||
1142 | ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); | ||
1143 | GNUNET_MQ_send(op->mq, ev); | ||
1144 | /* We now wait until we get a DONE message back | ||
1145 | * and then wait for our MQ to be flushed and all our | ||
1146 | * demands be delivered. */ | ||
1147 | break; | ||
1148 | } | ||
1149 | if (1 == side) | ||
1150 | { | ||
1151 | struct IBF_Key unsalted_key; | ||
1152 | 1084 | ||
1153 | unsalt_key(&key, | 1085 | res = ibf_decode (diff_ibf, &side, &key); |
1154 | op->state->salt_receive, | 1086 | if (res == GNUNET_OK) |
1155 | &unsalted_key); | 1087 | { |
1156 | send_offers_for_key(op, | 1088 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1157 | unsalted_key); | 1089 | "decoded ibf key %lx\n", |
1158 | } | 1090 | (unsigned long) key.key_val); |
1159 | else if (-1 == side) | 1091 | num_decoded += 1; |
1092 | if ((num_decoded > diff_ibf->size) || | ||
1093 | ((num_decoded > 1) && | ||
1094 | (last_key.key_val == key.key_val))) | ||
1095 | { | ||
1096 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1097 | "detected cyclic ibf (decoded %u/%u)\n", | ||
1098 | num_decoded, | ||
1099 | diff_ibf->size); | ||
1100 | cycle_detected = GNUNET_YES; | ||
1101 | } | ||
1102 | } | ||
1103 | if ((GNUNET_SYSERR == res) || | ||
1104 | (GNUNET_YES == cycle_detected)) | ||
1105 | { | ||
1106 | int next_order; | ||
1107 | next_order = 0; | ||
1108 | while (1 << next_order < diff_ibf->size) | ||
1109 | next_order++; | ||
1110 | next_order++; | ||
1111 | if (next_order <= MAX_IBF_ORDER) | ||
1112 | { | ||
1113 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1114 | "decoding failed, sending larger ibf (size %u)\n", | ||
1115 | 1 << next_order); | ||
1116 | GNUNET_STATISTICS_update (_GSS_statistics, | ||
1117 | "# of IBF retries", | ||
1118 | 1, | ||
1119 | GNUNET_NO); | ||
1120 | op->state->salt_send++; | ||
1121 | if (GNUNET_OK != | ||
1122 | send_ibf (op, next_order)) | ||
1160 | { | 1123 | { |
1161 | struct GNUNET_MQ_Envelope *ev; | 1124 | /* Internal error, best we can do is shut the connection */ |
1162 | struct InquiryMessage *msg; | 1125 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
1163 | 1126 | "Failed to send IBF, closing connection\n"); | |
1164 | /* It may be nice to merge multiple requests, but with CADET's corking it is not worth | 1127 | fail_union_operation (op); |
1165 | * the effort additional complexity. */ | 1128 | ibf_destroy (diff_ibf); |
1166 | ev = GNUNET_MQ_msg_extra(msg, | 1129 | return GNUNET_SYSERR; |
1167 | sizeof(struct IBF_Key), | ||
1168 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY); | ||
1169 | msg->salt = htonl(op->state->salt_receive); | ||
1170 | GNUNET_memcpy(&msg[1], | ||
1171 | &key, | ||
1172 | sizeof(struct IBF_Key)); | ||
1173 | LOG(GNUNET_ERROR_TYPE_DEBUG, | ||
1174 | "sending element inquiry for IBF key %lx\n", | ||
1175 | (unsigned long)key.key_val); | ||
1176 | GNUNET_MQ_send(op->mq, ev); | ||
1177 | } | 1130 | } |
1131 | } | ||
1178 | else | 1132 | else |
1179 | { | 1133 | { |
1180 | GNUNET_assert(0); | 1134 | GNUNET_STATISTICS_update (_GSS_statistics, |
1181 | } | 1135 | "# of failed union operations (too large)", |
1136 | 1, | ||
1137 | GNUNET_NO); | ||
1138 | // XXX: Send the whole set, element-by-element | ||
1139 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
1140 | "set union failed: reached ibf limit\n"); | ||
1141 | fail_union_operation (op); | ||
1142 | ibf_destroy (diff_ibf); | ||
1143 | return GNUNET_SYSERR; | ||
1144 | } | ||
1145 | break; | ||
1146 | } | ||
1147 | if (GNUNET_NO == res) | ||
1148 | { | ||
1149 | struct GNUNET_MQ_Envelope *ev; | ||
1150 | |||
1151 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1152 | "transmitted all values, sending DONE\n"); | ||
1153 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); | ||
1154 | GNUNET_MQ_send (op->mq, ev); | ||
1155 | /* We now wait until we get a DONE message back | ||
1156 | * and then wait for our MQ to be flushed and all our | ||
1157 | * demands be delivered. */ | ||
1158 | break; | ||
1182 | } | 1159 | } |
1183 | ibf_destroy(diff_ibf); | 1160 | if (1 == side) |
1161 | { | ||
1162 | struct IBF_Key unsalted_key; | ||
1163 | |||
1164 | unsalt_key (&key, | ||
1165 | op->state->salt_receive, | ||
1166 | &unsalted_key); | ||
1167 | send_offers_for_key (op, | ||
1168 | unsalted_key); | ||
1169 | } | ||
1170 | else if (-1 == side) | ||
1171 | { | ||
1172 | struct GNUNET_MQ_Envelope *ev; | ||
1173 | struct InquiryMessage *msg; | ||
1174 | |||
1175 | /* It may be nice to merge multiple requests, but with CADET's corking it is not worth | ||
1176 | * the effort additional complexity. */ | ||
1177 | ev = GNUNET_MQ_msg_extra (msg, | ||
1178 | sizeof(struct IBF_Key), | ||
1179 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY); | ||
1180 | msg->salt = htonl (op->state->salt_receive); | ||
1181 | GNUNET_memcpy (&msg[1], | ||
1182 | &key, | ||
1183 | sizeof(struct IBF_Key)); | ||
1184 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1185 | "sending element inquiry for IBF key %lx\n", | ||
1186 | (unsigned long) key.key_val); | ||
1187 | GNUNET_MQ_send (op->mq, ev); | ||
1188 | } | ||
1189 | else | ||
1190 | { | ||
1191 | GNUNET_assert (0); | ||
1192 | } | ||
1193 | } | ||
1194 | ibf_destroy (diff_ibf); | ||
1184 | return GNUNET_OK; | 1195 | return GNUNET_OK; |
1185 | } | 1196 | } |
1186 | 1197 | ||
@@ -1196,52 +1207,54 @@ decode_and_send(struct Operation *op) | |||
1196 | * @return #GNUNET_OK if @a msg is well-formed | 1207 | * @return #GNUNET_OK if @a msg is well-formed |
1197 | */ | 1208 | */ |
1198 | int | 1209 | int |
1199 | check_union_p2p_ibf(void *cls, | 1210 | check_union_p2p_ibf (void *cls, |
1200 | const struct IBFMessage *msg) | 1211 | const struct IBFMessage *msg) |
1201 | { | 1212 | { |
1202 | struct Operation *op = cls; | 1213 | struct Operation *op = cls; |
1203 | unsigned int buckets_in_message; | 1214 | unsigned int buckets_in_message; |
1204 | 1215 | ||
1205 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) | 1216 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) |
1206 | { | 1217 | { |
1207 | GNUNET_break_op(0); | 1218 | GNUNET_break_op (0); |
1208 | return GNUNET_SYSERR; | 1219 | return GNUNET_SYSERR; |
1209 | } | 1220 | } |
1210 | buckets_in_message = (ntohs(msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; | 1221 | buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) |
1222 | / IBF_BUCKET_SIZE; | ||
1211 | if (0 == buckets_in_message) | 1223 | if (0 == buckets_in_message) |
1224 | { | ||
1225 | GNUNET_break_op (0); | ||
1226 | return GNUNET_SYSERR; | ||
1227 | } | ||
1228 | if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message | ||
1229 | * IBF_BUCKET_SIZE) | ||
1230 | { | ||
1231 | GNUNET_break_op (0); | ||
1232 | return GNUNET_SYSERR; | ||
1233 | } | ||
1234 | if (op->state->phase == PHASE_EXPECT_IBF_CONT) | ||
1235 | { | ||
1236 | if (ntohl (msg->offset) != op->state->ibf_buckets_received) | ||
1212 | { | 1237 | { |
1213 | GNUNET_break_op(0); | 1238 | GNUNET_break_op (0); |
1214 | return GNUNET_SYSERR; | 1239 | return GNUNET_SYSERR; |
1215 | } | 1240 | } |
1216 | if ((ntohs(msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE) | 1241 | if (1 << msg->order != op->state->remote_ibf->size) |
1217 | { | 1242 | { |
1218 | GNUNET_break_op(0); | 1243 | GNUNET_break_op (0); |
1219 | return GNUNET_SYSERR; | 1244 | return GNUNET_SYSERR; |
1220 | } | 1245 | } |
1221 | if (op->state->phase == PHASE_EXPECT_IBF_CONT) | 1246 | if (ntohl (msg->salt) != op->state->salt_receive) |
1222 | { | 1247 | { |
1223 | if (ntohl(msg->offset) != op->state->ibf_buckets_received) | 1248 | GNUNET_break_op (0); |
1224 | { | 1249 | return GNUNET_SYSERR; |
1225 | GNUNET_break_op(0); | ||
1226 | return GNUNET_SYSERR; | ||
1227 | } | ||
1228 | if (1 << msg->order != op->state->remote_ibf->size) | ||
1229 | { | ||
1230 | GNUNET_break_op(0); | ||
1231 | return GNUNET_SYSERR; | ||
1232 | } | ||
1233 | if (ntohl(msg->salt) != op->state->salt_receive) | ||
1234 | { | ||
1235 | GNUNET_break_op(0); | ||
1236 | return GNUNET_SYSERR; | ||
1237 | } | ||
1238 | } | 1250 | } |
1251 | } | ||
1239 | else if ((op->state->phase != PHASE_INVENTORY_PASSIVE) && | 1252 | else if ((op->state->phase != PHASE_INVENTORY_PASSIVE) && |
1240 | (op->state->phase != PHASE_EXPECT_IBF)) | 1253 | (op->state->phase != PHASE_EXPECT_IBF)) |
1241 | { | 1254 | { |
1242 | GNUNET_break_op(0); | 1255 | GNUNET_break_op (0); |
1243 | return GNUNET_SYSERR; | 1256 | return GNUNET_SYSERR; |
1244 | } | 1257 | } |
1245 | 1258 | ||
1246 | return GNUNET_OK; | 1259 | return GNUNET_OK; |
1247 | } | 1260 | } |
@@ -1257,71 +1270,72 @@ check_union_p2p_ibf(void *cls, | |||
1257 | * @param msg the header of the message | 1270 | * @param msg the header of the message |
1258 | */ | 1271 | */ |
1259 | void | 1272 | void |
1260 | handle_union_p2p_ibf(void *cls, | 1273 | handle_union_p2p_ibf (void *cls, |
1261 | const struct IBFMessage *msg) | 1274 | const struct IBFMessage *msg) |
1262 | { | 1275 | { |
1263 | struct Operation *op = cls; | 1276 | struct Operation *op = cls; |
1264 | unsigned int buckets_in_message; | 1277 | unsigned int buckets_in_message; |
1265 | 1278 | ||
1266 | buckets_in_message = (ntohs(msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; | 1279 | buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) |
1280 | / IBF_BUCKET_SIZE; | ||
1267 | if ((op->state->phase == PHASE_INVENTORY_PASSIVE) || | 1281 | if ((op->state->phase == PHASE_INVENTORY_PASSIVE) || |
1268 | (op->state->phase == PHASE_EXPECT_IBF)) | 1282 | (op->state->phase == PHASE_EXPECT_IBF)) |
1269 | { | 1283 | { |
1270 | op->state->phase = PHASE_EXPECT_IBF_CONT; | 1284 | op->state->phase = PHASE_EXPECT_IBF_CONT; |
1271 | GNUNET_assert(NULL == op->state->remote_ibf); | 1285 | GNUNET_assert (NULL == op->state->remote_ibf); |
1272 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1286 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1273 | "Creating new ibf of size %u\n", | 1287 | "Creating new ibf of size %u\n", |
1274 | 1 << msg->order); | 1288 | 1 << msg->order); |
1275 | op->state->remote_ibf = ibf_create(1 << msg->order, SE_IBF_HASH_NUM); | 1289 | op->state->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM); |
1276 | op->state->salt_receive = ntohl(msg->salt); | 1290 | op->state->salt_receive = ntohl (msg->salt); |
1277 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1291 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1278 | "Receiving new IBF with salt %u\n", | 1292 | "Receiving new IBF with salt %u\n", |
1279 | op->state->salt_receive); | 1293 | op->state->salt_receive); |
1280 | if (NULL == op->state->remote_ibf) | 1294 | if (NULL == op->state->remote_ibf) |
1281 | { | 1295 | { |
1282 | GNUNET_log(GNUNET_ERROR_TYPE_ERROR, | 1296 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
1283 | "Failed to parse remote IBF, closing connection\n"); | 1297 | "Failed to parse remote IBF, closing connection\n"); |
1284 | fail_union_operation(op); | 1298 | fail_union_operation (op); |
1285 | return; | 1299 | return; |
1286 | } | ||
1287 | op->state->ibf_buckets_received = 0; | ||
1288 | if (0 != ntohl(msg->offset)) | ||
1289 | { | ||
1290 | GNUNET_break_op(0); | ||
1291 | fail_union_operation(op); | ||
1292 | return; | ||
1293 | } | ||
1294 | } | 1300 | } |
1295 | else | 1301 | op->state->ibf_buckets_received = 0; |
1302 | if (0 != ntohl (msg->offset)) | ||
1296 | { | 1303 | { |
1297 | GNUNET_assert(op->state->phase == PHASE_EXPECT_IBF_CONT); | 1304 | GNUNET_break_op (0); |
1298 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1305 | fail_union_operation (op); |
1299 | "Received more of IBF\n"); | 1306 | return; |
1300 | } | 1307 | } |
1301 | GNUNET_assert(NULL != op->state->remote_ibf); | 1308 | } |
1309 | else | ||
1310 | { | ||
1311 | GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT); | ||
1312 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1313 | "Received more of IBF\n"); | ||
1314 | } | ||
1315 | GNUNET_assert (NULL != op->state->remote_ibf); | ||
1302 | 1316 | ||
1303 | ibf_read_slice(&msg[1], | 1317 | ibf_read_slice (&msg[1], |
1304 | op->state->ibf_buckets_received, | 1318 | op->state->ibf_buckets_received, |
1305 | buckets_in_message, | 1319 | buckets_in_message, |
1306 | op->state->remote_ibf); | 1320 | op->state->remote_ibf); |
1307 | op->state->ibf_buckets_received += buckets_in_message; | 1321 | op->state->ibf_buckets_received += buckets_in_message; |
1308 | 1322 | ||
1309 | if (op->state->ibf_buckets_received == op->state->remote_ibf->size) | 1323 | if (op->state->ibf_buckets_received == op->state->remote_ibf->size) |
1310 | { | 1324 | { |
1311 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1325 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1312 | "received full ibf\n"); | 1326 | "received full ibf\n"); |
1313 | op->state->phase = PHASE_INVENTORY_ACTIVE; | 1327 | op->state->phase = PHASE_INVENTORY_ACTIVE; |
1314 | if (GNUNET_OK != | 1328 | if (GNUNET_OK != |
1315 | decode_and_send(op)) | 1329 | decode_and_send (op)) |
1316 | { | 1330 | { |
1317 | /* Internal error, best we can do is shut down */ | 1331 | /* Internal error, best we can do is shut down */ |
1318 | GNUNET_log(GNUNET_ERROR_TYPE_ERROR, | 1332 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
1319 | "Failed to decode IBF, closing connection\n"); | 1333 | "Failed to decode IBF, closing connection\n"); |
1320 | fail_union_operation(op); | 1334 | fail_union_operation (op); |
1321 | return; | 1335 | return; |
1322 | } | ||
1323 | } | 1336 | } |
1324 | GNUNET_CADET_receive_done(op->channel); | 1337 | } |
1338 | GNUNET_CADET_receive_done (op->channel); | ||
1325 | } | 1339 | } |
1326 | 1340 | ||
1327 | 1341 | ||
@@ -1334,33 +1348,34 @@ handle_union_p2p_ibf(void *cls, | |||
1334 | * @param status status to send with the new element | 1348 | * @param status status to send with the new element |
1335 | */ | 1349 | */ |
1336 | static void | 1350 | static void |
1337 | send_client_element(struct Operation *op, | 1351 | send_client_element (struct Operation *op, |
1338 | struct GNUNET_SET_Element *element, | 1352 | struct GNUNET_SET_Element *element, |
1339 | int status) | 1353 | int status) |
1340 | { | 1354 | { |
1341 | struct GNUNET_MQ_Envelope *ev; | 1355 | struct GNUNET_MQ_Envelope *ev; |
1342 | struct GNUNET_SET_ResultMessage *rm; | 1356 | struct GNUNET_SET_ResultMessage *rm; |
1343 | 1357 | ||
1344 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1358 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1345 | "sending element (size %u) to client\n", | 1359 | "sending element (size %u) to client\n", |
1346 | element->size); | 1360 | element->size); |
1347 | GNUNET_assert(0 != op->client_request_id); | 1361 | GNUNET_assert (0 != op->client_request_id); |
1348 | ev = GNUNET_MQ_msg_extra(rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); | 1362 | ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); |
1349 | if (NULL == ev) | 1363 | if (NULL == ev) |
1350 | { | 1364 | { |
1351 | GNUNET_MQ_discard(ev); | 1365 | GNUNET_MQ_discard (ev); |
1352 | GNUNET_break(0); | 1366 | GNUNET_break (0); |
1353 | return; | 1367 | return; |
1354 | } | 1368 | } |
1355 | rm->result_status = htons(status); | 1369 | rm->result_status = htons (status); |
1356 | rm->request_id = htonl(op->client_request_id); | 1370 | rm->request_id = htonl (op->client_request_id); |
1357 | rm->element_type = htons(element->element_type); | 1371 | rm->element_type = htons (element->element_type); |
1358 | rm->current_size = GNUNET_htonll(GNUNET_CONTAINER_multihashmap32_size(op->state->key_to_element)); | 1372 | rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size ( |
1359 | GNUNET_memcpy(&rm[1], | 1373 | op->state->key_to_element)); |
1360 | element->data, | 1374 | GNUNET_memcpy (&rm[1], |
1361 | element->size); | 1375 | element->data, |
1362 | GNUNET_MQ_send(op->set->cs->mq, | 1376 | element->size); |
1363 | ev); | 1377 | GNUNET_MQ_send (op->set->cs->mq, |
1378 | ev); | ||
1364 | } | 1379 | } |
1365 | 1380 | ||
1366 | 1381 | ||
@@ -1371,50 +1386,51 @@ send_client_element(struct Operation *op, | |||
1371 | * @param cls operation to destroy | 1386 | * @param cls operation to destroy |
1372 | */ | 1387 | */ |
1373 | static void | 1388 | static void |
1374 | send_client_done(void *cls) | 1389 | send_client_done (void *cls) |
1375 | { | 1390 | { |
1376 | struct Operation *op = cls; | 1391 | struct Operation *op = cls; |
1377 | struct GNUNET_MQ_Envelope *ev; | 1392 | struct GNUNET_MQ_Envelope *ev; |
1378 | struct GNUNET_SET_ResultMessage *rm; | 1393 | struct GNUNET_SET_ResultMessage *rm; |
1379 | 1394 | ||
1380 | if (GNUNET_YES == op->state->client_done_sent) | 1395 | if (GNUNET_YES == op->state->client_done_sent) |
1381 | { | 1396 | { |
1382 | return; | 1397 | return; |
1383 | } | 1398 | } |
1384 | 1399 | ||
1385 | if (PHASE_DONE != op->state->phase) | 1400 | if (PHASE_DONE != op->state->phase) |
1386 | { | 1401 | { |
1387 | LOG(GNUNET_ERROR_TYPE_WARNING, | 1402 | LOG (GNUNET_ERROR_TYPE_WARNING, |
1388 | "Union operation failed\n"); | 1403 | "Union operation failed\n"); |
1389 | GNUNET_STATISTICS_update(_GSS_statistics, | 1404 | GNUNET_STATISTICS_update (_GSS_statistics, |
1390 | "# Union operations failed", | 1405 | "# Union operations failed", |
1391 | 1, | 1406 | 1, |
1392 | GNUNET_NO); | 1407 | GNUNET_NO); |
1393 | ev = GNUNET_MQ_msg(rm, GNUNET_MESSAGE_TYPE_SET_RESULT); | 1408 | ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); |
1394 | rm->result_status = htons(GNUNET_SET_STATUS_FAILURE); | 1409 | rm->result_status = htons (GNUNET_SET_STATUS_FAILURE); |
1395 | rm->request_id = htonl(op->client_request_id); | 1410 | rm->request_id = htonl (op->client_request_id); |
1396 | rm->element_type = htons(0); | 1411 | rm->element_type = htons (0); |
1397 | GNUNET_MQ_send(op->set->cs->mq, | 1412 | GNUNET_MQ_send (op->set->cs->mq, |
1398 | ev); | 1413 | ev); |
1399 | return; | 1414 | return; |
1400 | } | 1415 | } |
1401 | 1416 | ||
1402 | op->state->client_done_sent = GNUNET_YES; | 1417 | op->state->client_done_sent = GNUNET_YES; |
1403 | 1418 | ||
1404 | GNUNET_STATISTICS_update(_GSS_statistics, | 1419 | GNUNET_STATISTICS_update (_GSS_statistics, |
1405 | "# Union operations succeeded", | 1420 | "# Union operations succeeded", |
1406 | 1, | 1421 | 1, |
1407 | GNUNET_NO); | 1422 | GNUNET_NO); |
1408 | LOG(GNUNET_ERROR_TYPE_INFO, | 1423 | LOG (GNUNET_ERROR_TYPE_INFO, |
1409 | "Signalling client that union operation is done\n"); | 1424 | "Signalling client that union operation is done\n"); |
1410 | ev = GNUNET_MQ_msg(rm, | 1425 | ev = GNUNET_MQ_msg (rm, |
1411 | GNUNET_MESSAGE_TYPE_SET_RESULT); | 1426 | GNUNET_MESSAGE_TYPE_SET_RESULT); |
1412 | rm->request_id = htonl(op->client_request_id); | 1427 | rm->request_id = htonl (op->client_request_id); |
1413 | rm->result_status = htons(GNUNET_SET_STATUS_DONE); | 1428 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); |
1414 | rm->element_type = htons(0); | 1429 | rm->element_type = htons (0); |
1415 | rm->current_size = GNUNET_htonll(GNUNET_CONTAINER_multihashmap32_size(op->state->key_to_element)); | 1430 | rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size ( |
1416 | GNUNET_MQ_send(op->set->cs->mq, | 1431 | op->state->key_to_element)); |
1417 | ev); | 1432 | GNUNET_MQ_send (op->set->cs->mq, |
1433 | ev); | ||
1418 | } | 1434 | } |
1419 | 1435 | ||
1420 | 1436 | ||
@@ -1424,41 +1440,42 @@ send_client_done(void *cls) | |||
1424 | * @param op operation to check | 1440 | * @param op operation to check |
1425 | */ | 1441 | */ |
1426 | static void | 1442 | static void |
1427 | maybe_finish(struct Operation *op) | 1443 | maybe_finish (struct Operation *op) |
1428 | { | 1444 | { |
1429 | unsigned int num_demanded; | 1445 | unsigned int num_demanded; |
1430 | 1446 | ||
1431 | num_demanded = GNUNET_CONTAINER_multihashmap_size(op->state->demanded_hashes); | 1447 | num_demanded = GNUNET_CONTAINER_multihashmap_size ( |
1448 | op->state->demanded_hashes); | ||
1432 | 1449 | ||
1433 | if (PHASE_FINISH_WAITING == op->state->phase) | 1450 | if (PHASE_FINISH_WAITING == op->state->phase) |
1451 | { | ||
1452 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1453 | "In PHASE_FINISH_WAITING, pending %u demands\n", | ||
1454 | num_demanded); | ||
1455 | if (0 == num_demanded) | ||
1434 | { | 1456 | { |
1435 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1457 | struct GNUNET_MQ_Envelope *ev; |
1436 | "In PHASE_FINISH_WAITING, pending %u demands\n", | 1458 | |
1437 | num_demanded); | 1459 | op->state->phase = PHASE_DONE; |
1438 | if (0 == num_demanded) | 1460 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); |
1439 | { | 1461 | GNUNET_MQ_send (op->mq, |
1440 | struct GNUNET_MQ_Envelope *ev; | 1462 | ev); |
1441 | 1463 | /* We now wait until the other peer sends P2P_OVER | |
1442 | op->state->phase = PHASE_DONE; | 1464 | * after it got all elements from us. */ |
1443 | ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); | ||
1444 | GNUNET_MQ_send(op->mq, | ||
1445 | ev); | ||
1446 | /* We now wait until the other peer sends P2P_OVER | ||
1447 | * after it got all elements from us. */ | ||
1448 | } | ||
1449 | } | 1465 | } |
1466 | } | ||
1450 | if (PHASE_FINISH_CLOSING == op->state->phase) | 1467 | if (PHASE_FINISH_CLOSING == op->state->phase) |
1468 | { | ||
1469 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1470 | "In PHASE_FINISH_CLOSING, pending %u demands\n", | ||
1471 | num_demanded); | ||
1472 | if (0 == num_demanded) | ||
1451 | { | 1473 | { |
1452 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1474 | op->state->phase = PHASE_DONE; |
1453 | "In PHASE_FINISH_CLOSING, pending %u demands\n", | 1475 | send_client_done (op); |
1454 | num_demanded); | 1476 | _GSS_operation_destroy2 (op); |
1455 | if (0 == num_demanded) | ||
1456 | { | ||
1457 | op->state->phase = PHASE_DONE; | ||
1458 | send_client_done(op); | ||
1459 | _GSS_operation_destroy2(op); | ||
1460 | } | ||
1461 | } | 1477 | } |
1478 | } | ||
1462 | } | 1479 | } |
1463 | 1480 | ||
1464 | 1481 | ||
@@ -1469,21 +1486,21 @@ maybe_finish(struct Operation *op) | |||
1469 | * @param emsg the message | 1486 | * @param emsg the message |
1470 | */ | 1487 | */ |
1471 | int | 1488 | int |
1472 | check_union_p2p_elements(void *cls, | 1489 | check_union_p2p_elements (void *cls, |
1473 | const struct GNUNET_SET_ElementMessage *emsg) | 1490 | const struct GNUNET_SET_ElementMessage *emsg) |
1474 | { | 1491 | { |
1475 | struct Operation *op = cls; | 1492 | struct Operation *op = cls; |
1476 | 1493 | ||
1477 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) | 1494 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) |
1478 | { | 1495 | { |
1479 | GNUNET_break_op(0); | 1496 | GNUNET_break_op (0); |
1480 | return GNUNET_SYSERR; | 1497 | return GNUNET_SYSERR; |
1481 | } | 1498 | } |
1482 | if (0 == GNUNET_CONTAINER_multihashmap_size(op->state->demanded_hashes)) | 1499 | if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes)) |
1483 | { | 1500 | { |
1484 | GNUNET_break_op(0); | 1501 | GNUNET_break_op (0); |
1485 | return GNUNET_SYSERR; | 1502 | return GNUNET_SYSERR; |
1486 | } | 1503 | } |
1487 | return GNUNET_OK; | 1504 | return GNUNET_OK; |
1488 | } | 1505 | } |
1489 | 1506 | ||
@@ -1497,98 +1514,99 @@ check_union_p2p_elements(void *cls, | |||
1497 | * @param emsg the message | 1514 | * @param emsg the message |
1498 | */ | 1515 | */ |
1499 | void | 1516 | void |
1500 | handle_union_p2p_elements(void *cls, | 1517 | handle_union_p2p_elements (void *cls, |
1501 | const struct GNUNET_SET_ElementMessage *emsg) | 1518 | const struct GNUNET_SET_ElementMessage *emsg) |
1502 | { | 1519 | { |
1503 | struct Operation *op = cls; | 1520 | struct Operation *op = cls; |
1504 | struct ElementEntry *ee; | 1521 | struct ElementEntry *ee; |
1505 | struct KeyEntry *ke; | 1522 | struct KeyEntry *ke; |
1506 | uint16_t element_size; | 1523 | uint16_t element_size; |
1507 | 1524 | ||
1508 | element_size = ntohs(emsg->header.size) - sizeof(struct GNUNET_SET_ElementMessage); | 1525 | element_size = ntohs (emsg->header.size) - sizeof(struct |
1509 | ee = GNUNET_malloc(sizeof(struct ElementEntry) + element_size); | 1526 | GNUNET_SET_ElementMessage); |
1510 | GNUNET_memcpy(&ee[1], | 1527 | ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size); |
1511 | &emsg[1], | 1528 | GNUNET_memcpy (&ee[1], |
1512 | element_size); | 1529 | &emsg[1], |
1530 | element_size); | ||
1513 | ee->element.size = element_size; | 1531 | ee->element.size = element_size; |
1514 | ee->element.data = &ee[1]; | 1532 | ee->element.data = &ee[1]; |
1515 | ee->element.element_type = ntohs(emsg->element_type); | 1533 | ee->element.element_type = ntohs (emsg->element_type); |
1516 | ee->remote = GNUNET_YES; | 1534 | ee->remote = GNUNET_YES; |
1517 | GNUNET_SET_element_hash(&ee->element, | 1535 | GNUNET_SET_element_hash (&ee->element, |
1518 | &ee->element_hash); | 1536 | &ee->element_hash); |
1519 | if (GNUNET_NO == | 1537 | if (GNUNET_NO == |
1520 | GNUNET_CONTAINER_multihashmap_remove(op->state->demanded_hashes, | 1538 | GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes, |
1521 | &ee->element_hash, | 1539 | &ee->element_hash, |
1522 | NULL)) | 1540 | NULL)) |
1523 | { | 1541 | { |
1524 | /* We got something we didn't demand, since it's not in our map. */ | 1542 | /* We got something we didn't demand, since it's not in our map. */ |
1525 | GNUNET_break_op(0); | 1543 | GNUNET_break_op (0); |
1526 | fail_union_operation(op); | 1544 | fail_union_operation (op); |
1527 | return; | 1545 | return; |
1528 | } | 1546 | } |
1529 | 1547 | ||
1530 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1548 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1531 | "Got element (size %u, hash %s) from peer\n", | 1549 | "Got element (size %u, hash %s) from peer\n", |
1532 | (unsigned int)element_size, | 1550 | (unsigned int) element_size, |
1533 | GNUNET_h2s(&ee->element_hash)); | 1551 | GNUNET_h2s (&ee->element_hash)); |
1534 | 1552 | ||
1535 | GNUNET_STATISTICS_update(_GSS_statistics, | 1553 | GNUNET_STATISTICS_update (_GSS_statistics, |
1536 | "# received elements", | 1554 | "# received elements", |
1537 | 1, | 1555 | 1, |
1538 | GNUNET_NO); | 1556 | GNUNET_NO); |
1539 | GNUNET_STATISTICS_update(_GSS_statistics, | 1557 | GNUNET_STATISTICS_update (_GSS_statistics, |
1540 | "# exchanged elements", | 1558 | "# exchanged elements", |
1541 | 1, | 1559 | 1, |
1542 | GNUNET_NO); | 1560 | GNUNET_NO); |
1543 | 1561 | ||
1544 | op->state->received_total++; | 1562 | op->state->received_total++; |
1545 | 1563 | ||
1546 | ke = op_get_element(op, &ee->element_hash); | 1564 | ke = op_get_element (op, &ee->element_hash); |
1547 | if (NULL != ke) | 1565 | if (NULL != ke) |
1548 | { | 1566 | { |
1549 | /* Got repeated element. Should not happen since | 1567 | /* Got repeated element. Should not happen since |
1550 | * we track demands. */ | 1568 | * we track demands. */ |
1551 | GNUNET_STATISTICS_update(_GSS_statistics, | 1569 | GNUNET_STATISTICS_update (_GSS_statistics, |
1552 | "# repeated elements", | 1570 | "# repeated elements", |
1553 | 1, | 1571 | 1, |
1554 | GNUNET_NO); | 1572 | GNUNET_NO); |
1555 | ke->received = GNUNET_YES; | 1573 | ke->received = GNUNET_YES; |
1556 | GNUNET_free(ee); | 1574 | GNUNET_free (ee); |
1557 | } | 1575 | } |
1558 | else | 1576 | else |
1577 | { | ||
1578 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1579 | "Registering new element from remote peer\n"); | ||
1580 | op->state->received_fresh++; | ||
1581 | op_register_element (op, ee, GNUNET_YES); | ||
1582 | /* only send results immediately if the client wants it */ | ||
1583 | switch (op->result_mode) | ||
1559 | { | 1584 | { |
1560 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1585 | case GNUNET_SET_RESULT_ADDED: |
1561 | "Registering new element from remote peer\n"); | 1586 | send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK); |
1562 | op->state->received_fresh++; | 1587 | break; |
1563 | op_register_element(op, ee, GNUNET_YES); | 1588 | |
1564 | /* only send results immediately if the client wants it */ | 1589 | case GNUNET_SET_RESULT_SYMMETRIC: |
1565 | switch (op->result_mode) | 1590 | send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL); |
1566 | { | 1591 | break; |
1567 | case GNUNET_SET_RESULT_ADDED: | 1592 | |
1568 | send_client_element(op, &ee->element, GNUNET_SET_STATUS_OK); | 1593 | default: |
1569 | break; | 1594 | /* Result mode not supported, should have been caught earlier. */ |
1570 | 1595 | GNUNET_break (0); | |
1571 | case GNUNET_SET_RESULT_SYMMETRIC: | 1596 | break; |
1572 | send_client_element(op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL); | ||
1573 | break; | ||
1574 | |||
1575 | default: | ||
1576 | /* Result mode not supported, should have been caught earlier. */ | ||
1577 | GNUNET_break(0); | ||
1578 | break; | ||
1579 | } | ||
1580 | } | 1597 | } |
1598 | } | ||
1581 | 1599 | ||
1582 | if ((op->state->received_total > 8) && | 1600 | if ((op->state->received_total > 8) && |
1583 | (op->state->received_fresh < op->state->received_total / 3)) | 1601 | (op->state->received_fresh < op->state->received_total / 3)) |
1584 | { | 1602 | { |
1585 | /* The other peer gave us lots of old elements, there's something wrong. */ | 1603 | /* The other peer gave us lots of old elements, there's something wrong. */ |
1586 | GNUNET_break_op(0); | 1604 | GNUNET_break_op (0); |
1587 | fail_union_operation(op); | 1605 | fail_union_operation (op); |
1588 | return; | 1606 | return; |
1589 | } | 1607 | } |
1590 | GNUNET_CADET_receive_done(op->channel); | 1608 | GNUNET_CADET_receive_done (op->channel); |
1591 | maybe_finish(op); | 1609 | maybe_finish (op); |
1592 | } | 1610 | } |
1593 | 1611 | ||
1594 | 1612 | ||
@@ -1599,16 +1617,16 @@ handle_union_p2p_elements(void *cls, | |||
1599 | * @param emsg the message | 1617 | * @param emsg the message |
1600 | */ | 1618 | */ |
1601 | int | 1619 | int |
1602 | check_union_p2p_full_element(void *cls, | 1620 | check_union_p2p_full_element (void *cls, |
1603 | const struct GNUNET_SET_ElementMessage *emsg) | 1621 | const struct GNUNET_SET_ElementMessage *emsg) |
1604 | { | 1622 | { |
1605 | struct Operation *op = cls; | 1623 | struct Operation *op = cls; |
1606 | 1624 | ||
1607 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) | 1625 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) |
1608 | { | 1626 | { |
1609 | GNUNET_break_op(0); | 1627 | GNUNET_break_op (0); |
1610 | return GNUNET_SYSERR; | 1628 | return GNUNET_SYSERR; |
1611 | } | 1629 | } |
1612 | // FIXME: check that we expect full elements here? | 1630 | // FIXME: check that we expect full elements here? |
1613 | return GNUNET_OK; | 1631 | return GNUNET_OK; |
1614 | } | 1632 | } |
@@ -1621,89 +1639,90 @@ check_union_p2p_full_element(void *cls, | |||
1621 | * @param emsg the message | 1639 | * @param emsg the message |
1622 | */ | 1640 | */ |
1623 | void | 1641 | void |
1624 | handle_union_p2p_full_element(void *cls, | 1642 | handle_union_p2p_full_element (void *cls, |
1625 | const struct GNUNET_SET_ElementMessage *emsg) | 1643 | const struct GNUNET_SET_ElementMessage *emsg) |
1626 | { | 1644 | { |
1627 | struct Operation *op = cls; | 1645 | struct Operation *op = cls; |
1628 | struct ElementEntry *ee; | 1646 | struct ElementEntry *ee; |
1629 | struct KeyEntry *ke; | 1647 | struct KeyEntry *ke; |
1630 | uint16_t element_size; | 1648 | uint16_t element_size; |
1631 | 1649 | ||
1632 | element_size = ntohs(emsg->header.size) - sizeof(struct GNUNET_SET_ElementMessage); | 1650 | element_size = ntohs (emsg->header.size) - sizeof(struct |
1633 | ee = GNUNET_malloc(sizeof(struct ElementEntry) + element_size); | 1651 | GNUNET_SET_ElementMessage); |
1634 | GNUNET_memcpy(&ee[1], &emsg[1], element_size); | 1652 | ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size); |
1653 | GNUNET_memcpy (&ee[1], &emsg[1], element_size); | ||
1635 | ee->element.size = element_size; | 1654 | ee->element.size = element_size; |
1636 | ee->element.data = &ee[1]; | 1655 | ee->element.data = &ee[1]; |
1637 | ee->element.element_type = ntohs(emsg->element_type); | 1656 | ee->element.element_type = ntohs (emsg->element_type); |
1638 | ee->remote = GNUNET_YES; | 1657 | ee->remote = GNUNET_YES; |
1639 | GNUNET_SET_element_hash(&ee->element, &ee->element_hash); | 1658 | GNUNET_SET_element_hash (&ee->element, &ee->element_hash); |
1640 | 1659 | ||
1641 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1660 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1642 | "Got element (full diff, size %u, hash %s) from peer\n", | 1661 | "Got element (full diff, size %u, hash %s) from peer\n", |
1643 | (unsigned int)element_size, | 1662 | (unsigned int) element_size, |
1644 | GNUNET_h2s(&ee->element_hash)); | 1663 | GNUNET_h2s (&ee->element_hash)); |
1645 | 1664 | ||
1646 | GNUNET_STATISTICS_update(_GSS_statistics, | 1665 | GNUNET_STATISTICS_update (_GSS_statistics, |
1647 | "# received elements", | 1666 | "# received elements", |
1648 | 1, | 1667 | 1, |
1649 | GNUNET_NO); | 1668 | GNUNET_NO); |
1650 | GNUNET_STATISTICS_update(_GSS_statistics, | 1669 | GNUNET_STATISTICS_update (_GSS_statistics, |
1651 | "# exchanged elements", | 1670 | "# exchanged elements", |
1652 | 1, | 1671 | 1, |
1653 | GNUNET_NO); | 1672 | GNUNET_NO); |
1654 | 1673 | ||
1655 | op->state->received_total++; | 1674 | op->state->received_total++; |
1656 | 1675 | ||
1657 | ke = op_get_element(op, &ee->element_hash); | 1676 | ke = op_get_element (op, &ee->element_hash); |
1658 | if (NULL != ke) | 1677 | if (NULL != ke) |
1659 | { | 1678 | { |
1660 | /* Got repeated element. Should not happen since | 1679 | /* Got repeated element. Should not happen since |
1661 | * we track demands. */ | 1680 | * we track demands. */ |
1662 | GNUNET_STATISTICS_update(_GSS_statistics, | 1681 | GNUNET_STATISTICS_update (_GSS_statistics, |
1663 | "# repeated elements", | 1682 | "# repeated elements", |
1664 | 1, | 1683 | 1, |
1665 | GNUNET_NO); | 1684 | GNUNET_NO); |
1666 | ke->received = GNUNET_YES; | 1685 | ke->received = GNUNET_YES; |
1667 | GNUNET_free(ee); | 1686 | GNUNET_free (ee); |
1668 | } | 1687 | } |
1669 | else | 1688 | else |
1689 | { | ||
1690 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1691 | "Registering new element from remote peer\n"); | ||
1692 | op->state->received_fresh++; | ||
1693 | op_register_element (op, ee, GNUNET_YES); | ||
1694 | /* only send results immediately if the client wants it */ | ||
1695 | switch (op->result_mode) | ||
1670 | { | 1696 | { |
1671 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1697 | case GNUNET_SET_RESULT_ADDED: |
1672 | "Registering new element from remote peer\n"); | 1698 | send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK); |
1673 | op->state->received_fresh++; | 1699 | break; |
1674 | op_register_element(op, ee, GNUNET_YES); | 1700 | |
1675 | /* only send results immediately if the client wants it */ | 1701 | case GNUNET_SET_RESULT_SYMMETRIC: |
1676 | switch (op->result_mode) | 1702 | send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL); |
1677 | { | 1703 | break; |
1678 | case GNUNET_SET_RESULT_ADDED: | 1704 | |
1679 | send_client_element(op, &ee->element, GNUNET_SET_STATUS_OK); | 1705 | default: |
1680 | break; | 1706 | /* Result mode not supported, should have been caught earlier. */ |
1681 | 1707 | GNUNET_break (0); | |
1682 | case GNUNET_SET_RESULT_SYMMETRIC: | 1708 | break; |
1683 | send_client_element(op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL); | ||
1684 | break; | ||
1685 | |||
1686 | default: | ||
1687 | /* Result mode not supported, should have been caught earlier. */ | ||
1688 | GNUNET_break(0); | ||
1689 | break; | ||
1690 | } | ||
1691 | } | 1709 | } |
1710 | } | ||
1692 | 1711 | ||
1693 | if ((GNUNET_YES == op->byzantine) && | 1712 | if ((GNUNET_YES == op->byzantine) && |
1694 | (op->state->received_total > 384 + op->state->received_fresh * 4) && | 1713 | (op->state->received_total > 384 + op->state->received_fresh * 4) && |
1695 | (op->state->received_fresh < op->state->received_total / 6)) | 1714 | (op->state->received_fresh < op->state->received_total / 6)) |
1696 | { | 1715 | { |
1697 | /* The other peer gave us lots of old elements, there's something wrong. */ | 1716 | /* The other peer gave us lots of old elements, there's something wrong. */ |
1698 | LOG(GNUNET_ERROR_TYPE_ERROR, | 1717 | LOG (GNUNET_ERROR_TYPE_ERROR, |
1699 | "Other peer sent only %llu/%llu fresh elements, failing operation\n", | 1718 | "Other peer sent only %llu/%llu fresh elements, failing operation\n", |
1700 | (unsigned long long)op->state->received_fresh, | 1719 | (unsigned long long) op->state->received_fresh, |
1701 | (unsigned long long)op->state->received_total); | 1720 | (unsigned long long) op->state->received_total); |
1702 | GNUNET_break_op(0); | 1721 | GNUNET_break_op (0); |
1703 | fail_union_operation(op); | 1722 | fail_union_operation (op); |
1704 | return; | 1723 | return; |
1705 | } | 1724 | } |
1706 | GNUNET_CADET_receive_done(op->channel); | 1725 | GNUNET_CADET_receive_done (op->channel); |
1707 | } | 1726 | } |
1708 | 1727 | ||
1709 | 1728 | ||
@@ -1715,30 +1734,30 @@ handle_union_p2p_full_element(void *cls, | |||
1715 | * @param msg the message | 1734 | * @param msg the message |
1716 | */ | 1735 | */ |
1717 | int | 1736 | int |
1718 | check_union_p2p_inquiry(void *cls, | 1737 | check_union_p2p_inquiry (void *cls, |
1719 | const struct InquiryMessage *msg) | 1738 | const struct InquiryMessage *msg) |
1720 | { | 1739 | { |
1721 | struct Operation *op = cls; | 1740 | struct Operation *op = cls; |
1722 | unsigned int num_keys; | 1741 | unsigned int num_keys; |
1723 | 1742 | ||
1724 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) | 1743 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) |
1725 | { | 1744 | { |
1726 | GNUNET_break_op(0); | 1745 | GNUNET_break_op (0); |
1727 | return GNUNET_SYSERR; | 1746 | return GNUNET_SYSERR; |
1728 | } | 1747 | } |
1729 | if (op->state->phase != PHASE_INVENTORY_PASSIVE) | 1748 | if (op->state->phase != PHASE_INVENTORY_PASSIVE) |
1730 | { | 1749 | { |
1731 | GNUNET_break_op(0); | 1750 | GNUNET_break_op (0); |
1732 | return GNUNET_SYSERR; | 1751 | return GNUNET_SYSERR; |
1733 | } | 1752 | } |
1734 | num_keys = (ntohs(msg->header.size) - sizeof(struct InquiryMessage)) | 1753 | num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage)) |
1735 | / sizeof(struct IBF_Key); | 1754 | / sizeof(struct IBF_Key); |
1736 | if ((ntohs(msg->header.size) - sizeof(struct InquiryMessage)) | 1755 | if ((ntohs (msg->header.size) - sizeof(struct InquiryMessage)) |
1737 | != num_keys * sizeof(struct IBF_Key)) | 1756 | != num_keys * sizeof(struct IBF_Key)) |
1738 | { | 1757 | { |
1739 | GNUNET_break_op(0); | 1758 | GNUNET_break_op (0); |
1740 | return GNUNET_SYSERR; | 1759 | return GNUNET_SYSERR; |
1741 | } | 1760 | } |
1742 | return GNUNET_OK; | 1761 | return GNUNET_OK; |
1743 | } | 1762 | } |
1744 | 1763 | ||
@@ -1751,30 +1770,30 @@ check_union_p2p_inquiry(void *cls, | |||
1751 | * @param msg the message | 1770 | * @param msg the message |
1752 | */ | 1771 | */ |
1753 | void | 1772 | void |
1754 | handle_union_p2p_inquiry(void *cls, | 1773 | handle_union_p2p_inquiry (void *cls, |
1755 | const struct InquiryMessage *msg) | 1774 | const struct InquiryMessage *msg) |
1756 | { | 1775 | { |
1757 | struct Operation *op = cls; | 1776 | struct Operation *op = cls; |
1758 | const struct IBF_Key *ibf_key; | 1777 | const struct IBF_Key *ibf_key; |
1759 | unsigned int num_keys; | 1778 | unsigned int num_keys; |
1760 | 1779 | ||
1761 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1780 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1762 | "Received union inquiry\n"); | 1781 | "Received union inquiry\n"); |
1763 | num_keys = (ntohs(msg->header.size) - sizeof(struct InquiryMessage)) | 1782 | num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage)) |
1764 | / sizeof(struct IBF_Key); | 1783 | / sizeof(struct IBF_Key); |
1765 | ibf_key = (const struct IBF_Key *)&msg[1]; | 1784 | ibf_key = (const struct IBF_Key *) &msg[1]; |
1766 | while (0 != num_keys--) | 1785 | while (0 != num_keys--) |
1767 | { | 1786 | { |
1768 | struct IBF_Key unsalted_key; | 1787 | struct IBF_Key unsalted_key; |
1769 | 1788 | ||
1770 | unsalt_key(ibf_key, | 1789 | unsalt_key (ibf_key, |
1771 | ntohl(msg->salt), | 1790 | ntohl (msg->salt), |
1772 | &unsalted_key); | 1791 | &unsalted_key); |
1773 | send_offers_for_key(op, | 1792 | send_offers_for_key (op, |
1774 | unsalted_key); | 1793 | unsalted_key); |
1775 | ibf_key++; | 1794 | ibf_key++; |
1776 | } | 1795 | } |
1777 | GNUNET_CADET_receive_done(op->channel); | 1796 | GNUNET_CADET_receive_done (op->channel); |
1778 | } | 1797 | } |
1779 | 1798 | ||
1780 | 1799 | ||
@@ -1789,9 +1808,9 @@ handle_union_p2p_inquiry(void *cls, | |||
1789 | * #GNUNET_NO if not. | 1808 | * #GNUNET_NO if not. |
1790 | */ | 1809 | */ |
1791 | static int | 1810 | static int |
1792 | send_missing_full_elements_iter(void *cls, | 1811 | send_missing_full_elements_iter (void *cls, |
1793 | uint32_t key, | 1812 | uint32_t key, |
1794 | void *value) | 1813 | void *value) |
1795 | { | 1814 | { |
1796 | struct Operation *op = cls; | 1815 | struct Operation *op = cls; |
1797 | struct KeyEntry *ke = value; | 1816 | struct KeyEntry *ke = value; |
@@ -1801,15 +1820,15 @@ send_missing_full_elements_iter(void *cls, | |||
1801 | 1820 | ||
1802 | if (GNUNET_YES == ke->received) | 1821 | if (GNUNET_YES == ke->received) |
1803 | return GNUNET_YES; | 1822 | return GNUNET_YES; |
1804 | ev = GNUNET_MQ_msg_extra(emsg, | 1823 | ev = GNUNET_MQ_msg_extra (emsg, |
1805 | ee->element.size, | 1824 | ee->element.size, |
1806 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); | 1825 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); |
1807 | GNUNET_memcpy(&emsg[1], | 1826 | GNUNET_memcpy (&emsg[1], |
1808 | ee->element.data, | 1827 | ee->element.data, |
1809 | ee->element.size); | 1828 | ee->element.size); |
1810 | emsg->element_type = htons(ee->element.element_type); | 1829 | emsg->element_type = htons (ee->element.element_type); |
1811 | GNUNET_MQ_send(op->mq, | 1830 | GNUNET_MQ_send (op->mq, |
1812 | ev); | 1831 | ev); |
1813 | return GNUNET_YES; | 1832 | return GNUNET_YES; |
1814 | } | 1833 | } |
1815 | 1834 | ||
@@ -1821,30 +1840,30 @@ send_missing_full_elements_iter(void *cls, | |||
1821 | * @param mh the demand message | 1840 | * @param mh the demand message |
1822 | */ | 1841 | */ |
1823 | void | 1842 | void |
1824 | handle_union_p2p_request_full(void *cls, | 1843 | handle_union_p2p_request_full (void *cls, |
1825 | const struct GNUNET_MessageHeader *mh) | 1844 | const struct GNUNET_MessageHeader *mh) |
1826 | { | 1845 | { |
1827 | struct Operation *op = cls; | 1846 | struct Operation *op = cls; |
1828 | 1847 | ||
1829 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1848 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1830 | "Received request for full set transmission\n"); | 1849 | "Received request for full set transmission\n"); |
1831 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) | 1850 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) |
1832 | { | 1851 | { |
1833 | GNUNET_break_op(0); | 1852 | GNUNET_break_op (0); |
1834 | fail_union_operation(op); | 1853 | fail_union_operation (op); |
1835 | return; | 1854 | return; |
1836 | } | 1855 | } |
1837 | if (PHASE_EXPECT_IBF != op->state->phase) | 1856 | if (PHASE_EXPECT_IBF != op->state->phase) |
1838 | { | 1857 | { |
1839 | GNUNET_break_op(0); | 1858 | GNUNET_break_op (0); |
1840 | fail_union_operation(op); | 1859 | fail_union_operation (op); |
1841 | return; | 1860 | return; |
1842 | } | 1861 | } |
1843 | 1862 | ||
1844 | // FIXME: we need to check that our set is larger than the | 1863 | // FIXME: we need to check that our set is larger than the |
1845 | // byzantine_lower_bound by some threshold | 1864 | // byzantine_lower_bound by some threshold |
1846 | send_full_set(op); | 1865 | send_full_set (op); |
1847 | GNUNET_CADET_receive_done(op->channel); | 1866 | GNUNET_CADET_receive_done (op->channel); |
1848 | } | 1867 | } |
1849 | 1868 | ||
1850 | 1869 | ||
@@ -1855,55 +1874,55 @@ handle_union_p2p_request_full(void *cls, | |||
1855 | * @param mh the demand message | 1874 | * @param mh the demand message |
1856 | */ | 1875 | */ |
1857 | void | 1876 | void |
1858 | handle_union_p2p_full_done(void *cls, | 1877 | handle_union_p2p_full_done (void *cls, |
1859 | const struct GNUNET_MessageHeader *mh) | 1878 | const struct GNUNET_MessageHeader *mh) |
1860 | { | 1879 | { |
1861 | struct Operation *op = cls; | 1880 | struct Operation *op = cls; |
1862 | 1881 | ||
1863 | switch (op->state->phase) | 1882 | switch (op->state->phase) |
1864 | { | 1883 | { |
1865 | case PHASE_EXPECT_IBF: | 1884 | case PHASE_EXPECT_IBF: |
1866 | { | 1885 | { |
1867 | struct GNUNET_MQ_Envelope *ev; | 1886 | struct GNUNET_MQ_Envelope *ev; |
1868 | 1887 | ||
1869 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1888 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1870 | "got FULL DONE, sending elements that other peer is missing\n"); | 1889 | "got FULL DONE, sending elements that other peer is missing\n"); |
1871 | 1890 | ||
1872 | /* send all the elements that did not come from the remote peer */ | 1891 | /* send all the elements that did not come from the remote peer */ |
1873 | GNUNET_CONTAINER_multihashmap32_iterate(op->state->key_to_element, | 1892 | GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, |
1874 | &send_missing_full_elements_iter, | 1893 | &send_missing_full_elements_iter, |
1875 | op); | 1894 | op); |
1876 | 1895 | ||
1877 | ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); | 1896 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); |
1878 | GNUNET_MQ_send(op->mq, | 1897 | GNUNET_MQ_send (op->mq, |
1879 | ev); | 1898 | ev); |
1880 | op->state->phase = PHASE_DONE; | 1899 | op->state->phase = PHASE_DONE; |
1881 | /* we now wait until the other peer sends us the OVER message*/ | 1900 | /* we now wait until the other peer sends us the OVER message*/ |
1882 | } | 1901 | } |
1883 | break; | 1902 | break; |
1884 | 1903 | ||
1885 | case PHASE_FULL_SENDING: | 1904 | case PHASE_FULL_SENDING: |
1886 | { | 1905 | { |
1887 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1906 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1888 | "got FULL DONE, finishing\n"); | 1907 | "got FULL DONE, finishing\n"); |
1889 | /* We sent the full set, and got the response for that. We're done. */ | 1908 | /* We sent the full set, and got the response for that. We're done. */ |
1890 | op->state->phase = PHASE_DONE; | 1909 | op->state->phase = PHASE_DONE; |
1891 | GNUNET_CADET_receive_done(op->channel); | 1910 | GNUNET_CADET_receive_done (op->channel); |
1892 | send_client_done(op); | 1911 | send_client_done (op); |
1893 | _GSS_operation_destroy2(op); | 1912 | _GSS_operation_destroy2 (op); |
1894 | return; | 1913 | return; |
1895 | } | 1914 | } |
1896 | break; | 1915 | break; |
1897 | 1916 | ||
1898 | default: | 1917 | default: |
1899 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, | 1918 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1900 | "Handle full done phase is %u\n", | 1919 | "Handle full done phase is %u\n", |
1901 | (unsigned)op->state->phase); | 1920 | (unsigned) op->state->phase); |
1902 | GNUNET_break_op(0); | 1921 | GNUNET_break_op (0); |
1903 | fail_union_operation(op); | 1922 | fail_union_operation (op); |
1904 | return; | 1923 | return; |
1905 | } | 1924 | } |
1906 | GNUNET_CADET_receive_done(op->channel); | 1925 | GNUNET_CADET_receive_done (op->channel); |
1907 | } | 1926 | } |
1908 | 1927 | ||
1909 | 1928 | ||
@@ -1916,25 +1935,25 @@ handle_union_p2p_full_done(void *cls, | |||
1916 | * @return #GNUNET_OK if @a mh is well-formed | 1935 | * @return #GNUNET_OK if @a mh is well-formed |
1917 | */ | 1936 | */ |
1918 | int | 1937 | int |
1919 | check_union_p2p_demand(void *cls, | 1938 | check_union_p2p_demand (void *cls, |
1920 | const struct GNUNET_MessageHeader *mh) | 1939 | const struct GNUNET_MessageHeader *mh) |
1921 | { | 1940 | { |
1922 | struct Operation *op = cls; | 1941 | struct Operation *op = cls; |
1923 | unsigned int num_hashes; | 1942 | unsigned int num_hashes; |
1924 | 1943 | ||
1925 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) | 1944 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) |
1926 | { | 1945 | { |
1927 | GNUNET_break_op(0); | 1946 | GNUNET_break_op (0); |
1928 | return GNUNET_SYSERR; | 1947 | return GNUNET_SYSERR; |
1929 | } | 1948 | } |
1930 | num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader)) | 1949 | num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) |
1931 | / sizeof(struct GNUNET_HashCode); | 1950 | / sizeof(struct GNUNET_HashCode); |
1932 | if ((ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader)) | 1951 | if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) |
1933 | != num_hashes * sizeof(struct GNUNET_HashCode)) | 1952 | != num_hashes * sizeof(struct GNUNET_HashCode)) |
1934 | { | 1953 | { |
1935 | GNUNET_break_op(0); | 1954 | GNUNET_break_op (0); |
1936 | return GNUNET_SYSERR; | 1955 | return GNUNET_SYSERR; |
1937 | } | 1956 | } |
1938 | return GNUNET_OK; | 1957 | return GNUNET_OK; |
1939 | } | 1958 | } |
1940 | 1959 | ||
@@ -1947,8 +1966,8 @@ check_union_p2p_demand(void *cls, | |||
1947 | * @param mh the demand message | 1966 | * @param mh the demand message |
1948 | */ | 1967 | */ |
1949 | void | 1968 | void |
1950 | handle_union_p2p_demand(void *cls, | 1969 | handle_union_p2p_demand (void *cls, |
1951 | const struct GNUNET_MessageHeader *mh) | 1970 | const struct GNUNET_MessageHeader *mh) |
1952 | { | 1971 | { |
1953 | struct Operation *op = cls; | 1972 | struct Operation *op = cls; |
1954 | struct ElementEntry *ee; | 1973 | struct ElementEntry *ee; |
@@ -1957,60 +1976,61 @@ handle_union_p2p_demand(void *cls, | |||
1957 | unsigned int num_hashes; | 1976 | unsigned int num_hashes; |
1958 | struct GNUNET_MQ_Envelope *ev; | 1977 | struct GNUNET_MQ_Envelope *ev; |
1959 | 1978 | ||
1960 | num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader)) | 1979 | num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) |
1961 | / sizeof(struct GNUNET_HashCode); | 1980 | / sizeof(struct GNUNET_HashCode); |
1962 | for (hash = (const struct GNUNET_HashCode *)&mh[1]; | 1981 | for (hash = (const struct GNUNET_HashCode *) &mh[1]; |
1963 | num_hashes > 0; | 1982 | num_hashes > 0; |
1964 | hash++, num_hashes--) | 1983 | hash++, num_hashes--) |
1984 | { | ||
1985 | ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements, | ||
1986 | hash); | ||
1987 | if (NULL == ee) | ||
1965 | { | 1988 | { |
1966 | ee = GNUNET_CONTAINER_multihashmap_get(op->set->content->elements, | 1989 | /* Demand for non-existing element. */ |
1967 | hash); | 1990 | GNUNET_break_op (0); |
1968 | if (NULL == ee) | 1991 | fail_union_operation (op); |
1969 | { | 1992 | return; |
1970 | /* Demand for non-existing element. */ | 1993 | } |
1971 | GNUNET_break_op(0); | 1994 | if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) |
1972 | fail_union_operation(op); | 1995 | { |
1973 | return; | 1996 | /* Probably confused lazily copied sets. */ |
1974 | } | 1997 | GNUNET_break_op (0); |
1975 | if (GNUNET_NO == _GSS_is_element_of_operation(ee, op)) | 1998 | fail_union_operation (op); |
1976 | { | 1999 | return; |
1977 | /* Probably confused lazily copied sets. */ | ||
1978 | GNUNET_break_op(0); | ||
1979 | fail_union_operation(op); | ||
1980 | return; | ||
1981 | } | ||
1982 | ev = GNUNET_MQ_msg_extra(emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); | ||
1983 | GNUNET_memcpy(&emsg[1], ee->element.data, ee->element.size); | ||
1984 | emsg->reserved = htons(0); | ||
1985 | emsg->element_type = htons(ee->element.element_type); | ||
1986 | LOG(GNUNET_ERROR_TYPE_DEBUG, | ||
1987 | "[OP %x] Sending demanded element (size %u, hash %s) to peer\n", | ||
1988 | (void *)op, | ||
1989 | (unsigned int)ee->element.size, | ||
1990 | GNUNET_h2s(&ee->element_hash)); | ||
1991 | GNUNET_MQ_send(op->mq, ev); | ||
1992 | GNUNET_STATISTICS_update(_GSS_statistics, | ||
1993 | "# exchanged elements", | ||
1994 | 1, | ||
1995 | GNUNET_NO); | ||
1996 | |||
1997 | switch (op->result_mode) | ||
1998 | { | ||
1999 | case GNUNET_SET_RESULT_ADDED: | ||
2000 | /* Nothing to do. */ | ||
2001 | break; | ||
2002 | |||
2003 | case GNUNET_SET_RESULT_SYMMETRIC: | ||
2004 | send_client_element(op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE); | ||
2005 | break; | ||
2006 | |||
2007 | default: | ||
2008 | /* Result mode not supported, should have been caught earlier. */ | ||
2009 | GNUNET_break(0); | ||
2010 | break; | ||
2011 | } | ||
2012 | } | 2000 | } |
2013 | GNUNET_CADET_receive_done(op->channel); | 2001 | ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, |
2002 | GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); | ||
2003 | GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size); | ||
2004 | emsg->reserved = htons (0); | ||
2005 | emsg->element_type = htons (ee->element.element_type); | ||
2006 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
2007 | "[OP %x] Sending demanded element (size %u, hash %s) to peer\n", | ||
2008 | (void *) op, | ||
2009 | (unsigned int) ee->element.size, | ||
2010 | GNUNET_h2s (&ee->element_hash)); | ||
2011 | GNUNET_MQ_send (op->mq, ev); | ||
2012 | GNUNET_STATISTICS_update (_GSS_statistics, | ||
2013 | "# exchanged elements", | ||
2014 | 1, | ||
2015 | GNUNET_NO); | ||
2016 | |||
2017 | switch (op->result_mode) | ||
2018 | { | ||
2019 | case GNUNET_SET_RESULT_ADDED: | ||
2020 | /* Nothing to do. */ | ||
2021 | break; | ||
2022 | |||
2023 | case GNUNET_SET_RESULT_SYMMETRIC: | ||
2024 | send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE); | ||
2025 | break; | ||
2026 | |||
2027 | default: | ||
2028 | /* Result mode not supported, should have been caught earlier. */ | ||
2029 | GNUNET_break (0); | ||
2030 | break; | ||
2031 | } | ||
2032 | } | ||
2033 | GNUNET_CADET_receive_done (op->channel); | ||
2014 | } | 2034 | } |
2015 | 2035 | ||
2016 | 2036 | ||
@@ -2022,32 +2042,32 @@ handle_union_p2p_demand(void *cls, | |||
2022 | * @return #GNUNET_OK if @a mh is well-formed | 2042 | * @return #GNUNET_OK if @a mh is well-formed |
2023 | */ | 2043 | */ |
2024 | int | 2044 | int |
2025 | check_union_p2p_offer(void *cls, | 2045 | check_union_p2p_offer (void *cls, |
2026 | const struct GNUNET_MessageHeader *mh) | 2046 | const struct GNUNET_MessageHeader *mh) |
2027 | { | 2047 | { |
2028 | struct Operation *op = cls; | 2048 | struct Operation *op = cls; |
2029 | unsigned int num_hashes; | 2049 | unsigned int num_hashes; |
2030 | 2050 | ||
2031 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) | 2051 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) |
2032 | { | 2052 | { |
2033 | GNUNET_break_op(0); | 2053 | GNUNET_break_op (0); |
2034 | return GNUNET_SYSERR; | 2054 | return GNUNET_SYSERR; |
2035 | } | 2055 | } |
2036 | /* look up elements and send them */ | 2056 | /* look up elements and send them */ |
2037 | if ((op->state->phase != PHASE_INVENTORY_PASSIVE) && | 2057 | if ((op->state->phase != PHASE_INVENTORY_PASSIVE) && |
2038 | (op->state->phase != PHASE_INVENTORY_ACTIVE)) | 2058 | (op->state->phase != PHASE_INVENTORY_ACTIVE)) |
2039 | { | 2059 | { |
2040 | GNUNET_break_op(0); | 2060 | GNUNET_break_op (0); |
2041 | return GNUNET_SYSERR; | 2061 | return GNUNET_SYSERR; |
2042 | } | 2062 | } |
2043 | num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader)) | 2063 | num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) |
2044 | / sizeof(struct GNUNET_HashCode); | 2064 | / sizeof(struct GNUNET_HashCode); |
2045 | if ((ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader)) != | 2065 | if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) != |
2046 | num_hashes * sizeof(struct GNUNET_HashCode)) | 2066 | num_hashes * sizeof(struct GNUNET_HashCode)) |
2047 | { | 2067 | { |
2048 | GNUNET_break_op(0); | 2068 | GNUNET_break_op (0); |
2049 | return GNUNET_SYSERR; | 2069 | return GNUNET_SYSERR; |
2050 | } | 2070 | } |
2051 | return GNUNET_OK; | 2071 | return GNUNET_OK; |
2052 | } | 2072 | } |
2053 | 2073 | ||
@@ -2060,56 +2080,57 @@ check_union_p2p_offer(void *cls, | |||
2060 | * @param mh the message | 2080 | * @param mh the message |
2061 | */ | 2081 | */ |
2062 | void | 2082 | void |
2063 | handle_union_p2p_offer(void *cls, | 2083 | handle_union_p2p_offer (void *cls, |
2064 | const struct GNUNET_MessageHeader *mh) | 2084 | const struct GNUNET_MessageHeader *mh) |
2065 | { | 2085 | { |
2066 | struct Operation *op = cls; | 2086 | struct Operation *op = cls; |
2067 | const struct GNUNET_HashCode *hash; | 2087 | const struct GNUNET_HashCode *hash; |
2068 | unsigned int num_hashes; | 2088 | unsigned int num_hashes; |
2069 | 2089 | ||
2070 | num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader)) | 2090 | num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) |
2071 | / sizeof(struct GNUNET_HashCode); | 2091 | / sizeof(struct GNUNET_HashCode); |
2072 | for (hash = (const struct GNUNET_HashCode *)&mh[1]; | 2092 | for (hash = (const struct GNUNET_HashCode *) &mh[1]; |
2073 | num_hashes > 0; | 2093 | num_hashes > 0; |
2074 | hash++, num_hashes--) | 2094 | hash++, num_hashes--) |
2075 | { | 2095 | { |
2076 | struct ElementEntry *ee; | 2096 | struct ElementEntry *ee; |
2077 | struct GNUNET_MessageHeader *demands; | 2097 | struct GNUNET_MessageHeader *demands; |
2078 | struct GNUNET_MQ_Envelope *ev; | 2098 | struct GNUNET_MQ_Envelope *ev; |
2079 | 2099 | ||
2080 | ee = GNUNET_CONTAINER_multihashmap_get(op->set->content->elements, | 2100 | ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements, |
2081 | hash); | 2101 | hash); |
2082 | if (NULL != ee) | 2102 | if (NULL != ee) |
2083 | if (GNUNET_YES == _GSS_is_element_of_operation(ee, op)) | 2103 | if (GNUNET_YES == _GSS_is_element_of_operation (ee, op)) |
2084 | continue; | 2104 | continue; |
2085 | 2105 | ||
2086 | if (GNUNET_YES == | 2106 | if (GNUNET_YES == |
2087 | GNUNET_CONTAINER_multihashmap_contains(op->state->demanded_hashes, | 2107 | GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes, |
2088 | hash)) | 2108 | hash)) |
2089 | { | 2109 | { |
2090 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 2110 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2091 | "Skipped sending duplicate demand\n"); | 2111 | "Skipped sending duplicate demand\n"); |
2092 | continue; | 2112 | continue; |
2093 | } | 2113 | } |
2094 | 2114 | ||
2095 | GNUNET_assert(GNUNET_OK == | 2115 | GNUNET_assert (GNUNET_OK == |
2096 | GNUNET_CONTAINER_multihashmap_put(op->state->demanded_hashes, | 2116 | GNUNET_CONTAINER_multihashmap_put ( |
2097 | hash, | 2117 | op->state->demanded_hashes, |
2098 | NULL, | 2118 | hash, |
2099 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); | 2119 | NULL, |
2100 | 2120 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); | |
2101 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 2121 | |
2102 | "[OP %x] Requesting element (hash %s)\n", | 2122 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2103 | (void *)op, GNUNET_h2s(hash)); | 2123 | "[OP %x] Requesting element (hash %s)\n", |
2104 | ev = GNUNET_MQ_msg_header_extra(demands, | 2124 | (void *) op, GNUNET_h2s (hash)); |
2105 | sizeof(struct GNUNET_HashCode), | 2125 | ev = GNUNET_MQ_msg_header_extra (demands, |
2106 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND); | 2126 | sizeof(struct GNUNET_HashCode), |
2107 | GNUNET_memcpy(&demands[1], | 2127 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND); |
2108 | hash, | 2128 | GNUNET_memcpy (&demands[1], |
2109 | sizeof(struct GNUNET_HashCode)); | 2129 | hash, |
2110 | GNUNET_MQ_send(op->mq, ev); | 2130 | sizeof(struct GNUNET_HashCode)); |
2111 | } | 2131 | GNUNET_MQ_send (op->mq, ev); |
2112 | GNUNET_CADET_receive_done(op->channel); | 2132 | } |
2133 | GNUNET_CADET_receive_done (op->channel); | ||
2113 | } | 2134 | } |
2114 | 2135 | ||
2115 | 2136 | ||
@@ -2120,58 +2141,58 @@ handle_union_p2p_offer(void *cls, | |||
2120 | * @param mh the message | 2141 | * @param mh the message |
2121 | */ | 2142 | */ |
2122 | void | 2143 | void |
2123 | handle_union_p2p_done(void *cls, | 2144 | handle_union_p2p_done (void *cls, |
2124 | const struct GNUNET_MessageHeader *mh) | 2145 | const struct GNUNET_MessageHeader *mh) |
2125 | { | 2146 | { |
2126 | struct Operation *op = cls; | 2147 | struct Operation *op = cls; |
2127 | 2148 | ||
2128 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) | 2149 | if (GNUNET_SET_OPERATION_UNION != op->set->operation) |
2129 | { | 2150 | { |
2130 | GNUNET_break_op(0); | 2151 | GNUNET_break_op (0); |
2131 | fail_union_operation(op); | 2152 | fail_union_operation (op); |
2132 | return; | 2153 | return; |
2133 | } | 2154 | } |
2134 | switch (op->state->phase) | 2155 | switch (op->state->phase) |
2135 | { | 2156 | { |
2136 | case PHASE_INVENTORY_PASSIVE: | 2157 | case PHASE_INVENTORY_PASSIVE: |
2137 | /* We got all requests, but still have to send our elements in response. */ | 2158 | /* We got all requests, but still have to send our elements in response. */ |
2138 | op->state->phase = PHASE_FINISH_WAITING; | 2159 | op->state->phase = PHASE_FINISH_WAITING; |
2139 | 2160 | ||
2140 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 2161 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2141 | "got DONE (as passive partner), waiting for our demands to be satisfied\n"); | 2162 | "got DONE (as passive partner), waiting for our demands to be satisfied\n"); |
2142 | /* The active peer is done sending offers | 2163 | /* The active peer is done sending offers |
2143 | * and inquiries. This means that all | 2164 | * and inquiries. This means that all |
2144 | * our responses to that (demands and offers) | 2165 | * our responses to that (demands and offers) |
2145 | * must be in flight (queued or in mesh). | 2166 | * must be in flight (queued or in mesh). |
2146 | * | 2167 | * |
2147 | * We should notify the active peer once | 2168 | * We should notify the active peer once |
2148 | * all our demands are satisfied, so that the active | 2169 | * all our demands are satisfied, so that the active |
2149 | * peer can quit if we gave it everything. | 2170 | * peer can quit if we gave it everything. |
2150 | */ | 2171 | */ |
2151 | GNUNET_CADET_receive_done(op->channel); | 2172 | GNUNET_CADET_receive_done (op->channel); |
2152 | maybe_finish(op); | 2173 | maybe_finish (op); |
2153 | return; | 2174 | return; |
2154 | 2175 | ||
2155 | case PHASE_INVENTORY_ACTIVE: | 2176 | case PHASE_INVENTORY_ACTIVE: |
2156 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 2177 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2157 | "got DONE (as active partner), waiting to finish\n"); | 2178 | "got DONE (as active partner), waiting to finish\n"); |
2158 | /* All demands of the other peer are satisfied, | 2179 | /* All demands of the other peer are satisfied, |
2159 | * and we processed all offers, thus we know | 2180 | * and we processed all offers, thus we know |
2160 | * exactly what our demands must be. | 2181 | * exactly what our demands must be. |
2161 | * | 2182 | * |
2162 | * We'll close the channel | 2183 | * We'll close the channel |
2163 | * to the other peer once our demands are met. | 2184 | * to the other peer once our demands are met. |
2164 | */ | 2185 | */ |
2165 | op->state->phase = PHASE_FINISH_CLOSING; | 2186 | op->state->phase = PHASE_FINISH_CLOSING; |
2166 | GNUNET_CADET_receive_done(op->channel); | 2187 | GNUNET_CADET_receive_done (op->channel); |
2167 | maybe_finish(op); | 2188 | maybe_finish (op); |
2168 | return; | 2189 | return; |
2169 | 2190 | ||
2170 | default: | 2191 | default: |
2171 | GNUNET_break_op(0); | 2192 | GNUNET_break_op (0); |
2172 | fail_union_operation(op); | 2193 | fail_union_operation (op); |
2173 | return; | 2194 | return; |
2174 | } | 2195 | } |
2175 | } | 2196 | } |
2176 | 2197 | ||
2177 | /** | 2198 | /** |
@@ -2181,10 +2202,10 @@ handle_union_p2p_done(void *cls, | |||
2181 | * @param mh the message | 2202 | * @param mh the message |
2182 | */ | 2203 | */ |
2183 | void | 2204 | void |
2184 | handle_union_p2p_over(void *cls, | 2205 | handle_union_p2p_over (void *cls, |
2185 | const struct GNUNET_MessageHeader *mh) | 2206 | const struct GNUNET_MessageHeader *mh) |
2186 | { | 2207 | { |
2187 | send_client_done(cls); | 2208 | send_client_done (cls); |
2188 | } | 2209 | } |
2189 | 2210 | ||
2190 | 2211 | ||
@@ -2196,54 +2217,55 @@ handle_union_p2p_over(void *cls, | |||
2196 | * to convince it to accept, may be NULL | 2217 | * to convince it to accept, may be NULL |
2197 | */ | 2218 | */ |
2198 | static struct OperationState * | 2219 | static struct OperationState * |
2199 | union_evaluate(struct Operation *op, | 2220 | union_evaluate (struct Operation *op, |
2200 | const struct GNUNET_MessageHeader *opaque_context) | 2221 | const struct GNUNET_MessageHeader *opaque_context) |
2201 | { | 2222 | { |
2202 | struct OperationState *state; | 2223 | struct OperationState *state; |
2203 | struct GNUNET_MQ_Envelope *ev; | 2224 | struct GNUNET_MQ_Envelope *ev; |
2204 | struct OperationRequestMessage *msg; | 2225 | struct OperationRequestMessage *msg; |
2205 | 2226 | ||
2206 | ev = GNUNET_MQ_msg_nested_mh(msg, | 2227 | ev = GNUNET_MQ_msg_nested_mh (msg, |
2207 | GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, | 2228 | GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, |
2208 | opaque_context); | 2229 | opaque_context); |
2209 | if (NULL == ev) | 2230 | if (NULL == ev) |
2210 | { | 2231 | { |
2211 | /* the context message is too large */ | 2232 | /* the context message is too large */ |
2212 | GNUNET_break(0); | 2233 | GNUNET_break (0); |
2213 | return NULL; | 2234 | return NULL; |
2214 | } | 2235 | } |
2215 | state = GNUNET_new(struct OperationState); | 2236 | state = GNUNET_new (struct OperationState); |
2216 | state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create(32, | 2237 | state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, |
2217 | GNUNET_NO); | 2238 | GNUNET_NO); |
2218 | /* copy the current generation's strata estimator for this operation */ | 2239 | /* copy the current generation's strata estimator for this operation */ |
2219 | state->se = strata_estimator_dup(op->set->state->se); | 2240 | state->se = strata_estimator_dup (op->set->state->se); |
2220 | /* we started the operation, thus we have to send the operation request */ | 2241 | /* we started the operation, thus we have to send the operation request */ |
2221 | state->phase = PHASE_EXPECT_SE; | 2242 | state->phase = PHASE_EXPECT_SE; |
2222 | state->salt_receive = state->salt_send = 42; // FIXME????? | 2243 | state->salt_receive = state->salt_send = 42; // FIXME????? |
2223 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 2244 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2224 | "Initiating union operation evaluation\n"); | 2245 | "Initiating union operation evaluation\n"); |
2225 | GNUNET_STATISTICS_update(_GSS_statistics, | 2246 | GNUNET_STATISTICS_update (_GSS_statistics, |
2226 | "# of total union operations", | 2247 | "# of total union operations", |
2227 | 1, | 2248 | 1, |
2228 | GNUNET_NO); | 2249 | GNUNET_NO); |
2229 | GNUNET_STATISTICS_update(_GSS_statistics, | 2250 | GNUNET_STATISTICS_update (_GSS_statistics, |
2230 | "# of initiated union operations", | 2251 | "# of initiated union operations", |
2231 | 1, | 2252 | 1, |
2232 | GNUNET_NO); | 2253 | GNUNET_NO); |
2233 | msg->operation = htonl(GNUNET_SET_OPERATION_UNION); | 2254 | msg->operation = htonl (GNUNET_SET_OPERATION_UNION); |
2234 | GNUNET_MQ_send(op->mq, | 2255 | GNUNET_MQ_send (op->mq, |
2235 | ev); | 2256 | ev); |
2236 | 2257 | ||
2237 | if (NULL != opaque_context) | 2258 | if (NULL != opaque_context) |
2238 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 2259 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2239 | "sent op request with context message\n"); | 2260 | "sent op request with context message\n"); |
2240 | else | 2261 | else |
2241 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 2262 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2242 | "sent op request without context message\n"); | 2263 | "sent op request without context message\n"); |
2243 | 2264 | ||
2244 | op->state = state; | 2265 | op->state = state; |
2245 | initialize_key_to_element(op); | 2266 | initialize_key_to_element (op); |
2246 | state->initial_size = GNUNET_CONTAINER_multihashmap32_size(state->key_to_element); | 2267 | state->initial_size = GNUNET_CONTAINER_multihashmap32_size ( |
2268 | state->key_to_element); | ||
2247 | return state; | 2269 | return state; |
2248 | } | 2270 | } |
2249 | 2271 | ||
@@ -2255,7 +2277,7 @@ union_evaluate(struct Operation *op, | |||
2255 | * @param op operation that will be accepted as a union operation | 2277 | * @param op operation that will be accepted as a union operation |
2256 | */ | 2278 | */ |
2257 | static struct OperationState * | 2279 | static struct OperationState * |
2258 | union_accept(struct Operation *op) | 2280 | union_accept (struct Operation *op) |
2259 | { | 2281 | { |
2260 | struct OperationState *state; | 2282 | struct OperationState *state; |
2261 | const struct StrataEstimator *se; | 2283 | const struct StrataEstimator *se; |
@@ -2265,46 +2287,48 @@ union_accept(struct Operation *op) | |||
2265 | size_t len; | 2287 | size_t len; |
2266 | uint16_t type; | 2288 | uint16_t type; |
2267 | 2289 | ||
2268 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 2290 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2269 | "accepting set union operation\n"); | 2291 | "accepting set union operation\n"); |
2270 | GNUNET_STATISTICS_update(_GSS_statistics, | 2292 | GNUNET_STATISTICS_update (_GSS_statistics, |
2271 | "# of accepted union operations", | 2293 | "# of accepted union operations", |
2272 | 1, | 2294 | 1, |
2273 | GNUNET_NO); | 2295 | GNUNET_NO); |
2274 | GNUNET_STATISTICS_update(_GSS_statistics, | 2296 | GNUNET_STATISTICS_update (_GSS_statistics, |
2275 | "# of total union operations", | 2297 | "# of total union operations", |
2276 | 1, | 2298 | 1, |
2277 | GNUNET_NO); | 2299 | GNUNET_NO); |
2278 | 2300 | ||
2279 | state = GNUNET_new(struct OperationState); | 2301 | state = GNUNET_new (struct OperationState); |
2280 | state->se = strata_estimator_dup(op->set->state->se); | 2302 | state->se = strata_estimator_dup (op->set->state->se); |
2281 | state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create(32, | 2303 | state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, |
2282 | GNUNET_NO); | 2304 | GNUNET_NO); |
2283 | state->salt_receive = state->salt_send = 42; // FIXME????? | 2305 | state->salt_receive = state->salt_send = 42; // FIXME????? |
2284 | op->state = state; | 2306 | op->state = state; |
2285 | initialize_key_to_element(op); | 2307 | initialize_key_to_element (op); |
2286 | state->initial_size = GNUNET_CONTAINER_multihashmap32_size(state->key_to_element); | 2308 | state->initial_size = GNUNET_CONTAINER_multihashmap32_size ( |
2309 | state->key_to_element); | ||
2287 | 2310 | ||
2288 | /* kick off the operation */ | 2311 | /* kick off the operation */ |
2289 | se = state->se; | 2312 | se = state->se; |
2290 | buf = GNUNET_malloc(se->strata_count * IBF_BUCKET_SIZE * se->ibf_size); | 2313 | buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size); |
2291 | len = strata_estimator_write(se, | 2314 | len = strata_estimator_write (se, |
2292 | buf); | 2315 | buf); |
2293 | if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size) | 2316 | if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size) |
2294 | type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC; | 2317 | type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC; |
2295 | else | 2318 | else |
2296 | type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE; | 2319 | type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE; |
2297 | ev = GNUNET_MQ_msg_extra(strata_msg, | 2320 | ev = GNUNET_MQ_msg_extra (strata_msg, |
2298 | len, | 2321 | len, |
2299 | type); | 2322 | type); |
2300 | GNUNET_memcpy(&strata_msg[1], | 2323 | GNUNET_memcpy (&strata_msg[1], |
2301 | buf, | 2324 | buf, |
2302 | len); | 2325 | len); |
2303 | GNUNET_free(buf); | 2326 | GNUNET_free (buf); |
2304 | strata_msg->set_size | 2327 | strata_msg->set_size |
2305 | = GNUNET_htonll(GNUNET_CONTAINER_multihashmap_size(op->set->content->elements)); | 2328 | = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size ( |
2306 | GNUNET_MQ_send(op->mq, | 2329 | op->set->content->elements)); |
2307 | ev); | 2330 | GNUNET_MQ_send (op->mq, |
2331 | ev); | ||
2308 | state->phase = PHASE_EXPECT_IBF; | 2332 | state->phase = PHASE_EXPECT_IBF; |
2309 | return state; | 2333 | return state; |
2310 | } | 2334 | } |
@@ -2319,22 +2343,22 @@ union_accept(struct Operation *op) | |||
2319 | * @return the newly created set, NULL on error | 2343 | * @return the newly created set, NULL on error |
2320 | */ | 2344 | */ |
2321 | static struct SetState * | 2345 | static struct SetState * |
2322 | union_set_create(void) | 2346 | union_set_create (void) |
2323 | { | 2347 | { |
2324 | struct SetState *set_state; | 2348 | struct SetState *set_state; |
2325 | 2349 | ||
2326 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 2350 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2327 | "union set created\n"); | 2351 | "union set created\n"); |
2328 | set_state = GNUNET_new(struct SetState); | 2352 | set_state = GNUNET_new (struct SetState); |
2329 | set_state->se = strata_estimator_create(SE_STRATA_COUNT, | 2353 | set_state->se = strata_estimator_create (SE_STRATA_COUNT, |
2330 | SE_IBF_SIZE, SE_IBF_HASH_NUM); | 2354 | SE_IBF_SIZE, SE_IBF_HASH_NUM); |
2331 | if (NULL == set_state->se) | 2355 | if (NULL == set_state->se) |
2332 | { | 2356 | { |
2333 | GNUNET_log(GNUNET_ERROR_TYPE_ERROR, | 2357 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
2334 | "Failed to allocate strata estimator\n"); | 2358 | "Failed to allocate strata estimator\n"); |
2335 | GNUNET_free(set_state); | 2359 | GNUNET_free (set_state); |
2336 | return NULL; | 2360 | return NULL; |
2337 | } | 2361 | } |
2338 | return set_state; | 2362 | return set_state; |
2339 | } | 2363 | } |
2340 | 2364 | ||
@@ -2346,11 +2370,11 @@ union_set_create(void) | |||
2346 | * @param ee the element to add to the set | 2370 | * @param ee the element to add to the set |
2347 | */ | 2371 | */ |
2348 | static void | 2372 | static void |
2349 | union_add(struct SetState *set_state, | 2373 | union_add (struct SetState *set_state, |
2350 | struct ElementEntry *ee) | 2374 | struct ElementEntry *ee) |
2351 | { | 2375 | { |
2352 | strata_estimator_insert(set_state->se, | 2376 | strata_estimator_insert (set_state->se, |
2353 | get_ibf_key(&ee->element_hash)); | 2377 | get_ibf_key (&ee->element_hash)); |
2354 | } | 2378 | } |
2355 | 2379 | ||
2356 | 2380 | ||
@@ -2362,11 +2386,11 @@ union_add(struct SetState *set_state, | |||
2362 | * @param ee set element to remove | 2386 | * @param ee set element to remove |
2363 | */ | 2387 | */ |
2364 | static void | 2388 | static void |
2365 | union_remove(struct SetState *set_state, | 2389 | union_remove (struct SetState *set_state, |
2366 | struct ElementEntry *ee) | 2390 | struct ElementEntry *ee) |
2367 | { | 2391 | { |
2368 | strata_estimator_remove(set_state->se, | 2392 | strata_estimator_remove (set_state->se, |
2369 | get_ibf_key(&ee->element_hash)); | 2393 | get_ibf_key (&ee->element_hash)); |
2370 | } | 2394 | } |
2371 | 2395 | ||
2372 | 2396 | ||
@@ -2376,14 +2400,14 @@ union_remove(struct SetState *set_state, | |||
2376 | * @param set_state the set to destroy | 2400 | * @param set_state the set to destroy |
2377 | */ | 2401 | */ |
2378 | static void | 2402 | static void |
2379 | union_set_destroy(struct SetState *set_state) | 2403 | union_set_destroy (struct SetState *set_state) |
2380 | { | 2404 | { |
2381 | if (NULL != set_state->se) | 2405 | if (NULL != set_state->se) |
2382 | { | 2406 | { |
2383 | strata_estimator_destroy(set_state->se); | 2407 | strata_estimator_destroy (set_state->se); |
2384 | set_state->se = NULL; | 2408 | set_state->se = NULL; |
2385 | } | 2409 | } |
2386 | GNUNET_free(set_state); | 2410 | GNUNET_free (set_state); |
2387 | } | 2411 | } |
2388 | 2412 | ||
2389 | 2413 | ||
@@ -2394,14 +2418,14 @@ union_set_destroy(struct SetState *set_state) | |||
2394 | * @return a copy of the union-specific set state | 2418 | * @return a copy of the union-specific set state |
2395 | */ | 2419 | */ |
2396 | static struct SetState * | 2420 | static struct SetState * |
2397 | union_copy_state(struct SetState *state) | 2421 | union_copy_state (struct SetState *state) |
2398 | { | 2422 | { |
2399 | struct SetState *new_state; | 2423 | struct SetState *new_state; |
2400 | 2424 | ||
2401 | GNUNET_assert((NULL != state) && | 2425 | GNUNET_assert ((NULL != state) && |
2402 | (NULL != state->se)); | 2426 | (NULL != state->se)); |
2403 | new_state = GNUNET_new(struct SetState); | 2427 | new_state = GNUNET_new (struct SetState); |
2404 | new_state->se = strata_estimator_dup(state->se); | 2428 | new_state->se = strata_estimator_dup (state->se); |
2405 | 2429 | ||
2406 | return new_state; | 2430 | return new_state; |
2407 | } | 2431 | } |
@@ -2413,11 +2437,11 @@ union_copy_state(struct SetState *state) | |||
2413 | * @param op operation that lost the channel | 2437 | * @param op operation that lost the channel |
2414 | */ | 2438 | */ |
2415 | static void | 2439 | static void |
2416 | union_channel_death(struct Operation *op) | 2440 | union_channel_death (struct Operation *op) |
2417 | { | 2441 | { |
2418 | send_client_done(op); | 2442 | send_client_done (op); |
2419 | _GSS_operation_destroy(op, | 2443 | _GSS_operation_destroy (op, |
2420 | GNUNET_YES); | 2444 | GNUNET_YES); |
2421 | } | 2445 | } |
2422 | 2446 | ||
2423 | 2447 | ||
@@ -2428,7 +2452,7 @@ union_channel_death(struct Operation *op) | |||
2428 | * @return the operation specific VTable | 2452 | * @return the operation specific VTable |
2429 | */ | 2453 | */ |
2430 | const struct SetVT * | 2454 | const struct SetVT * |
2431 | _GSS_union_vt() | 2455 | _GSS_union_vt () |
2432 | { | 2456 | { |
2433 | static const struct SetVT union_vt = { | 2457 | static const struct SetVT union_vt = { |
2434 | .create = &union_set_create, | 2458 | .create = &union_set_create, |