aboutsummaryrefslogtreecommitdiff
path: root/src/rps/gnunet-service-rps.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/rps/gnunet-service-rps.c')
-rw-r--r--src/rps/gnunet-service-rps.c1814
1 files changed, 1196 insertions, 618 deletions
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c
index 07b88ddcd..20b314db3 100644
--- a/src/rps/gnunet-service-rps.c
+++ b/src/rps/gnunet-service-rps.c
@@ -25,6 +25,7 @@
25#include "gnunet_applications.h" 25#include "gnunet_applications.h"
26#include "gnunet_util_lib.h" 26#include "gnunet_util_lib.h"
27#include "gnunet_cadet_service.h" 27#include "gnunet_cadet_service.h"
28#include "gnunet_core_service.h"
28#include "gnunet_peerinfo_service.h" 29#include "gnunet_peerinfo_service.h"
29#include "gnunet_nse_service.h" 30#include "gnunet_nse_service.h"
30#include "gnunet_statistics_service.h" 31#include "gnunet_statistics_service.h"
@@ -36,11 +37,10 @@
36 37
37#include <math.h> 38#include <math.h>
38#include <inttypes.h> 39#include <inttypes.h>
40#include <string.h>
39 41
40#define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__) 42#define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
41 43
42// TODO modify @brief in every file
43
44// TODO check for overflows 44// TODO check for overflows
45 45
46// TODO align message structs 46// TODO align message structs
@@ -149,6 +149,11 @@ struct ChannelCtx;
149struct PeerContext 149struct PeerContext
150{ 150{
151 /** 151 /**
152 * The Sub this context belongs to.
153 */
154 struct Sub *sub;
155
156 /**
152 * Message queue open to client 157 * Message queue open to client
153 */ 158 */
154 struct GNUNET_MQ_Handle *mq; 159 struct GNUNET_MQ_Handle *mq;
@@ -211,6 +216,7 @@ struct PeerContext
211 * it, how did we get its ID, how many pushes (in a timeinterval), 216 * it, how did we get its ID, how many pushes (in a timeinterval),
212 * ...) 217 * ...)
213 */ 218 */
219 uint32_t round_pull_req;
214}; 220};
215 221
216/** 222/**
@@ -275,24 +281,17 @@ struct AttackedPeer
275#endif /* ENABLE_MALICIOUS */ 281#endif /* ENABLE_MALICIOUS */
276 282
277/** 283/**
278 * @brief One SubSampler. 284 * @brief One Sub.
279 * 285 *
280 * Essentially one instance of brahms that only connects to other instances 286 * Essentially one instance of brahms that only connects to other instances
281 * with the same (secret) value. 287 * with the same (secret) value.
282 */ 288 */
283struct SubSampler 289struct Sub
284{ 290{
285 /** 291 /**
286 * @brief Port used for cadet. 292 * @brief Hash of the shared value that defines Subs.
287 *
288 * Don't compute multiple times through making it global
289 */ 293 */
290 struct GNUNET_HashCode port; 294 struct GNUNET_HashCode hash;
291
292 /**
293 * Handler to CADET.
294 */
295 struct GNUNET_CADET_Handle *cadet_handle;
296 295
297 /** 296 /**
298 * @brief Port to communicate to other peers. 297 * @brief Port to communicate to other peers.
@@ -356,6 +355,16 @@ struct SubSampler
356 uint32_t num_observed_peers; 355 uint32_t num_observed_peers;
357 356
358 /** 357 /**
358 * @brief File name to log number of pushes per round to
359 */
360 char *file_name_push_recv;
361
362 /**
363 * @brief File name to log number of pushes per round to
364 */
365 char *file_name_pull_delays;
366
367 /**
359 * @brief Multipeermap (ab-) used to count unique peer_ids 368 * @brief Multipeermap (ab-) used to count unique peer_ids
360 */ 369 */
361 struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers; 370 struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
@@ -394,6 +403,28 @@ struct SubSampler
394 * Identifier for the main task that runs periodically. 403 * Identifier for the main task that runs periodically.
395 */ 404 */
396 struct GNUNET_SCHEDULER_Task *do_round_task; 405 struct GNUNET_SCHEDULER_Task *do_round_task;
406
407 /* === stats === */
408
409 /**
410 * @brief Counts the executed rounds.
411 */
412 uint32_t num_rounds;
413
414 /**
415 * @brief This array accumulates the number of received pushes per round.
416 *
417 * Number at index i represents the number of rounds with i observed pushes.
418 */
419 uint32_t push_recv[256];
420
421 /**
422 * @brief Number of pull replies with this delay measured in rounds.
423 *
424 * Number at index i represents the number of pull replies with a delay of i
425 * rounds.
426 */
427 uint32_t pull_delays[256];
397}; 428};
398 429
399 430
@@ -412,6 +443,21 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg;
412struct GNUNET_STATISTICS_Handle *stats; 443struct GNUNET_STATISTICS_Handle *stats;
413 444
414/** 445/**
446 * Handler to CADET.
447 */
448struct GNUNET_CADET_Handle *cadet_handle;
449
450/**
451 * Handle to CORE
452 */
453struct GNUNET_CORE_Handle *core_handle;
454
455/**
456 * @brief PeerMap to keep track of connected peers.
457 */
458struct GNUNET_CONTAINER_MultiPeerMap *map_single_hop;
459
460/**
415 * Our own identity. 461 * Our own identity.
416 */ 462 */
417static struct GNUNET_PeerIdentity own_identity; 463static struct GNUNET_PeerIdentity own_identity;
@@ -511,12 +557,12 @@ static uint32_t push_limit = 10000;
511#endif /* ENABLE_MALICIOUS */ 557#endif /* ENABLE_MALICIOUS */
512 558
513/** 559/**
514 * @brief Main SubSampler. 560 * @brief Main Sub.
515 * 561 *
516 * This is run in any case by all peers and connects to all peers without 562 * This is run in any case by all peers and connects to all peers without
517 * specifying a shared value. 563 * specifying a shared value.
518 */ 564 */
519static struct SubSampler *mss; 565static struct Sub *msub;
520 566
521/** 567/**
522 * @brief Maximum number of valid peers to keep. 568 * @brief Maximum number of valid peers to keep.
@@ -524,28 +570,36 @@ static struct SubSampler *mss;
524 */ 570 */
525static const uint32_t num_valid_peers_max = UINT32_MAX; 571static const uint32_t num_valid_peers_max = UINT32_MAX;
526 572
527
528/*********************************************************************** 573/***********************************************************************
529 * /Globals 574 * /Globals
530***********************************************************************/ 575***********************************************************************/
531 576
532 577
578static void
579do_round (void *cls);
580
581static void
582do_mal_round (void *cls);
583
584
533/** 585/**
534 * @brief Get the #PeerContext associated with a peer 586 * @brief Get the #PeerContext associated with a peer
535 * 587 *
588 * @param peer_map The peer map containing the context
536 * @param peer the peer id 589 * @param peer the peer id
537 * 590 *
538 * @return the #PeerContext 591 * @return the #PeerContext
539 */ 592 */
540static struct PeerContext * 593static struct PeerContext *
541get_peer_ctx (const struct GNUNET_PeerIdentity *peer) 594get_peer_ctx (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
595 const struct GNUNET_PeerIdentity *peer)
542{ 596{
543 struct PeerContext *ctx; 597 struct PeerContext *ctx;
544 int ret; 598 int ret;
545 599
546 ret = GNUNET_CONTAINER_multipeermap_contains (mss->peer_map, peer); 600 ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
547 GNUNET_assert (GNUNET_YES == ret); 601 GNUNET_assert (GNUNET_YES == ret);
548 ctx = GNUNET_CONTAINER_multipeermap_get (mss->peer_map, peer); 602 ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
549 GNUNET_assert (NULL != ctx); 603 GNUNET_assert (NULL != ctx);
550 return ctx; 604 return ctx;
551} 605}
@@ -555,18 +609,21 @@ get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
555 * 609 *
556 * FIXME probably deprecated. Make this the new _online. 610 * FIXME probably deprecated. Make this the new _online.
557 * 611 *
612 * @param peer_map The peer map to check for the existence of @a peer
558 * @param peer peer in question 613 * @param peer peer in question
559 * 614 *
560 * @return #GNUNET_YES if peer is known 615 * @return #GNUNET_YES if peer is known
561 * #GNUNET_NO if peer is not knwon 616 * #GNUNET_NO if peer is not knwon
562 */ 617 */
563static int 618static int
564check_peer_known (const struct GNUNET_PeerIdentity *peer) 619check_peer_known (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
620 const struct GNUNET_PeerIdentity *peer)
565{ 621{
566 if (NULL != mss->peer_map) 622 if (NULL != peer_map)
567 { 623 {
568 return GNUNET_CONTAINER_multipeermap_contains (mss->peer_map, peer); 624 return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
569 } else 625 }
626 else
570 { 627 {
571 return GNUNET_NO; 628 return GNUNET_NO;
572 } 629 }
@@ -576,27 +633,33 @@ check_peer_known (const struct GNUNET_PeerIdentity *peer)
576/** 633/**
577 * @brief Create a new #PeerContext and insert it into the peer map 634 * @brief Create a new #PeerContext and insert it into the peer map
578 * 635 *
636 * @param sub The Sub this context belongs to.
579 * @param peer the peer to create the #PeerContext for 637 * @param peer the peer to create the #PeerContext for
580 * 638 *
581 * @return the #PeerContext 639 * @return the #PeerContext
582 */ 640 */
583static struct PeerContext * 641static struct PeerContext *
584create_peer_ctx (const struct GNUNET_PeerIdentity *peer) 642create_peer_ctx (struct Sub *sub,
643 const struct GNUNET_PeerIdentity *peer)
585{ 644{
586 struct PeerContext *ctx; 645 struct PeerContext *ctx;
587 int ret; 646 int ret;
588 647
589 GNUNET_assert (GNUNET_NO == check_peer_known (peer)); 648 GNUNET_assert (GNUNET_NO == check_peer_known (sub->peer_map, peer));
590 649
591 ctx = GNUNET_new (struct PeerContext); 650 ctx = GNUNET_new (struct PeerContext);
592 ctx->peer_id = *peer; 651 ctx->peer_id = *peer;
593 ret = GNUNET_CONTAINER_multipeermap_put (mss->peer_map, peer, ctx, 652 ctx->sub = sub;
653 ret = GNUNET_CONTAINER_multipeermap_put (sub->peer_map, peer, ctx,
594 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 654 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
595 GNUNET_assert (GNUNET_OK == ret); 655 GNUNET_assert (GNUNET_OK == ret);
596 GNUNET_STATISTICS_set (stats, 656 if (sub == msub)
597 "# known peers", 657 {
598 GNUNET_CONTAINER_multipeermap_size (mss->peer_map), 658 GNUNET_STATISTICS_set (stats,
599 GNUNET_NO); 659 "# known peers",
660 GNUNET_CONTAINER_multipeermap_size (sub->peer_map),
661 GNUNET_NO);
662 }
600 return ctx; 663 return ctx;
601} 664}
602 665
@@ -604,18 +667,20 @@ create_peer_ctx (const struct GNUNET_PeerIdentity *peer)
604/** 667/**
605 * @brief Create or get a #PeerContext 668 * @brief Create or get a #PeerContext
606 * 669 *
670 * @param sub The Sub to which the created context belongs to
607 * @param peer the peer to get the associated context to 671 * @param peer the peer to get the associated context to
608 * 672 *
609 * @return the context 673 * @return the context
610 */ 674 */
611static struct PeerContext * 675static struct PeerContext *
612create_or_get_peer_ctx (const struct GNUNET_PeerIdentity *peer) 676create_or_get_peer_ctx (struct Sub *sub,
677 const struct GNUNET_PeerIdentity *peer)
613{ 678{
614 if (GNUNET_NO == check_peer_known (peer)) 679 if (GNUNET_NO == check_peer_known (sub->peer_map, peer))
615 { 680 {
616 return create_peer_ctx (peer); 681 return create_peer_ctx (sub, peer);
617 } 682 }
618 return get_peer_ctx (peer); 683 return get_peer_ctx (sub->peer_map, peer);
619} 684}
620 685
621 686
@@ -624,23 +689,22 @@ create_or_get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
624 * 689 *
625 * Also sets the #Peers_ONLINE flag accordingly 690 * Also sets the #Peers_ONLINE flag accordingly
626 * 691 *
627 * @param peer the peer in question 692 * @param peer_ctx Context of the peer of which connectivity is to be checked
628 * 693 *
629 * @return #GNUNET_YES if we are connected 694 * @return #GNUNET_YES if we are connected
630 * #GNUNET_NO otherwise 695 * #GNUNET_NO otherwise
631 */ 696 */
632static int 697static int
633check_connected (const struct GNUNET_PeerIdentity *peer) 698check_connected (struct PeerContext *peer_ctx)
634{ 699{
635 struct PeerContext *peer_ctx;
636
637 /* If we don't know about this peer we don't know whether it's online */ 700 /* If we don't know about this peer we don't know whether it's online */
638 if (GNUNET_NO == check_peer_known (peer)) 701 if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
702 &peer_ctx->peer_id))
639 { 703 {
640 return GNUNET_NO; 704 return GNUNET_NO;
641 } 705 }
642 /* Get the context */ 706 /* Get the context */
643 peer_ctx = get_peer_ctx (peer); 707 peer_ctx = get_peer_ctx (peer_ctx->sub->peer_map, &peer_ctx->peer_id);
644 /* If we have no channel to this peer we don't know whether it's online */ 708 /* If we have no channel to this peer we don't know whether it's online */
645 if ( (NULL == peer_ctx->send_channel_ctx) && 709 if ( (NULL == peer_ctx->send_channel_ctx) &&
646 (NULL == peer_ctx->recv_channel_ctx) ) 710 (NULL == peer_ctx->recv_channel_ctx) )
@@ -709,21 +773,21 @@ get_rand_peer_iterator (void *cls,
709/** 773/**
710 * @brief Get a random peer from @a peer_map 774 * @brief Get a random peer from @a peer_map
711 * 775 *
712 * @param peer_map the peer_map to get the peer from 776 * @param valid_peers Peer map containing valid peers from which to select a
777 * random one
713 * 778 *
714 * @return a random peer 779 * @return a random peer
715 */ 780 */
716static const struct GNUNET_PeerIdentity * 781static const struct GNUNET_PeerIdentity *
717get_random_peer_from_peermap (const struct 782get_random_peer_from_peermap (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
718 GNUNET_CONTAINER_MultiPeerMap *peer_map)
719{ 783{
720 struct GetRandPeerIteratorCls *iterator_cls; 784 struct GetRandPeerIteratorCls *iterator_cls;
721 const struct GNUNET_PeerIdentity *ret; 785 const struct GNUNET_PeerIdentity *ret;
722 786
723 iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls); 787 iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
724 iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 788 iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
725 GNUNET_CONTAINER_multipeermap_size (peer_map)); 789 GNUNET_CONTAINER_multipeermap_size (valid_peers));
726 (void) GNUNET_CONTAINER_multipeermap_iterate (mss->valid_peers, 790 (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
727 get_rand_peer_iterator, 791 get_rand_peer_iterator,
728 iterator_cls); 792 iterator_cls);
729 ret = iterator_cls->peer; 793 ret = iterator_cls->peer;
@@ -737,31 +801,37 @@ get_random_peer_from_peermap (const struct
737 * 801 *
738 * If valid peers are already #num_valid_peers_max, delete a peer previously. 802 * If valid peers are already #num_valid_peers_max, delete a peer previously.
739 * 803 *
740 * @param peer the peer that is added to the valid peers. 804 * @param peer The peer that is added to the valid peers.
805 * @param valid_peers Peer map of valid peers to which to add the @a peer
741 * 806 *
742 * @return #GNUNET_YES if no other peer had to be removed 807 * @return #GNUNET_YES if no other peer had to be removed
743 * #GNUNET_NO otherwise 808 * #GNUNET_NO otherwise
744 */ 809 */
745static int 810static int
746add_valid_peer (const struct GNUNET_PeerIdentity *peer) 811add_valid_peer (const struct GNUNET_PeerIdentity *peer,
812 struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
747{ 813{
748 const struct GNUNET_PeerIdentity *rand_peer; 814 const struct GNUNET_PeerIdentity *rand_peer;
749 int ret; 815 int ret;
750 816
751 ret = GNUNET_YES; 817 ret = GNUNET_YES;
752 while (GNUNET_CONTAINER_multipeermap_size ( 818 /* Remove random peers until there is space for a new one */
753 mss->valid_peers) >= num_valid_peers_max) 819 while (num_valid_peers_max <=
820 GNUNET_CONTAINER_multipeermap_size (valid_peers))
754 { 821 {
755 rand_peer = get_random_peer_from_peermap (mss->valid_peers); 822 rand_peer = get_random_peer_from_peermap (valid_peers);
756 GNUNET_CONTAINER_multipeermap_remove_all (mss->valid_peers, rand_peer); 823 GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
757 ret = GNUNET_NO; 824 ret = GNUNET_NO;
758 } 825 }
759 (void) GNUNET_CONTAINER_multipeermap_put (mss->valid_peers, peer, NULL, 826 (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
760 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 827 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
761 GNUNET_STATISTICS_set (stats, 828 if (valid_peers == msub->valid_peers)
762 "# valid peers", 829 {
763 GNUNET_CONTAINER_multipeermap_size (mss->valid_peers), 830 GNUNET_STATISTICS_set (stats,
764 GNUNET_NO); 831 "# valid peers",
832 GNUNET_CONTAINER_multipeermap_size (valid_peers),
833 GNUNET_NO);
834 }
765 return ret; 835 return ret;
766} 836}
767 837
@@ -799,7 +869,6 @@ set_peer_online (struct PeerContext *peer_ctx)
799 peer_ctx->online_check_pending = NULL; 869 peer_ctx->online_check_pending = NULL;
800 } 870 }
801 871
802 (void) add_valid_peer (peer);
803 SET_PEER_FLAG (peer_ctx, Peers_ONLINE); 872 SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
804 873
805 /* Call pending operations */ 874 /* Call pending operations */
@@ -888,14 +957,12 @@ remove_channel_ctx (struct ChannelCtx *channel_ctx)
888/** 957/**
889 * @brief Get the channel of a peer. If not existing, create. 958 * @brief Get the channel of a peer. If not existing, create.
890 * 959 *
891 * @param peer the peer id 960 * @param peer_ctx Context of the peer of which to get the channel
892 * @return the #GNUNET_CADET_Channel used to send data to @a peer 961 * @return the #GNUNET_CADET_Channel used to send data to @a peer_ctx
893 */ 962 */
894struct GNUNET_CADET_Channel * 963struct GNUNET_CADET_Channel *
895get_channel (const struct GNUNET_PeerIdentity *peer) 964get_channel (struct PeerContext *peer_ctx)
896{ 965{
897 struct PeerContext *peer_ctx;
898 struct GNUNET_PeerIdentity *ctx_peer;
899 /* There exists a copy-paste-clone in run() */ 966 /* There exists a copy-paste-clone in run() */
900 struct GNUNET_MQ_MessageHandler cadet_handlers[] = { 967 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
901 GNUNET_MQ_hd_fixed_size (peer_check, 968 GNUNET_MQ_hd_fixed_size (peer_check,
@@ -918,20 +985,17 @@ get_channel (const struct GNUNET_PeerIdentity *peer)
918 }; 985 };
919 986
920 987
921 peer_ctx = get_peer_ctx (peer);
922 if (NULL == peer_ctx->send_channel_ctx) 988 if (NULL == peer_ctx->send_channel_ctx)
923 { 989 {
924 LOG (GNUNET_ERROR_TYPE_DEBUG, 990 LOG (GNUNET_ERROR_TYPE_DEBUG,
925 "Trying to establish channel to peer %s\n", 991 "Trying to establish channel to peer %s\n",
926 GNUNET_i2s (peer)); 992 GNUNET_i2s (&peer_ctx->peer_id));
927 ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
928 *ctx_peer = *peer;
929 peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx); 993 peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx);
930 peer_ctx->send_channel_ctx->channel = 994 peer_ctx->send_channel_ctx->channel =
931 GNUNET_CADET_channel_create (mss->cadet_handle, 995 GNUNET_CADET_channel_create (cadet_handle,
932 peer_ctx->send_channel_ctx, /* context */ 996 peer_ctx->send_channel_ctx, /* context */
933 peer, 997 &peer_ctx->peer_id,
934 &mss->port, 998 &peer_ctx->sub->hash,
935 GNUNET_CADET_OPTION_RELIABLE, 999 GNUNET_CADET_OPTION_RELIABLE,
936 NULL, /* WindowSize handler */ 1000 NULL, /* WindowSize handler */
937 &cleanup_destroyed_channel, /* Disconnect handler */ 1001 &cleanup_destroyed_channel, /* Disconnect handler */
@@ -949,19 +1013,15 @@ get_channel (const struct GNUNET_PeerIdentity *peer)
949 * If we already have a message queue open to this client, 1013 * If we already have a message queue open to this client,
950 * simply return it, otherways create one. 1014 * simply return it, otherways create one.
951 * 1015 *
952 * @param peer the peer to get the mq to 1016 * @param peer_ctx Context of the peer of whicht to get the mq
953 * @return the #GNUNET_MQ_Handle 1017 * @return the #GNUNET_MQ_Handle
954 */ 1018 */
955static struct GNUNET_MQ_Handle * 1019static struct GNUNET_MQ_Handle *
956get_mq (const struct GNUNET_PeerIdentity *peer) 1020get_mq (struct PeerContext *peer_ctx)
957{ 1021{
958 struct PeerContext *peer_ctx;
959
960 peer_ctx = get_peer_ctx (peer);
961
962 if (NULL == peer_ctx->mq) 1022 if (NULL == peer_ctx->mq)
963 { 1023 {
964 peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer)); 1024 peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer_ctx));
965 } 1025 }
966 return peer_ctx->mq; 1026 return peer_ctx->mq;
967} 1027}
@@ -969,20 +1029,18 @@ get_mq (const struct GNUNET_PeerIdentity *peer)
969/** 1029/**
970 * @brief Add an envelope to a message passed to mq to list of pending messages 1030 * @brief Add an envelope to a message passed to mq to list of pending messages
971 * 1031 *
972 * @param peer peer the message was sent to 1032 * @param peer_ctx Context of the peer for which to insert the envelope
973 * @param ev envelope to the message 1033 * @param ev envelope to the message
974 * @param type type of the message to be sent 1034 * @param type type of the message to be sent
975 * @return pointer to pending message 1035 * @return pointer to pending message
976 */ 1036 */
977static struct PendingMessage * 1037static struct PendingMessage *
978insert_pending_message (const struct GNUNET_PeerIdentity *peer, 1038insert_pending_message (struct PeerContext *peer_ctx,
979 struct GNUNET_MQ_Envelope *ev, 1039 struct GNUNET_MQ_Envelope *ev,
980 const char *type) 1040 const char *type)
981{ 1041{
982 struct PendingMessage *pending_msg; 1042 struct PendingMessage *pending_msg;
983 struct PeerContext *peer_ctx;
984 1043
985 peer_ctx = get_peer_ctx (peer);
986 pending_msg = GNUNET_new (struct PendingMessage); 1044 pending_msg = GNUNET_new (struct PendingMessage);
987 pending_msg->ev = ev; 1045 pending_msg->ev = ev;
988 pending_msg->peer_ctx = peer_ctx; 1046 pending_msg->peer_ctx = peer_ctx;
@@ -1039,6 +1097,7 @@ mq_online_check_successful (void *cls)
1039 remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES); 1097 remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
1040 peer_ctx->online_check_pending = NULL; 1098 peer_ctx->online_check_pending = NULL;
1041 set_peer_online (peer_ctx); 1099 set_peer_online (peer_ctx);
1100 (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1042 } 1101 }
1043} 1102}
1044 1103
@@ -1059,16 +1118,19 @@ check_peer_online (struct PeerContext *peer_ctx)
1059 1118
1060 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE); 1119 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
1061 peer_ctx->online_check_pending = 1120 peer_ctx->online_check_pending =
1062 insert_pending_message (&peer_ctx->peer_id, ev, "Check online"); 1121 insert_pending_message (peer_ctx, ev, "Check online");
1063 mq = get_mq (&peer_ctx->peer_id); 1122 mq = get_mq (peer_ctx);
1064 GNUNET_MQ_notify_sent (ev, 1123 GNUNET_MQ_notify_sent (ev,
1065 mq_online_check_successful, 1124 mq_online_check_successful,
1066 peer_ctx); 1125 peer_ctx);
1067 GNUNET_MQ_send (mq, ev); 1126 GNUNET_MQ_send (mq, ev);
1068 GNUNET_STATISTICS_update (stats, 1127 if (peer_ctx->sub == msub)
1069 "# pending online checks", 1128 {
1070 1, 1129 GNUNET_STATISTICS_update (stats,
1071 GNUNET_NO); 1130 "# pending online checks",
1131 1,
1132 GNUNET_NO);
1133 }
1072} 1134}
1073 1135
1074 1136
@@ -1078,20 +1140,18 @@ check_peer_online (struct PeerContext *peer_ctx)
1078 * The array with pending operations will probably never grow really big, so 1140 * The array with pending operations will probably never grow really big, so
1079 * iterating over it should be ok. 1141 * iterating over it should be ok.
1080 * 1142 *
1081 * @param peer the peer to check 1143 * @param peer_ctx Context of the peer to check for the operation
1082 * @param peer_op the operation (#PeerOp) on the peer 1144 * @param peer_op the operation (#PeerOp) on the peer
1083 * 1145 *
1084 * @return #GNUNET_YES if this operation is scheduled on that peer 1146 * @return #GNUNET_YES if this operation is scheduled on that peer
1085 * #GNUNET_NO otherwise 1147 * #GNUNET_NO otherwise
1086 */ 1148 */
1087static int 1149static int
1088check_operation_scheduled (const struct GNUNET_PeerIdentity *peer, 1150check_operation_scheduled (const struct PeerContext *peer_ctx,
1089 const PeerOp peer_op) 1151 const PeerOp peer_op)
1090{ 1152{
1091 const struct PeerContext *peer_ctx;
1092 unsigned int i; 1153 unsigned int i;
1093 1154
1094 peer_ctx = get_peer_ctx (peer);
1095 for (i = 0; i < peer_ctx->num_pending_ops; i++) 1155 for (i = 0; i < peer_ctx->num_pending_ops; i++)
1096 if (peer_op == peer_ctx->pending_ops[i].op) 1156 if (peer_op == peer_ctx->pending_ops[i].op)
1097 return GNUNET_YES; 1157 return GNUNET_YES;
@@ -1165,7 +1225,13 @@ schedule_channel_destruction (struct ChannelCtx *channel_ctx)
1165/** 1225/**
1166 * @brief Remove peer 1226 * @brief Remove peer
1167 * 1227 *
1168 * @param peer the peer to clean 1228 * - Empties the list with pending operations
1229 * - Empties the list with pending messages
1230 * - Cancels potentially existing online check
1231 * - Schedules closing of send and recv channels
1232 * - Removes peer from peer map
1233 *
1234 * @param peer_ctx Context of the peer to be destroyed
1169 * @return #GNUNET_YES if peer was removed 1235 * @return #GNUNET_YES if peer was removed
1170 * #GNUNET_NO otherwise 1236 * #GNUNET_NO otherwise
1171 */ 1237 */
@@ -1173,9 +1239,9 @@ static int
1173destroy_peer (struct PeerContext *peer_ctx) 1239destroy_peer (struct PeerContext *peer_ctx)
1174{ 1240{
1175 GNUNET_assert (NULL != peer_ctx); 1241 GNUNET_assert (NULL != peer_ctx);
1176 GNUNET_assert (NULL != mss->peer_map); 1242 GNUNET_assert (NULL != peer_ctx->sub->peer_map);
1177 if (GNUNET_NO == 1243 if (GNUNET_NO ==
1178 GNUNET_CONTAINER_multipeermap_contains (mss->peer_map, 1244 GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1179 &peer_ctx->peer_id)) 1245 &peer_ctx->peer_id))
1180 { 1246 {
1181 return GNUNET_NO; 1247 return GNUNET_NO;
@@ -1205,10 +1271,13 @@ destroy_peer (struct PeerContext *peer_ctx)
1205 sizeof (struct PendingMessage))) ) 1271 sizeof (struct PendingMessage))) )
1206 { 1272 {
1207 peer_ctx->online_check_pending = NULL; 1273 peer_ctx->online_check_pending = NULL;
1208 GNUNET_STATISTICS_update (stats, 1274 if (peer_ctx->sub == msub)
1209 "# pending online checks", 1275 {
1210 -1, 1276 GNUNET_STATISTICS_update (stats,
1211 GNUNET_NO); 1277 "# pending online checks",
1278 -1,
1279 GNUNET_NO);
1280 }
1212 } 1281 }
1213 remove_pending_message (peer_ctx->pending_messages_head, 1282 remove_pending_message (peer_ctx->pending_messages_head,
1214 GNUNET_YES); 1283 GNUNET_YES);
@@ -1245,16 +1314,19 @@ destroy_peer (struct PeerContext *peer_ctx)
1245 } 1314 }
1246 1315
1247 if (GNUNET_YES != 1316 if (GNUNET_YES !=
1248 GNUNET_CONTAINER_multipeermap_remove_all (mss->peer_map, 1317 GNUNET_CONTAINER_multipeermap_remove_all (peer_ctx->sub->peer_map,
1249 &peer_ctx->peer_id)) 1318 &peer_ctx->peer_id))
1250 { 1319 {
1251 LOG (GNUNET_ERROR_TYPE_WARNING, 1320 LOG (GNUNET_ERROR_TYPE_WARNING,
1252 "removing peer from mss->peer_map failed\n"); 1321 "removing peer from peer_ctx->sub->peer_map failed\n");
1322 }
1323 if (peer_ctx->sub == msub)
1324 {
1325 GNUNET_STATISTICS_set (stats,
1326 "# known peers",
1327 GNUNET_CONTAINER_multipeermap_size (peer_ctx->sub->peer_map),
1328 GNUNET_NO);
1253 } 1329 }
1254 GNUNET_STATISTICS_set (stats,
1255 "# known peers",
1256 GNUNET_CONTAINER_multipeermap_size (mss->peer_map),
1257 GNUNET_NO);
1258 GNUNET_free (peer_ctx); 1330 GNUNET_free (peer_ctx);
1259 return GNUNET_YES; 1331 return GNUNET_YES;
1260} 1332}
@@ -1274,9 +1346,10 @@ peermap_clear_iterator (void *cls,
1274 const struct GNUNET_PeerIdentity *key, 1346 const struct GNUNET_PeerIdentity *key,
1275 void *value) 1347 void *value)
1276{ 1348{
1277 (void) cls; 1349 struct Sub *sub = cls;
1278 (void) value; 1350 (void) value;
1279 destroy_peer (get_peer_ctx (key)); 1351
1352 destroy_peer (get_peer_ctx (sub->peer_map, key));
1280 return GNUNET_YES; 1353 return GNUNET_YES;
1281} 1354}
1282 1355
@@ -1295,12 +1368,22 @@ mq_notify_sent_cb (void *cls)
1295 LOG (GNUNET_ERROR_TYPE_DEBUG, 1368 LOG (GNUNET_ERROR_TYPE_DEBUG,
1296 "%s was sent.\n", 1369 "%s was sent.\n",
1297 pending_msg->type); 1370 pending_msg->type);
1298 if (0 == strncmp ("PULL REPLY", pending_msg->type, 10)) 1371 if (pending_msg->peer_ctx->sub == msub)
1299 GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO); 1372 {
1300 if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12)) 1373 if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
1301 GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO); 1374 GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO);
1302 if (0 == strncmp ("PUSH", pending_msg->type, 4)) 1375 if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
1303 GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO); 1376 GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO);
1377 if (0 == strncmp ("PUSH", pending_msg->type, 4))
1378 GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO);
1379 if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12) &&
1380 GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
1381 &pending_msg->peer_ctx->peer_id))
1382 GNUNET_STATISTICS_update(stats,
1383 "# pull requests sent (multi-hop peer)",
1384 1,
1385 GNUNET_NO);
1386 }
1304 /* Do not cancle message */ 1387 /* Do not cancle message */
1305 remove_pending_message (pending_msg, GNUNET_NO); 1388 remove_pending_message (pending_msg, GNUNET_NO);
1306} 1389}
@@ -1350,35 +1433,37 @@ store_peer_presistently_iterator (void *cls,
1350 1433
1351/** 1434/**
1352 * @brief Store the peers currently in #valid_peers to disk. 1435 * @brief Store the peers currently in #valid_peers to disk.
1436 *
1437 * @param sub Sub for which to store the valid peers
1353 */ 1438 */
1354static void 1439static void
1355store_valid_peers () 1440store_valid_peers (const struct Sub *sub)
1356{ 1441{
1357 struct GNUNET_DISK_FileHandle *fh; 1442 struct GNUNET_DISK_FileHandle *fh;
1358 uint32_t number_written_peers; 1443 uint32_t number_written_peers;
1359 int ret; 1444 int ret;
1360 1445
1361 if (0 == strncmp ("DISABLE", mss->filename_valid_peers, 7)) 1446 if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1362 { 1447 {
1363 return; 1448 return;
1364 } 1449 }
1365 1450
1366 ret = GNUNET_DISK_directory_create_for_file (mss->filename_valid_peers); 1451 ret = GNUNET_DISK_directory_create_for_file (sub->filename_valid_peers);
1367 if (GNUNET_SYSERR == ret) 1452 if (GNUNET_SYSERR == ret)
1368 { 1453 {
1369 LOG (GNUNET_ERROR_TYPE_WARNING, 1454 LOG (GNUNET_ERROR_TYPE_WARNING,
1370 "Not able to create directory for file `%s'\n", 1455 "Not able to create directory for file `%s'\n",
1371 mss->filename_valid_peers); 1456 sub->filename_valid_peers);
1372 GNUNET_break (0); 1457 GNUNET_break (0);
1373 } 1458 }
1374 else if (GNUNET_NO == ret) 1459 else if (GNUNET_NO == ret)
1375 { 1460 {
1376 LOG (GNUNET_ERROR_TYPE_WARNING, 1461 LOG (GNUNET_ERROR_TYPE_WARNING,
1377 "Directory for file `%s' exists but is not writable for us\n", 1462 "Directory for file `%s' exists but is not writable for us\n",
1378 mss->filename_valid_peers); 1463 sub->filename_valid_peers);
1379 GNUNET_break (0); 1464 GNUNET_break (0);
1380 } 1465 }
1381 fh = GNUNET_DISK_file_open (mss->filename_valid_peers, 1466 fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1382 GNUNET_DISK_OPEN_WRITE | 1467 GNUNET_DISK_OPEN_WRITE |
1383 GNUNET_DISK_OPEN_CREATE, 1468 GNUNET_DISK_OPEN_CREATE,
1384 GNUNET_DISK_PERM_USER_READ | 1469 GNUNET_DISK_PERM_USER_READ |
@@ -1387,19 +1472,19 @@ store_valid_peers ()
1387 { 1472 {
1388 LOG (GNUNET_ERROR_TYPE_WARNING, 1473 LOG (GNUNET_ERROR_TYPE_WARNING,
1389 "Not able to write valid peers to file `%s'\n", 1474 "Not able to write valid peers to file `%s'\n",
1390 mss->filename_valid_peers); 1475 sub->filename_valid_peers);
1391 return; 1476 return;
1392 } 1477 }
1393 LOG (GNUNET_ERROR_TYPE_DEBUG, 1478 LOG (GNUNET_ERROR_TYPE_DEBUG,
1394 "Writing %u valid peers to disk\n", 1479 "Writing %u valid peers to disk\n",
1395 GNUNET_CONTAINER_multipeermap_size (mss->valid_peers)); 1480 GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1396 number_written_peers = 1481 number_written_peers =
1397 GNUNET_CONTAINER_multipeermap_iterate (mss->valid_peers, 1482 GNUNET_CONTAINER_multipeermap_iterate (sub->valid_peers,
1398 store_peer_presistently_iterator, 1483 store_peer_presistently_iterator,
1399 fh); 1484 fh);
1400 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh)); 1485 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1401 GNUNET_assert (number_written_peers == 1486 GNUNET_assert (number_written_peers ==
1402 GNUNET_CONTAINER_multipeermap_size (mss->valid_peers)); 1487 GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1403} 1488}
1404 1489
1405 1490
@@ -1451,9 +1536,11 @@ s2i_full (const char *string_repr)
1451 1536
1452/** 1537/**
1453 * @brief Restore the peers on disk to #valid_peers. 1538 * @brief Restore the peers on disk to #valid_peers.
1539 *
1540 * @param sub Sub for which to restore the valid peers
1454 */ 1541 */
1455static void 1542static void
1456restore_valid_peers () 1543restore_valid_peers (const struct Sub *sub)
1457{ 1544{
1458 off_t file_size; 1545 off_t file_size;
1459 uint32_t num_peers; 1546 uint32_t num_peers;
@@ -1464,16 +1551,16 @@ restore_valid_peers ()
1464 char *str_repr; 1551 char *str_repr;
1465 const struct GNUNET_PeerIdentity *peer; 1552 const struct GNUNET_PeerIdentity *peer;
1466 1553
1467 if (0 == strncmp ("DISABLE", mss->filename_valid_peers, 7)) 1554 if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1468 { 1555 {
1469 return; 1556 return;
1470 } 1557 }
1471 1558
1472 if (GNUNET_OK != GNUNET_DISK_file_test (mss->filename_valid_peers)) 1559 if (GNUNET_OK != GNUNET_DISK_file_test (sub->filename_valid_peers))
1473 { 1560 {
1474 return; 1561 return;
1475 } 1562 }
1476 fh = GNUNET_DISK_file_open (mss->filename_valid_peers, 1563 fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1477 GNUNET_DISK_OPEN_READ, 1564 GNUNET_DISK_OPEN_READ,
1478 GNUNET_DISK_PERM_NONE); 1565 GNUNET_DISK_PERM_NONE);
1479 GNUNET_assert (NULL != fh); 1566 GNUNET_assert (NULL != fh);
@@ -1485,13 +1572,13 @@ restore_valid_peers ()
1485 LOG (GNUNET_ERROR_TYPE_DEBUG, 1572 LOG (GNUNET_ERROR_TYPE_DEBUG,
1486 "Restoring %" PRIu32 " peers from file `%s'\n", 1573 "Restoring %" PRIu32 " peers from file `%s'\n",
1487 num_peers, 1574 num_peers,
1488 mss->filename_valid_peers); 1575 sub->filename_valid_peers);
1489 for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53) 1576 for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1490 { 1577 {
1491 str_repr = GNUNET_strndup (iter_buf, 53); 1578 str_repr = GNUNET_strndup (iter_buf, 53);
1492 peer = s2i_full (str_repr); 1579 peer = s2i_full (str_repr);
1493 GNUNET_free (str_repr); 1580 GNUNET_free (str_repr);
1494 add_valid_peer (peer); 1581 add_valid_peer (peer, sub->valid_peers);
1495 LOG (GNUNET_ERROR_TYPE_DEBUG, 1582 LOG (GNUNET_ERROR_TYPE_DEBUG,
1496 "Restored valid peer %s from disk\n", 1583 "Restored valid peer %s from disk\n",
1497 GNUNET_i2s_full (peer)); 1584 GNUNET_i2s_full (peer));
@@ -1499,10 +1586,10 @@ restore_valid_peers ()
1499 iter_buf = NULL; 1586 iter_buf = NULL;
1500 GNUNET_free (buf); 1587 GNUNET_free (buf);
1501 LOG (GNUNET_ERROR_TYPE_DEBUG, 1588 LOG (GNUNET_ERROR_TYPE_DEBUG,
1502 "num_peers: %" PRIu32 ", _size (mss->valid_peers): %u\n", 1589 "num_peers: %" PRIu32 ", _size (sub->valid_peers): %u\n",
1503 num_peers, 1590 num_peers,
1504 GNUNET_CONTAINER_multipeermap_size (mss->valid_peers)); 1591 GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1505 if (num_peers != GNUNET_CONTAINER_multipeermap_size (mss->valid_peers)) 1592 if (num_peers != GNUNET_CONTAINER_multipeermap_size (sub->valid_peers))
1506 { 1593 {
1507 LOG (GNUNET_ERROR_TYPE_WARNING, 1594 LOG (GNUNET_ERROR_TYPE_WARNING,
1508 "Number of restored peers does not match file size. Have probably duplicates.\n"); 1595 "Number of restored peers does not match file size. Have probably duplicates.\n");
@@ -1510,38 +1597,40 @@ restore_valid_peers ()
1510 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh)); 1597 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1511 LOG (GNUNET_ERROR_TYPE_DEBUG, 1598 LOG (GNUNET_ERROR_TYPE_DEBUG,
1512 "Restored %u valid peers from disk\n", 1599 "Restored %u valid peers from disk\n",
1513 GNUNET_CONTAINER_multipeermap_size (mss->valid_peers)); 1600 GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1514} 1601}
1515 1602
1516 1603
1517/** 1604/**
1518 * @brief Delete storage of peers that was created with #initialise_peers () 1605 * @brief Delete storage of peers that was created with #initialise_peers ()
1606 *
1607 * @param sub Sub for which the storage is deleted
1519 */ 1608 */
1520static void 1609static void
1521peers_terminate () 1610peers_terminate (struct Sub *sub)
1522{ 1611{
1523 if (GNUNET_SYSERR == 1612 if (GNUNET_SYSERR ==
1524 GNUNET_CONTAINER_multipeermap_iterate (mss->peer_map, 1613 GNUNET_CONTAINER_multipeermap_iterate (sub->peer_map,
1525 &peermap_clear_iterator, 1614 &peermap_clear_iterator,
1526 NULL)) 1615 sub))
1527 { 1616 {
1528 LOG (GNUNET_ERROR_TYPE_WARNING, 1617 LOG (GNUNET_ERROR_TYPE_WARNING,
1529 "Iteration destroying peers was aborted.\n"); 1618 "Iteration destroying peers was aborted.\n");
1530 } 1619 }
1531 GNUNET_CONTAINER_multipeermap_destroy (mss->peer_map); 1620 GNUNET_CONTAINER_multipeermap_destroy (sub->peer_map);
1532 mss->peer_map = NULL; 1621 sub->peer_map = NULL;
1533 store_valid_peers (); 1622 store_valid_peers (sub);
1534 GNUNET_free (mss->filename_valid_peers); 1623 GNUNET_free (sub->filename_valid_peers);
1535 mss->filename_valid_peers = NULL; 1624 sub->filename_valid_peers = NULL;
1536 GNUNET_CONTAINER_multipeermap_destroy (mss->valid_peers); 1625 GNUNET_CONTAINER_multipeermap_destroy (sub->valid_peers);
1537 mss->valid_peers = NULL; 1626 sub->valid_peers = NULL;
1538} 1627}
1539 1628
1540 1629
1541/** 1630/**
1542 * Iterator over #valid_peers hash map entries. 1631 * Iterator over #valid_peers hash map entries.
1543 * 1632 *
1544 * @param cls closure - unused 1633 * @param cls Closure that contains iterator function and closure
1545 * @param peer current peer id 1634 * @param peer current peer id
1546 * @param value value in the hash map - unused 1635 * @param value value in the hash map - unused
1547 * @return #GNUNET_YES if we should continue to 1636 * @return #GNUNET_YES if we should continue to
@@ -1556,21 +1645,22 @@ valid_peer_iterator (void *cls,
1556 struct PeersIteratorCls *it_cls = cls; 1645 struct PeersIteratorCls *it_cls = cls;
1557 (void) value; 1646 (void) value;
1558 1647
1559 return it_cls->iterator (it_cls->cls, 1648 return it_cls->iterator (it_cls->cls, peer);
1560 peer);
1561} 1649}
1562 1650
1563 1651
1564/** 1652/**
1565 * @brief Get all currently known, valid peer ids. 1653 * @brief Get all currently known, valid peer ids.
1566 * 1654 *
1567 * @param it function to call on each peer id 1655 * @param valid_peers Peer map containing the valid peers in question
1568 * @param it_cls extra argument to @a it 1656 * @param iterator function to call on each peer id
1657 * @param it_cls extra argument to @a iterator
1569 * @return the number of key value pairs processed, 1658 * @return the number of key value pairs processed,
1570 * #GNUNET_SYSERR if it aborted iteration 1659 * #GNUNET_SYSERR if it aborted iteration
1571 */ 1660 */
1572static int 1661static int
1573get_valid_peers (PeersIterator iterator, 1662get_valid_peers (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1663 PeersIterator iterator,
1574 void *it_cls) 1664 void *it_cls)
1575{ 1665{
1576 struct PeersIteratorCls *cls; 1666 struct PeersIteratorCls *cls;
@@ -1579,7 +1669,7 @@ get_valid_peers (PeersIterator iterator,
1579 cls = GNUNET_new (struct PeersIteratorCls); 1669 cls = GNUNET_new (struct PeersIteratorCls);
1580 cls->iterator = iterator; 1670 cls->iterator = iterator;
1581 cls->cls = it_cls; 1671 cls->cls = it_cls;
1582 ret = GNUNET_CONTAINER_multipeermap_iterate (mss->valid_peers, 1672 ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1583 valid_peer_iterator, 1673 valid_peer_iterator,
1584 cls); 1674 cls);
1585 GNUNET_free (cls); 1675 GNUNET_free (cls);
@@ -1593,19 +1683,21 @@ get_valid_peers (PeersIterator iterator,
1593 * This function is called on new peer_ids from 'external' sources 1683 * This function is called on new peer_ids from 'external' sources
1594 * (client seed, cadet get_peers(), ...) 1684 * (client seed, cadet get_peers(), ...)
1595 * 1685 *
1686 * @param sub Sub with the peer map that the @a peer will be added to
1596 * @param peer the new #GNUNET_PeerIdentity 1687 * @param peer the new #GNUNET_PeerIdentity
1597 * 1688 *
1598 * @return #GNUNET_YES if peer was inserted 1689 * @return #GNUNET_YES if peer was inserted
1599 * #GNUNET_NO otherwise 1690 * #GNUNET_NO otherwise
1600 */ 1691 */
1601static int 1692static int
1602insert_peer (const struct GNUNET_PeerIdentity *peer) 1693insert_peer (struct Sub *sub,
1694 const struct GNUNET_PeerIdentity *peer)
1603{ 1695{
1604 if (GNUNET_YES == check_peer_known (peer)) 1696 if (GNUNET_YES == check_peer_known (sub->peer_map, peer))
1605 { 1697 {
1606 return GNUNET_NO; /* We already know this peer - nothing to do */ 1698 return GNUNET_NO; /* We already know this peer - nothing to do */
1607 } 1699 }
1608 (void) create_peer_ctx (peer); 1700 (void) create_peer_ctx (sub, peer);
1609 return GNUNET_YES; 1701 return GNUNET_YES;
1610} 1702}
1611 1703
@@ -1613,6 +1705,7 @@ insert_peer (const struct GNUNET_PeerIdentity *peer)
1613/** 1705/**
1614 * @brief Check whether flags on a peer are set. 1706 * @brief Check whether flags on a peer are set.
1615 * 1707 *
1708 * @param peer_map Peer map that is expected to contain the @a peer
1616 * @param peer the peer to check the flag of 1709 * @param peer the peer to check the flag of
1617 * @param flags the flags to check 1710 * @param flags the flags to check
1618 * 1711 *
@@ -1621,16 +1714,17 @@ insert_peer (const struct GNUNET_PeerIdentity *peer)
1621 * #GNUNET_NO otherwise 1714 * #GNUNET_NO otherwise
1622 */ 1715 */
1623static int 1716static int
1624check_peer_flag (const struct GNUNET_PeerIdentity *peer, 1717check_peer_flag (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
1718 const struct GNUNET_PeerIdentity *peer,
1625 enum Peers_PeerFlags flags) 1719 enum Peers_PeerFlags flags)
1626{ 1720{
1627 struct PeerContext *peer_ctx; 1721 struct PeerContext *peer_ctx;
1628 1722
1629 if (GNUNET_NO == check_peer_known (peer)) 1723 if (GNUNET_NO == check_peer_known (peer_map, peer))
1630 { 1724 {
1631 return GNUNET_SYSERR; 1725 return GNUNET_SYSERR;
1632 } 1726 }
1633 peer_ctx = get_peer_ctx (peer); 1727 peer_ctx = get_peer_ctx (peer_map, peer);
1634 return check_peer_flag_set (peer_ctx, flags); 1728 return check_peer_flag_set (peer_ctx, flags);
1635} 1729}
1636 1730
@@ -1639,18 +1733,20 @@ check_peer_flag (const struct GNUNET_PeerIdentity *peer,
1639 * 1733 *
1640 * If not known yet, insert into known peers 1734 * If not known yet, insert into known peers
1641 * 1735 *
1736 * @param sub Sub which would contain the @a peer
1642 * @param peer the peer whose online is to be checked 1737 * @param peer the peer whose online is to be checked
1643 * @return #GNUNET_YES if the check was issued 1738 * @return #GNUNET_YES if the check was issued
1644 * #GNUNET_NO otherwise 1739 * #GNUNET_NO otherwise
1645 */ 1740 */
1646static int 1741static int
1647issue_peer_online_check (const struct GNUNET_PeerIdentity *peer) 1742issue_peer_online_check (struct Sub *sub,
1743 const struct GNUNET_PeerIdentity *peer)
1648{ 1744{
1649 struct PeerContext *peer_ctx; 1745 struct PeerContext *peer_ctx;
1650 1746
1651 (void) insert_peer (peer); 1747 (void) insert_peer (sub, peer); // TODO even needed?
1652 peer_ctx = get_peer_ctx (peer); 1748 peer_ctx = get_peer_ctx (sub->peer_map, peer);
1653 if ( (GNUNET_NO == check_peer_flag (peer, Peers_ONLINE)) && 1749 if ( (GNUNET_NO == check_peer_flag (sub->peer_map, peer, Peers_ONLINE)) &&
1654 (NULL == peer_ctx->online_check_pending) ) 1750 (NULL == peer_ctx->online_check_pending) )
1655 { 1751 {
1656 check_peer_online (peer_ctx); 1752 check_peer_online (peer_ctx);
@@ -1668,22 +1764,20 @@ issue_peer_online_check (const struct GNUNET_PeerIdentity *peer)
1668 * - there are pending messages 1764 * - there are pending messages
1669 * - there is no pending pull reply 1765 * - there is no pending pull reply
1670 * 1766 *
1671 * @param peer the peer in question 1767 * @param peer_ctx Context of the peer in question
1672 * @return #GNUNET_YES if peer is removable 1768 * @return #GNUNET_YES if peer is removable
1673 * #GNUNET_NO if peer is NOT removable 1769 * #GNUNET_NO if peer is NOT removable
1674 * #GNUNET_SYSERR if peer is not known 1770 * #GNUNET_SYSERR if peer is not known
1675 */ 1771 */
1676static int 1772static int
1677check_removable (const struct GNUNET_PeerIdentity *peer) 1773check_removable (const struct PeerContext *peer_ctx)
1678{ 1774{
1679 struct PeerContext *peer_ctx; 1775 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1680 1776 &peer_ctx->peer_id))
1681 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mss->peer_map, peer))
1682 { 1777 {
1683 return GNUNET_SYSERR; 1778 return GNUNET_SYSERR;
1684 } 1779 }
1685 1780
1686 peer_ctx = get_peer_ctx (peer);
1687 if ( (NULL != peer_ctx->recv_channel_ctx) || 1781 if ( (NULL != peer_ctx->recv_channel_ctx) ||
1688 (NULL != peer_ctx->pending_messages_head) || 1782 (NULL != peer_ctx->pending_messages_head) ||
1689 (GNUNET_NO == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) ) 1783 (GNUNET_NO == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) )
@@ -1699,15 +1793,17 @@ check_removable (const struct GNUNET_PeerIdentity *peer)
1699 * 1793 *
1700 * A valid peer is a peer that we know exists eg. we were connected to once. 1794 * A valid peer is a peer that we know exists eg. we were connected to once.
1701 * 1795 *
1796 * @param valid_peers Peer map that would contain the @a peer
1702 * @param peer peer in question 1797 * @param peer peer in question
1703 * 1798 *
1704 * @return #GNUNET_YES if peer is valid 1799 * @return #GNUNET_YES if peer is valid
1705 * #GNUNET_NO if peer is not valid 1800 * #GNUNET_NO if peer is not valid
1706 */ 1801 */
1707static int 1802static int
1708check_peer_valid (const struct GNUNET_PeerIdentity *peer) 1803check_peer_valid (const struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1804 const struct GNUNET_PeerIdentity *peer)
1709{ 1805{
1710 return GNUNET_CONTAINER_multipeermap_contains (mss->valid_peers, peer); 1806 return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1711} 1807}
1712 1808
1713 1809
@@ -1716,13 +1812,14 @@ check_peer_valid (const struct GNUNET_PeerIdentity *peer)
1716 * 1812 *
1717 * This establishes a sending channel 1813 * This establishes a sending channel
1718 * 1814 *
1719 * @param peer the peer to establish channel to 1815 * @param peer_ctx Context of the target peer
1720 */ 1816 */
1721static void 1817static void
1722indicate_sending_intention (const struct GNUNET_PeerIdentity *peer) 1818indicate_sending_intention (struct PeerContext *peer_ctx)
1723{ 1819{
1724 GNUNET_assert (GNUNET_YES == check_peer_known (peer)); 1820 GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
1725 (void) get_channel (peer); 1821 &peer_ctx->peer_id));
1822 (void) get_channel (peer_ctx);
1726} 1823}
1727 1824
1728 1825
@@ -1730,17 +1827,14 @@ indicate_sending_intention (const struct GNUNET_PeerIdentity *peer)
1730 * @brief Check whether other peer has the intention to send/opened channel 1827 * @brief Check whether other peer has the intention to send/opened channel
1731 * towars us 1828 * towars us
1732 * 1829 *
1733 * @param peer the peer in question 1830 * @param peer_ctx Context of the peer in question
1734 * 1831 *
1735 * @return #GNUNET_YES if peer has the intention to send 1832 * @return #GNUNET_YES if peer has the intention to send
1736 * #GNUNET_NO otherwise 1833 * #GNUNET_NO otherwise
1737 */ 1834 */
1738static int 1835static int
1739check_peer_send_intention (const struct GNUNET_PeerIdentity *peer) 1836check_peer_send_intention (const struct PeerContext *peer_ctx)
1740{ 1837{
1741 const struct PeerContext *peer_ctx;
1742
1743 peer_ctx = get_peer_ctx (peer);
1744 if (NULL != peer_ctx->recv_channel_ctx) 1838 if (NULL != peer_ctx->recv_channel_ctx)
1745 { 1839 {
1746 return GNUNET_YES; 1840 return GNUNET_YES;
@@ -1752,7 +1846,7 @@ check_peer_send_intention (const struct GNUNET_PeerIdentity *peer)
1752/** 1846/**
1753 * Handle the channel a peer opens to us. 1847 * Handle the channel a peer opens to us.
1754 * 1848 *
1755 * @param cls The closure 1849 * @param cls The closure - Sub
1756 * @param channel The channel the peer wants to establish 1850 * @param channel The channel the peer wants to establish
1757 * @param initiator The peer's peer ID 1851 * @param initiator The peer's peer ID
1758 * 1852 *
@@ -1765,23 +1859,22 @@ handle_inbound_channel (void *cls,
1765 const struct GNUNET_PeerIdentity *initiator) 1859 const struct GNUNET_PeerIdentity *initiator)
1766{ 1860{
1767 struct PeerContext *peer_ctx; 1861 struct PeerContext *peer_ctx;
1768 struct GNUNET_PeerIdentity *ctx_peer;
1769 struct ChannelCtx *channel_ctx; 1862 struct ChannelCtx *channel_ctx;
1770 (void) cls; 1863 struct Sub *sub = cls;
1771 1864
1772 LOG (GNUNET_ERROR_TYPE_DEBUG, 1865 LOG (GNUNET_ERROR_TYPE_DEBUG,
1773 "New channel was established to us (Peer %s).\n", 1866 "New channel was established to us (Peer %s).\n",
1774 GNUNET_i2s (initiator)); 1867 GNUNET_i2s (initiator));
1775 GNUNET_assert (NULL != channel); /* according to cadet API */ 1868 GNUNET_assert (NULL != channel); /* according to cadet API */
1776 /* Make sure we 'know' about this peer */ 1869 /* Make sure we 'know' about this peer */
1777 peer_ctx = create_or_get_peer_ctx (initiator); 1870 peer_ctx = create_or_get_peer_ctx (sub, initiator);
1778 set_peer_online (peer_ctx); 1871 set_peer_online (peer_ctx);
1779 ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity); 1872 (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1780 *ctx_peer = *initiator;
1781 channel_ctx = add_channel_ctx (peer_ctx); 1873 channel_ctx = add_channel_ctx (peer_ctx);
1782 channel_ctx->channel = channel; 1874 channel_ctx->channel = channel;
1783 /* We only accept one incoming channel per peer */ 1875 /* We only accept one incoming channel per peer */
1784 if (GNUNET_YES == check_peer_send_intention (initiator)) 1876 if (GNUNET_YES == check_peer_send_intention (get_peer_ctx (sub->peer_map,
1877 initiator)))
1785 { 1878 {
1786 LOG (GNUNET_ERROR_TYPE_WARNING, 1879 LOG (GNUNET_ERROR_TYPE_WARNING,
1787 "Already got one receive channel. Destroying old one.\n"); 1880 "Already got one receive channel. Destroying old one.\n");
@@ -1799,21 +1892,19 @@ handle_inbound_channel (void *cls,
1799/** 1892/**
1800 * @brief Check whether a sending channel towards the given peer exists 1893 * @brief Check whether a sending channel towards the given peer exists
1801 * 1894 *
1802 * @param peer the peer to check for 1895 * @param peer_ctx Context of the peer in question
1803 * 1896 *
1804 * @return #GNUNET_YES if a sending channel towards that peer exists 1897 * @return #GNUNET_YES if a sending channel towards that peer exists
1805 * #GNUNET_NO otherwise 1898 * #GNUNET_NO otherwise
1806 */ 1899 */
1807static int 1900static int
1808check_sending_channel_exists (const struct GNUNET_PeerIdentity *peer) 1901check_sending_channel_exists (const struct PeerContext *peer_ctx)
1809{ 1902{
1810 struct PeerContext *peer_ctx; 1903 if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1811 1904 &peer_ctx->peer_id))
1812 if (GNUNET_NO == check_peer_known (peer))
1813 { /* If no such peer exists, there is no channel */ 1905 { /* If no such peer exists, there is no channel */
1814 return GNUNET_NO; 1906 return GNUNET_NO;
1815 } 1907 }
1816 peer_ctx = get_peer_ctx (peer);
1817 if (NULL == peer_ctx->send_channel_ctx) 1908 if (NULL == peer_ctx->send_channel_ctx)
1818 { 1909 {
1819 return GNUNET_NO; 1910 return GNUNET_NO;
@@ -1826,24 +1917,22 @@ check_sending_channel_exists (const struct GNUNET_PeerIdentity *peer)
1826 * @brief Destroy the send channel of a peer e.g. stop indicating a sending 1917 * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1827 * intention to another peer 1918 * intention to another peer
1828 * 1919 *
1829 * @peer the peer identity of the peer whose sending channel to destroy 1920 * @param peer_ctx Context to the peer
1830 * @return #GNUNET_YES if channel was destroyed 1921 * @return #GNUNET_YES if channel was destroyed
1831 * #GNUNET_NO otherwise 1922 * #GNUNET_NO otherwise
1832 */ 1923 */
1833static int 1924static int
1834destroy_sending_channel (const struct GNUNET_PeerIdentity *peer) 1925destroy_sending_channel (struct PeerContext *peer_ctx)
1835{ 1926{
1836 struct PeerContext *peer_ctx; 1927 if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1837 1928 &peer_ctx->peer_id))
1838 if (GNUNET_NO == check_peer_known (peer))
1839 { 1929 {
1840 return GNUNET_NO; 1930 return GNUNET_NO;
1841 } 1931 }
1842 peer_ctx = get_peer_ctx (peer);
1843 if (NULL != peer_ctx->send_channel_ctx) 1932 if (NULL != peer_ctx->send_channel_ctx)
1844 { 1933 {
1845 destroy_channel (peer_ctx->send_channel_ctx); 1934 destroy_channel (peer_ctx->send_channel_ctx);
1846 (void) check_connected (peer); 1935 (void) check_connected (peer_ctx);
1847 return GNUNET_YES; 1936 return GNUNET_YES;
1848 } 1937 }
1849 return GNUNET_NO; 1938 return GNUNET_NO;
@@ -1855,12 +1944,12 @@ destroy_sending_channel (const struct GNUNET_PeerIdentity *peer)
1855 * Keeps track about pending messages so they can be properly removed when the 1944 * Keeps track about pending messages so they can be properly removed when the
1856 * peer is destroyed. 1945 * peer is destroyed.
1857 * 1946 *
1858 * @param peer receeiver of the message 1947 * @param peer_ctx Context of the peer to which the message is to be sent
1859 * @param ev envelope of the message 1948 * @param ev envelope of the message
1860 * @param type type of the message 1949 * @param type type of the message
1861 */ 1950 */
1862static void 1951static void
1863send_message (const struct GNUNET_PeerIdentity *peer, 1952send_message (struct PeerContext *peer_ctx,
1864 struct GNUNET_MQ_Envelope *ev, 1953 struct GNUNET_MQ_Envelope *ev,
1865 const char *type) 1954 const char *type)
1866{ 1955{
@@ -1869,10 +1958,10 @@ send_message (const struct GNUNET_PeerIdentity *peer,
1869 1958
1870 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1959 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1871 "Sending message to %s of type %s\n", 1960 "Sending message to %s of type %s\n",
1872 GNUNET_i2s (peer), 1961 GNUNET_i2s (&peer_ctx->peer_id),
1873 type); 1962 type);
1874 pending_msg = insert_pending_message (peer, ev, type); 1963 pending_msg = insert_pending_message (peer_ctx, ev, type);
1875 mq = get_mq (peer); 1964 mq = get_mq (peer_ctx);
1876 GNUNET_MQ_notify_sent (ev, 1965 GNUNET_MQ_notify_sent (ev,
1877 mq_notify_sent_cb, 1966 mq_notify_sent_cb,
1878 pending_msg); 1967 pending_msg);
@@ -1884,28 +1973,29 @@ send_message (const struct GNUNET_PeerIdentity *peer,
1884 * 1973 *
1885 * Avoids scheduling an operation twice. 1974 * Avoids scheduling an operation twice.
1886 * 1975 *
1887 * @param peer the peer we want to schedule the operation for once it gets 1976 * @param peer_ctx Context of the peer for which to schedule the operation
1888 * online 1977 * @param peer_op the operation to schedule
1978 * @param cls Closure to @a peer_op
1889 * 1979 *
1890 * @return #GNUNET_YES if the operation was scheduled 1980 * @return #GNUNET_YES if the operation was scheduled
1891 * #GNUNET_NO otherwise 1981 * #GNUNET_NO otherwise
1892 */ 1982 */
1893static int 1983static int
1894schedule_operation (const struct GNUNET_PeerIdentity *peer, 1984schedule_operation (struct PeerContext *peer_ctx,
1895 const PeerOp peer_op) 1985 const PeerOp peer_op,
1986 void *cls)
1896{ 1987{
1897 struct PeerPendingOp pending_op; 1988 struct PeerPendingOp pending_op;
1898 struct PeerContext *peer_ctx;
1899 1989
1900 GNUNET_assert (GNUNET_YES == check_peer_known (peer)); 1990 GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
1991 &peer_ctx->peer_id));
1901 1992
1902 //TODO if ONLINE execute immediately 1993 //TODO if ONLINE execute immediately
1903 1994
1904 if (GNUNET_NO == check_operation_scheduled (peer, peer_op)) 1995 if (GNUNET_NO == check_operation_scheduled (peer_ctx, peer_op))
1905 { 1996 {
1906 peer_ctx = get_peer_ctx (peer);
1907 pending_op.op = peer_op; 1997 pending_op.op = peer_op;
1908 pending_op.op_cls = NULL; 1998 pending_op.op_cls = cls;
1909 GNUNET_array_append (peer_ctx->pending_ops, 1999 GNUNET_array_append (peer_ctx->pending_ops,
1910 peer_ctx->num_pending_ops, 2000 peer_ctx->num_pending_ops,
1911 pending_op); 2001 pending_op);
@@ -1983,6 +2073,11 @@ struct ClientContext
1983 * The client handle to send the reply to 2073 * The client handle to send the reply to
1984 */ 2074 */
1985 struct GNUNET_SERVICE_Client *client; 2075 struct GNUNET_SERVICE_Client *client;
2076
2077 /**
2078 * The #Sub this context belongs to
2079 */
2080 struct Sub *sub;
1986}; 2081};
1987 2082
1988/** 2083/**
@@ -2072,34 +2167,46 @@ rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2072 */ 2167 */
2073static void 2168static void
2074insert_in_view_op (void *cls, 2169insert_in_view_op (void *cls,
2075 const struct GNUNET_PeerIdentity *peer); 2170 const struct GNUNET_PeerIdentity *peer);
2076 2171
2077/** 2172/**
2078 * Insert PeerID in #view 2173 * Insert PeerID in #view
2079 * 2174 *
2080 * Called once we know a peer is online. 2175 * Called once we know a peer is online.
2081 * 2176 *
2177 * @param sub Sub in with the view to insert in
2178 * @param peer the peer to insert
2179 *
2082 * @return GNUNET_OK if peer was actually inserted 2180 * @return GNUNET_OK if peer was actually inserted
2083 * GNUNET_NO if peer was not inserted 2181 * GNUNET_NO if peer was not inserted
2084 */ 2182 */
2085static int 2183static int
2086insert_in_view (const struct GNUNET_PeerIdentity *peer) 2184insert_in_view (struct Sub *sub,
2185 const struct GNUNET_PeerIdentity *peer)
2087{ 2186{
2187 struct PeerContext *peer_ctx;
2088 int online; 2188 int online;
2089 int ret; 2189 int ret;
2090 2190
2091 online = check_peer_flag (peer, Peers_ONLINE); 2191 online = check_peer_flag (sub->peer_map, peer, Peers_ONLINE);
2192 peer_ctx = get_peer_ctx (sub->peer_map, peer); // TODO indirection needed?
2092 if ( (GNUNET_NO == online) || 2193 if ( (GNUNET_NO == online) ||
2093 (GNUNET_SYSERR == online) ) /* peer is not even known */ 2194 (GNUNET_SYSERR == online) ) /* peer is not even known */
2094 { 2195 {
2095 (void) issue_peer_online_check (peer); 2196 (void) issue_peer_online_check (sub, peer);
2096 (void) schedule_operation (peer, insert_in_view_op); 2197 (void) schedule_operation (peer_ctx, insert_in_view_op, sub);
2097 return GNUNET_NO; 2198 return GNUNET_NO;
2098 } 2199 }
2099 /* Open channel towards peer to keep connection open */ 2200 /* Open channel towards peer to keep connection open */
2100 indicate_sending_intention (peer); 2201 indicate_sending_intention (peer_ctx);
2101 ret = View_put (mss->view, peer); 2202 ret = View_put (sub->view, peer);
2102 GNUNET_STATISTICS_set (stats, "view size", View_size(mss->view), GNUNET_NO); 2203 if (peer_ctx->sub == msub)
2204 {
2205 GNUNET_STATISTICS_set (stats,
2206 "view size",
2207 View_size (peer_ctx->sub->view),
2208 GNUNET_NO);
2209 }
2103 return ret; 2210 return ret;
2104} 2211}
2105 2212
@@ -2111,18 +2218,21 @@ insert_in_view (const struct GNUNET_PeerIdentity *peer)
2111 * @param view_array the peerids of the view as array (can be empty) 2218 * @param view_array the peerids of the view as array (can be empty)
2112 * @param view_size the size of the view array (can be 0) 2219 * @param view_size the size of the view array (can be 0)
2113 */ 2220 */
2114void 2221static void
2115send_view (const struct ClientContext *cli_ctx, 2222send_view (const struct ClientContext *cli_ctx,
2116 const struct GNUNET_PeerIdentity *view_array, 2223 const struct GNUNET_PeerIdentity *view_array,
2117 uint64_t view_size) 2224 uint64_t view_size)
2118{ 2225{
2119 struct GNUNET_MQ_Envelope *ev; 2226 struct GNUNET_MQ_Envelope *ev;
2120 struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg; 2227 struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
2228 struct Sub *sub;
2121 2229
2122 if (NULL == view_array) 2230 if (NULL == view_array)
2123 { 2231 {
2124 view_size = View_size (mss->view); 2232 if (NULL == cli_ctx->sub) sub = msub;
2125 view_array = View_get_as_array(mss->view); 2233 else sub = cli_ctx->sub;
2234 view_size = View_size (sub->view);
2235 view_array = View_get_as_array (sub->view);
2126 } 2236 }
2127 2237
2128 ev = GNUNET_MQ_msg_extra (out_msg, 2238 ev = GNUNET_MQ_msg_extra (out_msg,
@@ -2146,7 +2256,7 @@ send_view (const struct ClientContext *cli_ctx,
2146 * @param view_array the peerids of the view as array (can be empty) 2256 * @param view_array the peerids of the view as array (can be empty)
2147 * @param view_size the size of the view array (can be 0) 2257 * @param view_size the size of the view array (can be 0)
2148 */ 2258 */
2149void 2259static void
2150send_stream_peers (const struct ClientContext *cli_ctx, 2260send_stream_peers (const struct ClientContext *cli_ctx,
2151 uint64_t num_peers, 2261 uint64_t num_peers,
2152 const struct GNUNET_PeerIdentity *peers) 2262 const struct GNUNET_PeerIdentity *peers)
@@ -2170,16 +2280,18 @@ send_stream_peers (const struct ClientContext *cli_ctx,
2170 2280
2171/** 2281/**
2172 * @brief sends updates to clients that are interested 2282 * @brief sends updates to clients that are interested
2283 *
2284 * @param sub Sub for which to notify clients
2173 */ 2285 */
2174static void 2286static void
2175clients_notify_view_update (void) 2287clients_notify_view_update (const struct Sub *sub)
2176{ 2288{
2177 struct ClientContext *cli_ctx_iter; 2289 struct ClientContext *cli_ctx_iter;
2178 uint64_t num_peers; 2290 uint64_t num_peers;
2179 const struct GNUNET_PeerIdentity *view_array; 2291 const struct GNUNET_PeerIdentity *view_array;
2180 2292
2181 num_peers = View_size (mss->view); 2293 num_peers = View_size (sub->view);
2182 view_array = View_get_as_array(mss->view); 2294 view_array = View_get_as_array(sub->view);
2183 /* check size of view is small enough */ 2295 /* check size of view is small enough */
2184 if (GNUNET_MAX_MESSAGE_SIZE < num_peers) 2296 if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2185 { 2297 {
@@ -2214,9 +2326,13 @@ clients_notify_view_update (void)
2214 2326
2215/** 2327/**
2216 * @brief sends updates to clients that are interested 2328 * @brief sends updates to clients that are interested
2329 *
2330 * @param num_peers Number of peers to send
2331 * @param peers the array of peers to send
2217 */ 2332 */
2218static void 2333static void
2219clients_notify_stream_peer (uint64_t num_peers, 2334clients_notify_stream_peer (const struct Sub *sub,
2335 uint64_t num_peers,
2220 const struct GNUNET_PeerIdentity *peers) 2336 const struct GNUNET_PeerIdentity *peers)
2221 // TODO enum StreamPeerSource) 2337 // TODO enum StreamPeerSource)
2222{ 2338{
@@ -2230,15 +2346,21 @@ clients_notify_stream_peer (uint64_t num_peers,
2230 NULL != cli_ctx_iter; 2346 NULL != cli_ctx_iter;
2231 cli_ctx_iter = cli_ctx_iter->next) 2347 cli_ctx_iter = cli_ctx_iter->next)
2232 { 2348 {
2233 if (GNUNET_YES == cli_ctx_iter->stream_update) 2349 if (GNUNET_YES == cli_ctx_iter->stream_update &&
2350 (sub == cli_ctx_iter->sub || sub == msub))
2234 { 2351 {
2235 send_stream_peers (cli_ctx_iter, num_peers, peers); 2352 send_stream_peers (cli_ctx_iter, num_peers, peers);
2236 } 2353 }
2237 } 2354 }
2238} 2355}
2239 2356
2357
2240/** 2358/**
2241 * Put random peer from sampler into the view as history update. 2359 * Put random peer from sampler into the view as history update.
2360 *
2361 * @param ids Array of Peers to insert into view
2362 * @param num_peers Number of peers to insert
2363 * @param cls Closure - The Sub for which this is to be done
2242 */ 2364 */
2243static void 2365static void
2244hist_update (const struct GNUNET_PeerIdentity *ids, 2366hist_update (const struct GNUNET_PeerIdentity *ids,
@@ -2246,21 +2368,21 @@ hist_update (const struct GNUNET_PeerIdentity *ids,
2246 void *cls) 2368 void *cls)
2247{ 2369{
2248 unsigned int i; 2370 unsigned int i;
2249 (void) cls; 2371 struct Sub *sub = cls;
2250 2372
2251 for (i = 0; i < num_peers; i++) 2373 for (i = 0; i < num_peers; i++)
2252 { 2374 {
2253 int inserted; 2375 int inserted;
2254 inserted = insert_in_view (&ids[i]); 2376 inserted = insert_in_view (sub, &ids[i]);
2255 if (GNUNET_OK == inserted) 2377 if (GNUNET_OK == inserted)
2256 { 2378 {
2257 clients_notify_stream_peer (1, &ids[i]); 2379 clients_notify_stream_peer (sub, 1, &ids[i]);
2258 } 2380 }
2259 to_file (mss->file_name_view_log, 2381 to_file (sub->file_name_view_log,
2260 "+%s\t(hist)", 2382 "+%s\t(hist)",
2261 GNUNET_i2s_full (ids)); 2383 GNUNET_i2s_full (ids));
2262 } 2384 }
2263 clients_notify_view_update(); 2385 clients_notify_view_update (sub);
2264} 2386}
2265 2387
2266 2388
@@ -2269,6 +2391,9 @@ hist_update (const struct GNUNET_PeerIdentity *ids,
2269 * 2391 *
2270 * If we do not have enough sampler elements, double current sampler size 2392 * If we do not have enough sampler elements, double current sampler size
2271 * If we have more than enough sampler elements, halv current sampler size 2393 * If we have more than enough sampler elements, halv current sampler size
2394 *
2395 * @param sampler The sampler to resize
2396 * @param new_size New size to which to resize
2272 */ 2397 */
2273static void 2398static void
2274resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size) 2399resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
@@ -2316,10 +2441,13 @@ add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2316 &peer_array[i], 2441 &peer_array[i],
2317 NULL, 2442 NULL,
2318 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); 2443 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2319 GNUNET_STATISTICS_set (stats, 2444 if (msub->peer_map == peer_map)
2320 "# known peers", 2445 {
2321 GNUNET_CONTAINER_multipeermap_size (peer_map), 2446 GNUNET_STATISTICS_set (stats,
2322 GNUNET_NO); 2447 "# known peers",
2448 GNUNET_CONTAINER_multipeermap_size (peer_map),
2449 GNUNET_NO);
2450 }
2323 } 2451 }
2324} 2452}
2325 2453
@@ -2327,12 +2455,12 @@ add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2327/** 2455/**
2328 * Send a PULL REPLY to @a peer_id 2456 * Send a PULL REPLY to @a peer_id
2329 * 2457 *
2330 * @param peer_id the peer to send the reply to. 2458 * @param peer_ctx Context of the peer to send the reply to
2331 * @param peer_ids the peers to send to @a peer_id 2459 * @param peer_ids the peers to send to @a peer_id
2332 * @param num_peer_ids the number of peers to send to @a peer_id 2460 * @param num_peer_ids the number of peers to send to @a peer_id
2333 */ 2461 */
2334static void 2462static void
2335send_pull_reply (const struct GNUNET_PeerIdentity *peer_id, 2463send_pull_reply (struct PeerContext *peer_ctx,
2336 const struct GNUNET_PeerIdentity *peer_ids, 2464 const struct GNUNET_PeerIdentity *peer_ids,
2337 unsigned int num_peer_ids) 2465 unsigned int num_peer_ids)
2338{ 2466{
@@ -2358,7 +2486,7 @@ send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
2358 2486
2359 LOG (GNUNET_ERROR_TYPE_DEBUG, 2487 LOG (GNUNET_ERROR_TYPE_DEBUG,
2360 "Going to send PULL REPLY with %u peers to %s\n", 2488 "Going to send PULL REPLY with %u peers to %s\n",
2361 send_size, GNUNET_i2s (peer_id)); 2489 send_size, GNUNET_i2s (&peer_ctx->peer_id));
2362 2490
2363 ev = GNUNET_MQ_msg_extra (out_msg, 2491 ev = GNUNET_MQ_msg_extra (out_msg,
2364 send_size * sizeof (struct GNUNET_PeerIdentity), 2492 send_size * sizeof (struct GNUNET_PeerIdentity),
@@ -2367,8 +2495,11 @@ send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
2367 GNUNET_memcpy (&out_msg[1], peer_ids, 2495 GNUNET_memcpy (&out_msg[1], peer_ids,
2368 send_size * sizeof (struct GNUNET_PeerIdentity)); 2496 send_size * sizeof (struct GNUNET_PeerIdentity));
2369 2497
2370 send_message (peer_id, ev, "PULL REPLY"); 2498 send_message (peer_ctx, ev, "PULL REPLY");
2371 GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO); 2499 if (peer_ctx->sub == msub)
2500 {
2501 GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2502 }
2372 // TODO check with send intention: as send_channel is used/opened we indicate 2503 // TODO check with send intention: as send_channel is used/opened we indicate
2373 // a sending intention without intending it. 2504 // a sending intention without intending it.
2374 // -> clean peer afterwards? 2505 // -> clean peer afterwards?
@@ -2380,13 +2511,17 @@ send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
2380 * Insert PeerID in #pull_map 2511 * Insert PeerID in #pull_map
2381 * 2512 *
2382 * Called once we know a peer is online. 2513 * Called once we know a peer is online.
2514 *
2515 * @param cls Closure - Sub with the pull map to insert into
2516 * @param peer Peer to insert
2383 */ 2517 */
2384static void 2518static void
2385insert_in_pull_map (void *cls, 2519insert_in_pull_map (void *cls,
2386 const struct GNUNET_PeerIdentity *peer) 2520 const struct GNUNET_PeerIdentity *peer)
2387{ 2521{
2388 (void) cls; 2522 struct Sub *sub = cls;
2389 CustomPeerMap_put (mss->pull_map, peer); 2523
2524 CustomPeerMap_put (sub->pull_map, peer);
2390} 2525}
2391 2526
2392 2527
@@ -2395,18 +2530,21 @@ insert_in_pull_map (void *cls,
2395 * 2530 *
2396 * Called once we know a peer is online. 2531 * Called once we know a peer is online.
2397 * Implements #PeerOp 2532 * Implements #PeerOp
2533 *
2534 * @param cls Closure - Sub with view to insert peer into
2535 * @param peer the peer to insert
2398 */ 2536 */
2399static void 2537static void
2400insert_in_view_op (void *cls, 2538insert_in_view_op (void *cls,
2401 const struct GNUNET_PeerIdentity *peer) 2539 const struct GNUNET_PeerIdentity *peer)
2402{ 2540{
2403 (void) cls; 2541 struct Sub *sub = cls;
2404 int inserted; 2542 int inserted;
2405 2543
2406 inserted = insert_in_view (peer); 2544 inserted = insert_in_view (sub, peer);
2407 if (GNUNET_OK == inserted) 2545 if (GNUNET_OK == inserted)
2408 { 2546 {
2409 clients_notify_stream_peer (1, peer); 2547 clients_notify_stream_peer (sub, 1, peer);
2410 } 2548 }
2411} 2549}
2412 2550
@@ -2414,41 +2552,46 @@ insert_in_view_op (void *cls,
2414/** 2552/**
2415 * Update sampler with given PeerID. 2553 * Update sampler with given PeerID.
2416 * Implements #PeerOp 2554 * Implements #PeerOp
2555 *
2556 * @param cls Closure - Sub containing the sampler to insert into
2557 * @param peer Peer to insert
2417 */ 2558 */
2418static void 2559static void
2419insert_in_sampler (void *cls, 2560insert_in_sampler (void *cls,
2420 const struct GNUNET_PeerIdentity *peer) 2561 const struct GNUNET_PeerIdentity *peer)
2421{ 2562{
2422 (void) cls; 2563 struct Sub *sub = cls;
2564
2423 LOG (GNUNET_ERROR_TYPE_DEBUG, 2565 LOG (GNUNET_ERROR_TYPE_DEBUG,
2424 "Updating samplers with peer %s from insert_in_sampler()\n", 2566 "Updating samplers with peer %s from insert_in_sampler()\n",
2425 GNUNET_i2s (peer)); 2567 GNUNET_i2s (peer));
2426 RPS_sampler_update (mss->sampler, peer); 2568 RPS_sampler_update (sub->sampler, peer);
2427 if (0 < RPS_sampler_count_id (mss->sampler, peer)) 2569 if (0 < RPS_sampler_count_id (sub->sampler, peer))
2428 { 2570 {
2429 /* Make sure we 'know' about this peer */ 2571 /* Make sure we 'know' about this peer */
2430 (void) issue_peer_online_check (peer); 2572 (void) issue_peer_online_check (sub, peer);
2431 /* Establish a channel towards that peer to indicate we are going to send 2573 /* Establish a channel towards that peer to indicate we are going to send
2432 * messages to it */ 2574 * messages to it */
2433 //indicate_sending_intention (peer); 2575 //indicate_sending_intention (peer);
2434 } 2576 }
2435 #ifdef TO_FILE 2577 #ifdef TO_FILE
2436 mss->num_observed_peers++; 2578 sub->num_observed_peers++;
2437 GNUNET_CONTAINER_multipeermap_put 2579 GNUNET_CONTAINER_multipeermap_put
2438 (mss->observed_unique_peers, 2580 (sub->observed_unique_peers,
2439 peer, 2581 peer,
2440 NULL, 2582 NULL,
2441 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 2583 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2442 uint32_t num_observed_unique_peers = 2584 uint32_t num_observed_unique_peers =
2443 GNUNET_CONTAINER_multipeermap_size (mss->observed_unique_peers); 2585 GNUNET_CONTAINER_multipeermap_size (sub->observed_unique_peers);
2444 to_file (mss->file_name_observed_log, 2586 to_file (sub->file_name_observed_log,
2445 "%" PRIu32 " %" PRIu32 " %f\n", 2587 "%" PRIu32 " %" PRIu32 " %f\n",
2446 mss->num_observed_peers, 2588 sub->num_observed_peers,
2447 num_observed_unique_peers, 2589 num_observed_unique_peers,
2448 1.0*num_observed_unique_peers/mss->num_observed_peers) 2590 1.0*num_observed_unique_peers/sub->num_observed_peers)
2449 #endif /* TO_FILE */ 2591 #endif /* TO_FILE */
2450} 2592}
2451 2593
2594
2452/** 2595/**
2453 * @brief This is called on peers from external sources (cadet, peerinfo, ...) 2596 * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2454 * If the peer is not known, online check is issued and it is 2597 * If the peer is not known, online check is issued and it is
@@ -2456,45 +2599,60 @@ insert_in_sampler (void *cls,
2456 * 2599 *
2457 * "External sources" refer to every source except the gossip. 2600 * "External sources" refer to every source except the gossip.
2458 * 2601 *
2459 * @param peer peer to insert 2602 * @param sub Sub for which @a peer was received
2603 * @param peer peer to insert/peer received
2460 */ 2604 */
2461static void 2605static void
2462got_peer (const struct GNUNET_PeerIdentity *peer) 2606got_peer (struct Sub *sub,
2607 const struct GNUNET_PeerIdentity *peer)
2463{ 2608{
2464 /* If we did not know this peer already, insert it into sampler and view */ 2609 /* If we did not know this peer already, insert it into sampler and view */
2465 if (GNUNET_YES == issue_peer_online_check (peer)) 2610 if (GNUNET_YES == issue_peer_online_check (sub, peer))
2466 { 2611 {
2467 schedule_operation (peer, insert_in_sampler); 2612 schedule_operation (get_peer_ctx (sub->peer_map, peer),
2468 schedule_operation (peer, insert_in_view_op); 2613 &insert_in_sampler, sub);
2614 schedule_operation (get_peer_ctx (sub->peer_map, peer),
2615 &insert_in_view_op, sub);
2616 }
2617 if (sub == msub)
2618 {
2619 GNUNET_STATISTICS_update (stats,
2620 "# learnd peers",
2621 1,
2622 GNUNET_NO);
2469 } 2623 }
2470 GNUNET_STATISTICS_update (stats,
2471 "# learnd peers",
2472 1,
2473 GNUNET_NO);
2474} 2624}
2475 2625
2626
2476/** 2627/**
2477 * @brief Checks if there is a sending channel and if it is needed 2628 * @brief Checks if there is a sending channel and if it is needed
2478 * 2629 *
2479 * @param peer the peer whose sending channel is checked 2630 * @param peer_ctx Context of the peer to check
2480 * @return GNUNET_YES if sending channel exists and is still needed 2631 * @return GNUNET_YES if sending channel exists and is still needed
2481 * GNUNET_NO otherwise 2632 * GNUNET_NO otherwise
2482 */ 2633 */
2483static int 2634static int
2484check_sending_channel_needed (const struct GNUNET_PeerIdentity *peer) 2635check_sending_channel_needed (const struct PeerContext *peer_ctx)
2485{ 2636{
2486 /* struct GNUNET_CADET_Channel *channel; */ 2637 /* struct GNUNET_CADET_Channel *channel; */
2487 if (GNUNET_NO == check_peer_known (peer)) 2638 if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
2639 &peer_ctx->peer_id))
2488 { 2640 {
2489 return GNUNET_NO; 2641 return GNUNET_NO;
2490 } 2642 }
2491 if (GNUNET_YES == check_sending_channel_exists (peer)) 2643 if (GNUNET_YES == check_sending_channel_exists (peer_ctx))
2492 { 2644 {
2493 if ( (0 < RPS_sampler_count_id (mss->sampler, peer)) || 2645 if ( (0 < RPS_sampler_count_id (peer_ctx->sub->sampler,
2494 (GNUNET_YES == View_contains_peer (mss->view, peer)) || 2646 &peer_ctx->peer_id)) ||
2495 (GNUNET_YES == CustomPeerMap_contains_peer (mss->push_map, peer)) || 2647 (GNUNET_YES == View_contains_peer (peer_ctx->sub->view,
2496 (GNUNET_YES == CustomPeerMap_contains_peer (mss->pull_map, peer)) || 2648 &peer_ctx->peer_id)) ||
2497 (GNUNET_YES == check_peer_flag (peer, Peers_PULL_REPLY_PENDING))) 2649 (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->push_map,
2650 &peer_ctx->peer_id)) ||
2651 (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->pull_map,
2652 &peer_ctx->peer_id)) ||
2653 (GNUNET_YES == check_peer_flag (peer_ctx->sub->peer_map,
2654 &peer_ctx->peer_id,
2655 Peers_PULL_REPLY_PENDING)))
2498 { /* If we want to keep the connection to peer open */ 2656 { /* If we want to keep the connection to peer open */
2499 return GNUNET_YES; 2657 return GNUNET_YES;
2500 } 2658 }
@@ -2503,20 +2661,23 @@ check_sending_channel_needed (const struct GNUNET_PeerIdentity *peer)
2503 return GNUNET_NO; 2661 return GNUNET_NO;
2504} 2662}
2505 2663
2664
2506/** 2665/**
2507 * @brief remove peer from our knowledge, the view, push and pull maps and 2666 * @brief remove peer from our knowledge, the view, push and pull maps and
2508 * samplers. 2667 * samplers.
2509 * 2668 *
2669 * @param sub Sub with the data structures the peer is to be removed from
2510 * @param peer the peer to remove 2670 * @param peer the peer to remove
2511 */ 2671 */
2512static void 2672static void
2513remove_peer (const struct GNUNET_PeerIdentity *peer) 2673remove_peer (struct Sub *sub,
2674 const struct GNUNET_PeerIdentity *peer)
2514{ 2675{
2515 (void) View_remove_peer (mss->view, peer); 2676 (void) View_remove_peer (sub->view, peer);
2516 CustomPeerMap_remove_peer (mss->pull_map, peer); 2677 CustomPeerMap_remove_peer (sub->pull_map, peer);
2517 CustomPeerMap_remove_peer (mss->push_map, peer); 2678 CustomPeerMap_remove_peer (sub->push_map, peer);
2518 RPS_sampler_reinitialise_by_value (mss->sampler, peer); 2679 RPS_sampler_reinitialise_by_value (sub->sampler, peer);
2519 destroy_peer (get_peer_ctx (peer)); 2680 destroy_peer (get_peer_ctx (sub->peer_map, peer));
2520} 2681}
2521 2682
2522 2683
@@ -2525,35 +2686,47 @@ remove_peer (const struct GNUNET_PeerIdentity *peer)
2525 * 2686 *
2526 * If the sending channel is no longer needed it is destroyed. 2687 * If the sending channel is no longer needed it is destroyed.
2527 * 2688 *
2689 * @param sub Sub in which the current peer is to be cleaned
2528 * @param peer the peer whose data is about to be cleaned 2690 * @param peer the peer whose data is about to be cleaned
2529 */ 2691 */
2530static void 2692static void
2531clean_peer (const struct GNUNET_PeerIdentity *peer) 2693clean_peer (struct Sub *sub,
2694 const struct GNUNET_PeerIdentity *peer)
2532{ 2695{
2533 if (GNUNET_NO == check_sending_channel_needed (peer)) 2696 if (GNUNET_NO == check_sending_channel_needed (get_peer_ctx (sub->peer_map,
2697 peer)))
2534 { 2698 {
2535 LOG (GNUNET_ERROR_TYPE_DEBUG, 2699 LOG (GNUNET_ERROR_TYPE_DEBUG,
2536 "Going to remove send channel to peer %s\n", 2700 "Going to remove send channel to peer %s\n",
2537 GNUNET_i2s (peer)); 2701 GNUNET_i2s (peer));
2538 #ifdef ENABLE_MALICIOUS 2702 #ifdef ENABLE_MALICIOUS
2539 if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer)) 2703 if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
2540 (void) destroy_sending_channel (peer); 2704 (void) destroy_sending_channel (get_peer_ctx (sub->peer_map, peer));
2541 #else /* ENABLE_MALICIOUS */ 2705 #else /* ENABLE_MALICIOUS */
2542 (void) destroy_sending_channel (peer); 2706 (void) destroy_sending_channel (get_peer_ctx (sub->peer_map, peer));
2543 #endif /* ENABLE_MALICIOUS */ 2707 #endif /* ENABLE_MALICIOUS */
2544 } 2708 }
2545 2709
2546 if ( (GNUNET_NO == check_peer_send_intention (peer)) && 2710 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (sub->peer_map, peer))
2547 (GNUNET_NO == View_contains_peer (mss->view, peer)) && 2711 {
2548 (GNUNET_NO == CustomPeerMap_contains_peer (mss->push_map, peer)) && 2712 /* Peer was already removed by callback on destroyed channel */
2549 (GNUNET_NO == CustomPeerMap_contains_peer (mss->push_map, peer)) && 2713 LOG (GNUNET_ERROR_TYPE_WARNING,
2550 (0 == RPS_sampler_count_id (mss->sampler, peer)) && 2714 "Peer was removed from our knowledge during cleanup\n");
2551 (GNUNET_NO != check_removable (peer)) ) 2715 return;
2716 }
2717
2718 if ( (GNUNET_NO == check_peer_send_intention (get_peer_ctx (sub->peer_map,
2719 peer))) &&
2720 (GNUNET_NO == View_contains_peer (sub->view, peer)) &&
2721 (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2722 (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2723 (0 == RPS_sampler_count_id (sub->sampler, peer)) &&
2724 (GNUNET_NO != check_removable (get_peer_ctx (sub->peer_map, peer))) )
2552 { /* We can safely remove this peer */ 2725 { /* We can safely remove this peer */
2553 LOG (GNUNET_ERROR_TYPE_DEBUG, 2726 LOG (GNUNET_ERROR_TYPE_DEBUG,
2554 "Going to remove peer %s\n", 2727 "Going to remove peer %s\n",
2555 GNUNET_i2s (peer)); 2728 GNUNET_i2s (peer));
2556 remove_peer (peer); 2729 remove_peer (sub, peer);
2557 return; 2730 return;
2558 } 2731 }
2559} 2732}
@@ -2567,9 +2740,8 @@ clean_peer (const struct GNUNET_PeerIdentity *peer)
2567 * Also check if the knowledge about this peer is still needed. 2740 * Also check if the knowledge about this peer is still needed.
2568 * If not, remove this peer from our knowledge. 2741 * If not, remove this peer from our knowledge.
2569 * 2742 *
2570 * @param cls The closure 2743 * @param cls The closure - Context to the channel
2571 * @param channel The channel being closed 2744 * @param channel The channel being closed
2572 * @param channel_ctx The context associated with this channel
2573 */ 2745 */
2574static void 2746static void
2575cleanup_destroyed_channel (void *cls, 2747cleanup_destroyed_channel (void *cls,
@@ -2577,15 +2749,15 @@ cleanup_destroyed_channel (void *cls,
2577{ 2749{
2578 struct ChannelCtx *channel_ctx = cls; 2750 struct ChannelCtx *channel_ctx = cls;
2579 struct PeerContext *peer_ctx = channel_ctx->peer_ctx; 2751 struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
2580 (void) cls;
2581 (void) channel; 2752 (void) channel;
2582 2753
2583 channel_ctx->channel = NULL; 2754 channel_ctx->channel = NULL;
2584 remove_channel_ctx (channel_ctx); 2755 remove_channel_ctx (channel_ctx);
2585 if (NULL != peer_ctx && 2756 if (NULL != peer_ctx &&
2586 peer_ctx->send_channel_ctx == channel_ctx) 2757 peer_ctx->send_channel_ctx == channel_ctx &&
2758 GNUNET_YES == check_sending_channel_needed (channel_ctx->peer_ctx))
2587 { 2759 {
2588 remove_peer (&peer_ctx->peer_id); 2760 remove_peer (peer_ctx->sub, &peer_ctx->peer_id);
2589 } 2761 }
2590} 2762}
2591 2763
@@ -2596,25 +2768,30 @@ cleanup_destroyed_channel (void *cls,
2596 2768
2597 2769
2598/*********************************************************************** 2770/***********************************************************************
2599 * SubSampler 2771 * Sub
2600***********************************************************************/ 2772***********************************************************************/
2601 2773
2602struct SubSampler * 2774/**
2603new_subsampler (const char *shared_value, 2775 * @brief Create a new Sub
2604 uint32_t sampler_size, 2776 *
2605 struct GNUNET_TIME_Relative round_interval) 2777 * @param hash Hash of value shared among rps instances on other hosts that
2778 * defines a subgroup to sample from.
2779 * @param sampler_size Size of the sampler
2780 * @param round_interval Interval (in average) between two rounds
2781 *
2782 * @return Sub
2783 */
2784struct Sub *
2785new_sub (const struct GNUNET_HashCode *hash,
2786 uint32_t sampler_size,
2787 struct GNUNET_TIME_Relative round_interval)
2606{ 2788{
2607 struct SubSampler *ss; 2789 struct Sub *sub;
2608 char hash_port_string[512] = GNUNET_APPLICATION_PORT_RPS;
2609 2790
2610 ss = GNUNET_new (struct SubSampler); 2791 sub = GNUNET_new (struct Sub);
2611 2792
2612 /* With the hash generated from the secret value this service only connects 2793 /* With the hash generated from the secret value this service only connects
2613 * to rps instances that share the value */ 2794 * to rps instances that share the value */
2614 strcat (hash_port_string, shared_value);
2615 GNUNET_CRYPTO_hash (hash_port_string,
2616 strlen (hash_port_string),
2617 &ss->port);
2618 struct GNUNET_MQ_MessageHandler cadet_handlers[] = { 2795 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
2619 GNUNET_MQ_hd_fixed_size (peer_check, 2796 GNUNET_MQ_hd_fixed_size (peer_check,
2620 GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE, 2797 GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
@@ -2634,17 +2811,16 @@ new_subsampler (const char *shared_value,
2634 NULL), 2811 NULL),
2635 GNUNET_MQ_handler_end () 2812 GNUNET_MQ_handler_end ()
2636 }; 2813 };
2637 ss->cadet_handle = GNUNET_CADET_connect (cfg); 2814 sub->hash = *hash;
2638 GNUNET_assert (NULL != ss->cadet_handle); 2815 sub->cadet_port =
2639 ss->cadet_port = 2816 GNUNET_CADET_open_port (cadet_handle,
2640 GNUNET_CADET_open_port (ss->cadet_handle, 2817 &sub->hash,
2641 &ss->port,
2642 &handle_inbound_channel, /* Connect handler */ 2818 &handle_inbound_channel, /* Connect handler */
2643 NULL, /* cls */ 2819 sub, /* cls */
2644 NULL, /* WindowSize handler */ 2820 NULL, /* WindowSize handler */
2645 &cleanup_destroyed_channel, /* Disconnect handler */ 2821 &cleanup_destroyed_channel, /* Disconnect handler */
2646 cadet_handlers); 2822 cadet_handlers);
2647 if (NULL == ss->cadet_port) 2823 if (NULL == sub->cadet_port)
2648 { 2824 {
2649 LOG (GNUNET_ERROR_TYPE_ERROR, 2825 LOG (GNUNET_ERROR_TYPE_ERROR,
2650 "Cadet port `%s' is already in use.\n", 2826 "Cadet port `%s' is already in use.\n",
@@ -2653,56 +2829,246 @@ new_subsampler (const char *shared_value,
2653 } 2829 }
2654 2830
2655 /* Set up general data structure to keep track about peers */ 2831 /* Set up general data structure to keep track about peers */
2656 ss->valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO); 2832 sub->valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2657 if (GNUNET_OK != 2833 if (GNUNET_OK !=
2658 GNUNET_CONFIGURATION_get_value_filename (cfg, 2834 GNUNET_CONFIGURATION_get_value_filename (cfg,
2659 "rps", 2835 "rps",
2660 "FILENAME_VALID_PEERS", 2836 "FILENAME_VALID_PEERS",
2661 &ss->filename_valid_peers)) 2837 &sub->filename_valid_peers))
2662 { 2838 {
2663 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, 2839 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2664 "rps", 2840 "rps",
2665 "FILENAME_VALID_PEERS"); 2841 "FILENAME_VALID_PEERS");
2666 } 2842 }
2667 ss->peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO); 2843 if (0 != strncmp ("DISABLE", sub->filename_valid_peers, 7))
2844 {
2845 char *tmp_filename_valid_peers;
2846 char str_hash[105];
2847 uint32_t len_filename_valid_peers;
2848
2849 (void) GNUNET_snprintf (str_hash, 105, GNUNET_h2s_full (hash));
2850 tmp_filename_valid_peers = GNUNET_strdup (sub->filename_valid_peers);
2851 GNUNET_free (sub->filename_valid_peers);
2852 len_filename_valid_peers = strlen (tmp_filename_valid_peers) + 105; /* Len of full hash + 1 */
2853 sub->filename_valid_peers = GNUNET_malloc (len_filename_valid_peers);
2854 strncat (sub->filename_valid_peers,
2855 tmp_filename_valid_peers,
2856 len_filename_valid_peers);
2857 strncat (sub->filename_valid_peers,
2858 str_hash,
2859 len_filename_valid_peers);
2860 GNUNET_free (tmp_filename_valid_peers);
2861 }
2862 sub->peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2668 2863
2669 /* Set up the sampler */ 2864 /* Set up the sampler */
2670 ss->sampler_size_est_min = sampler_size; 2865 sub->sampler_size_est_min = sampler_size;
2671 ss->sampler_size_est_need = sampler_size;; 2866 sub->sampler_size_est_need = sampler_size;;
2672 LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", ss->sampler_size_est_min); 2867 LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sub->sampler_size_est_min);
2673 ss->round_interval = round_interval; 2868 GNUNET_assert (0 != round_interval.rel_value_us);
2674 ss->sampler = RPS_sampler_init (sampler_size, 2869 sub->round_interval = round_interval;
2870 sub->sampler = RPS_sampler_init (sampler_size,
2675 round_interval); 2871 round_interval);
2676 2872
2677 /* Logging of internals */ 2873 /* Logging of internals */
2678 ss->file_name_view_log = store_prefix_file_name (&own_identity, "view"); 2874 sub->file_name_view_log = store_prefix_file_name (&own_identity, "view");
2679 #ifdef TO_FILE 2875 #ifdef TO_FILE
2680 ss->file_name_observed_log = store_prefix_file_name (&own_identity, 2876 sub->file_name_observed_log = store_prefix_file_name (&own_identity,
2681 "observed"); 2877 "observed");
2682 ss->num_observed_peers = 0; 2878 sub->file_name_push_recv = store_prefix_file_name (&own_identity,
2683 ss->observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1, 2879 "push_recv");
2880 sub->file_name_pull_delays = store_prefix_file_name (&own_identity,
2881 "pull_delays");
2882 sub->num_observed_peers = 0;
2883 sub->observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1,
2684 GNUNET_NO); 2884 GNUNET_NO);
2685 #endif /* TO_FILE */ 2885 #endif /* TO_FILE */
2686 2886
2687 /* Set up data structures for gossip */ 2887 /* Set up data structures for gossip */
2688 ss->push_map = CustomPeerMap_create (4); 2888 sub->push_map = CustomPeerMap_create (4);
2689 ss->pull_map = CustomPeerMap_create (4); 2889 sub->pull_map = CustomPeerMap_create (4);
2690 ss->view_size_est_min = sampler_size;; 2890 sub->view_size_est_min = sampler_size;;
2691 ss->view = View_create (ss->view_size_est_min); 2891 sub->view = View_create (sub->view_size_est_min);
2692 GNUNET_STATISTICS_set (stats, 2892 if (sub == msub)
2693 "view size aim", 2893 {
2694 ss->view_size_est_min, 2894 GNUNET_STATISTICS_set (stats,
2695 GNUNET_NO); 2895 "view size aim",
2896 sub->view_size_est_min,
2897 GNUNET_NO);
2898 }
2696 2899
2900 /* Start executing rounds */
2901 sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
2697 2902
2698 return ss; 2903 return sub;
2699} 2904}
2700 2905
2906
2907/**
2908 * @brief Destroy Sub.
2909 *
2910 * @param sub Sub to destroy
2911 */
2912static void
2913destroy_sub (struct Sub *sub)
2914{
2915#ifdef TO_FILE
2916 char push_recv_str[1536] = ""; /* 256 * 6 (1 whitespace, 1 comma, up to 4 chars) */
2917 char pull_delays_str[1536] = ""; /* 256 * 6 (1 whitespace, 1 comma, up to 4 chars) */
2918#endif /* TO_FILE */
2919 GNUNET_assert (NULL != sub);
2920 GNUNET_assert (NULL != sub->do_round_task);
2921 GNUNET_SCHEDULER_cancel (sub->do_round_task);
2922 sub->do_round_task = NULL;
2923
2924 /* Disconnect from cadet */
2925 GNUNET_CADET_close_port (sub->cadet_port);
2926
2927 /* Clean up data structures for peers */
2928 RPS_sampler_destroy (sub->sampler);
2929 sub->sampler = NULL;
2930 View_destroy (sub->view);
2931 sub->view = NULL;
2932 CustomPeerMap_destroy (sub->push_map);
2933 sub->push_map = NULL;
2934 CustomPeerMap_destroy (sub->pull_map);
2935 sub->pull_map = NULL;
2936 peers_terminate (sub);
2937
2938 /* Free leftover data structures */
2939 GNUNET_free (sub->file_name_view_log);
2940 sub->file_name_view_log = NULL;
2941#ifdef TO_FILE
2942 GNUNET_free (sub->file_name_observed_log);
2943 sub->file_name_observed_log = NULL;
2944
2945 /* Write push frequencies to disk */
2946 for (uint32_t i = 0; i < 256; i++)
2947 {
2948 char push_recv_str_tmp[8];
2949 (void) snprintf (push_recv_str_tmp, 8, "%" PRIu32 "\n", sub->push_recv[i]);
2950 LOG (GNUNET_ERROR_TYPE_DEBUG,
2951 "Adding str `%s' to `%s'\n",
2952 push_recv_str_tmp,
2953 push_recv_str);
2954 (void) strncat (push_recv_str,
2955 push_recv_str_tmp,
2956 1535 - strnlen (push_recv_str, 1536));
2957 }
2958 (void) strncat (push_recv_str,
2959 "\n",
2960 1535 - strnlen (push_recv_str, 1536));
2961 LOG (GNUNET_ERROR_TYPE_DEBUG, "Writing push stats to disk\n");
2962 to_file_w_len (sub->file_name_push_recv, 1535, push_recv_str);
2963 GNUNET_free (sub->file_name_push_recv);
2964 sub->file_name_push_recv = NULL;
2965
2966 /* Write pull delays to disk */
2967 for (uint32_t i = 0; i < 256; i++)
2968 {
2969 char pull_delays_str_tmp[8];
2970 (void) snprintf (pull_delays_str_tmp, 8, "%" PRIu32 "\n", sub->pull_delays[i]);
2971 LOG (GNUNET_ERROR_TYPE_DEBUG,
2972 "Adding str `%s' to `%s'\n",
2973 pull_delays_str_tmp,
2974 pull_delays_str);
2975 (void) strncat (pull_delays_str,
2976 pull_delays_str_tmp,
2977 1535 - strnlen (pull_delays_str, 1536));
2978 }
2979 (void) strncat (pull_delays_str,
2980 "\n",
2981 1535 - strnlen (pull_delays_str, 1536));
2982 LOG (GNUNET_ERROR_TYPE_DEBUG, "Writing pull delays to disk\n");
2983 to_file_w_len (sub->file_name_pull_delays, 1535, pull_delays_str);
2984 GNUNET_free (sub->file_name_pull_delays);
2985 sub->file_name_pull_delays = NULL;
2986
2987 GNUNET_CONTAINER_multipeermap_destroy (sub->observed_unique_peers);
2988 sub->observed_unique_peers = NULL;
2989#endif /* TO_FILE */
2990
2991 GNUNET_free (sub);
2992}
2993
2994
2995/***********************************************************************
2996 * /Sub
2997***********************************************************************/
2998
2999
2701/*********************************************************************** 3000/***********************************************************************
2702 * /SubSampler 3001 * Core handlers
2703***********************************************************************/ 3002***********************************************************************/
2704 3003
3004/**
3005 * @brief Callback on initialisation of Core.
3006 *
3007 * @param cls - unused
3008 * @param my_identity - unused
3009 */
3010void
3011core_init (void *cls,
3012 const struct GNUNET_PeerIdentity *my_identity)
3013{
3014 (void) cls;
3015 (void) my_identity;
3016
3017 map_single_hop = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
3018}
3019
2705 3020
3021/**
3022 * @brief Callback for core.
3023 * Method called whenever a given peer connects.
3024 *
3025 * @param cls closure - unused
3026 * @param peer peer identity this notification is about
3027 * @return closure given to #core_disconnects as peer_cls
3028 */
3029void *
3030core_connects (void *cls,
3031 const struct GNUNET_PeerIdentity *peer,
3032 struct GNUNET_MQ_Handle *mq)
3033{
3034 (void) cls;
3035 (void) mq;
3036
3037 GNUNET_CONTAINER_multipeermap_put (map_single_hop, peer, NULL,
3038 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
3039 return NULL;
3040}
3041
3042
3043/**
3044 * @brief Callback for core.
3045 * Method called whenever a peer disconnects.
3046 *
3047 * @param cls closure - unused
3048 * @param peer peer identity this notification is about
3049 * @param peer_cls closure given in #core_connects - unused
3050 */
3051void
3052core_disconnects (void *cls,
3053 const struct GNUNET_PeerIdentity *peer,
3054 void *peer_cls)
3055{
3056 (void) cls;
3057 (void) peer_cls;
3058
3059 GNUNET_CONTAINER_multipeermap_remove_all (map_single_hop, peer);
3060}
3061
3062/***********************************************************************
3063 * /Core handlers
3064***********************************************************************/
3065
3066
3067/**
3068 * @brief Destroy the context for a (connected) client
3069 *
3070 * @param cli_ctx Context to destroy
3071 */
2706static void 3072static void
2707destroy_cli_ctx (struct ClientContext *cli_ctx) 3073destroy_cli_ctx (struct ClientContext *cli_ctx)
2708{ 3074{
@@ -2710,51 +3076,94 @@ destroy_cli_ctx (struct ClientContext *cli_ctx)
2710 GNUNET_CONTAINER_DLL_remove (cli_ctx_head, 3076 GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
2711 cli_ctx_tail, 3077 cli_ctx_tail,
2712 cli_ctx); 3078 cli_ctx);
3079 if (NULL != cli_ctx->sub)
3080 {
3081 destroy_sub (cli_ctx->sub);
3082 cli_ctx->sub = NULL;
3083 }
2713 GNUNET_free (cli_ctx); 3084 GNUNET_free (cli_ctx);
2714} 3085}
2715 3086
2716 3087
2717/** 3088/**
2718 * Function called by NSE. 3089 * @brief Update sizes in sampler and view on estimate update from nse service
2719 * 3090 *
2720 * Updates sizes of sampler list and view and adapt those lists 3091 * @param sub Sub
2721 * accordingly. 3092 * @param logestimate the log(Base 2) value of the current network size estimate
3093 * @param std_dev standard deviation for the estimate
2722 */ 3094 */
2723static void 3095static void
2724nse_callback (void *cls, 3096adapt_sizes (struct Sub *sub, double logestimate, double std_dev)
2725 struct GNUNET_TIME_Absolute timestamp,
2726 double logestimate, double std_dev)
2727{ 3097{
2728 double estimate; 3098 double estimate;
2729 //double scale; // TODO this might go gloabal/config 3099 //double scale; // TODO this might go gloabal/config
2730 (void) cls;
2731 (void) timestamp;
2732 3100
2733 LOG (GNUNET_ERROR_TYPE_DEBUG, 3101 LOG (GNUNET_ERROR_TYPE_DEBUG,
2734 "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n", 3102 "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
2735 logestimate, std_dev, RPS_sampler_get_size (mss->sampler)); 3103 logestimate, std_dev, RPS_sampler_get_size (sub->sampler));
2736 //scale = .01; 3104 //scale = .01;
2737 estimate = GNUNET_NSE_log_estimate_to_n (logestimate); 3105 estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
2738 // GNUNET_NSE_log_estimate_to_n (logestimate); 3106 // GNUNET_NSE_log_estimate_to_n (logestimate);
2739 estimate = pow (estimate, 1.0 / 3); 3107 estimate = pow (estimate, 1.0 / 3);
2740 // TODO add if std_dev is a number 3108 // TODO add if std_dev is a number
2741 // estimate += (std_dev * scale); 3109 // estimate += (std_dev * scale);
2742 if (mss->view_size_est_min < ceil (estimate)) 3110 if (sub->view_size_est_min < ceil (estimate))
2743 { 3111 {
2744 LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate); 3112 LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
2745 mss->sampler_size_est_need = estimate; 3113 sub->sampler_size_est_need = estimate;
2746 mss->view_size_est_need = estimate; 3114 sub->view_size_est_need = estimate;
2747 } else 3115 } else
2748 { 3116 {
2749 LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate); 3117 LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
2750 //mss->sampler_size_est_need = mss->view_size_est_min; 3118 //sub->sampler_size_est_need = sub->view_size_est_min;
2751 mss->view_size_est_need = mss->view_size_est_min; 3119 sub->view_size_est_need = sub->view_size_est_min;
3120 }
3121 if (sub == msub)
3122 {
3123 GNUNET_STATISTICS_set (stats,
3124 "view size aim",
3125 sub->view_size_est_need,
3126 GNUNET_NO);
2752 } 3127 }
2753 GNUNET_STATISTICS_set (stats, "view size aim", mss->view_size_est_need, GNUNET_NO);
2754 3128
2755 /* If the NSE has changed adapt the lists accordingly */ 3129 /* If the NSE has changed adapt the lists accordingly */
2756 resize_wrapper (mss->sampler, mss->sampler_size_est_need); 3130 resize_wrapper (sub->sampler, sub->sampler_size_est_need);
2757 View_change_len (mss->view, mss->view_size_est_need); 3131 View_change_len (sub->view, sub->view_size_est_need);
3132}
3133
3134
3135/**
3136 * Function called by NSE.
3137 *
3138 * Updates sizes of sampler list and view and adapt those lists
3139 * accordingly.
3140 *
3141 * implements #GNUNET_NSE_Callback
3142 *
3143 * @param cls Closure - unused
3144 * @param timestamp time when the estimate was received from the server (or created by the server)
3145 * @param logestimate the log(Base 2) value of the current network size estimate
3146 * @param std_dev standard deviation for the estimate
3147 */
3148static void
3149nse_callback (void *cls,
3150 struct GNUNET_TIME_Absolute timestamp,
3151 double logestimate, double std_dev)
3152{
3153 (void) cls;
3154 (void) timestamp;
3155 struct ClientContext *cli_ctx_iter;
3156
3157 adapt_sizes (msub, logestimate, std_dev);
3158 for (cli_ctx_iter = cli_ctx_head;
3159 NULL != cli_ctx_iter;
3160 cli_ctx_iter = cli_ctx_iter->next)
3161 {
3162 if (NULL != cli_ctx_iter->sub)
3163 {
3164 adapt_sizes (cli_ctx_iter->sub, logestimate, std_dev);
3165 }
3166 }
2758} 3167}
2759 3168
2760 3169
@@ -2765,6 +3174,7 @@ nse_callback (void *cls,
2765 * @param cls the closure (#ClientContext) 3174 * @param cls the closure (#ClientContext)
2766 * @param msg the message 3175 * @param msg the message
2767 * @return #GNUNET_OK if @a msg is well-formed 3176 * @return #GNUNET_OK if @a msg is well-formed
3177 * #GNUNET_SYSERR otherwise
2768 */ 3178 */
2769static int 3179static int
2770check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg) 3180check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
@@ -2777,6 +3187,10 @@ check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
2777 if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) || 3187 if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
2778 (msize % sizeof (struct GNUNET_PeerIdentity) != 0) ) 3188 (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
2779 { 3189 {
3190 LOG (GNUNET_ERROR_TYPE_ERROR,
3191 "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3192 ntohl (msg->num_peers),
3193 (msize / sizeof (struct GNUNET_PeerIdentity)));
2780 GNUNET_break (0); 3194 GNUNET_break (0);
2781 GNUNET_SERVICE_client_drop (cli_ctx->client); 3195 GNUNET_SERVICE_client_drop (cli_ctx->client);
2782 return GNUNET_SYSERR; 3196 return GNUNET_SYSERR;
@@ -2814,7 +3228,8 @@ handle_client_seed (void *cls,
2814 i, 3228 i,
2815 GNUNET_i2s (&peers[i])); 3229 GNUNET_i2s (&peers[i]));
2816 3230
2817 got_peer (&peers[i]); 3231 if (NULL != msub) got_peer (msub, &peers[i]); /* Condition needed? */
3232 if (NULL != cli_ctx->sub) got_peer (cli_ctx->sub, &peers[i]);
2818 } 3233 }
2819 GNUNET_SERVICE_client_continue (cli_ctx->client); 3234 GNUNET_SERVICE_client_continue (cli_ctx->client);
2820} 3235}
@@ -2824,7 +3239,8 @@ handle_client_seed (void *cls,
2824 * Handle RPS request from the client. 3239 * Handle RPS request from the client.
2825 * 3240 *
2826 * @param cls Client context 3241 * @param cls Client context
2827 * @param message unused 3242 * @param message Message containing the numer of updates the client wants to
3243 * receive
2828 */ 3244 */
2829static void 3245static void
2830handle_client_view_request (void *cls, 3246handle_client_view_request (void *cls,
@@ -2908,15 +3324,63 @@ handle_client_stream_cancel (void *cls,
2908 (void) msg; 3324 (void) msg;
2909 3325
2910 LOG (GNUNET_ERROR_TYPE_DEBUG, 3326 LOG (GNUNET_ERROR_TYPE_DEBUG,
2911 "Client requested peers from biased stream.\n"); 3327 "Client canceled receiving peers from biased stream.\n");
2912 cli_ctx->stream_update = GNUNET_NO; 3328 cli_ctx->stream_update = GNUNET_NO;
2913 3329
2914 GNUNET_assert (NULL != cli_ctx); 3330 GNUNET_assert (NULL != cli_ctx);
2915 GNUNET_SERVICE_client_continue (cli_ctx->client); 3331 GNUNET_SERVICE_client_continue (cli_ctx->client);
2916 if (0 == cli_ctx->view_updates_left) 3332}
3333
3334
3335/**
3336 * @brief Create and start a Sub.
3337 *
3338 * @param cls Closure - unused
3339 * @param msg Message containing the necessary information
3340 */
3341static void
3342handle_client_start_sub (void *cls,
3343 const struct GNUNET_RPS_CS_SubStartMessage *msg)
3344{
3345 struct ClientContext *cli_ctx = cls;
3346
3347 LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested start of a new sub.\n");
3348 if (NULL != cli_ctx->sub &&
3349 0 != memcmp (&cli_ctx->sub->hash,
3350 &msg->hash,
3351 sizeof (struct GNUNET_HashCode)))
2917 { 3352 {
2918 destroy_cli_ctx (cli_ctx); 3353 LOG (GNUNET_ERROR_TYPE_WARNING, "Already have a Sub with different share for this client. Remove old one, add new.\n");
3354 destroy_sub (cli_ctx->sub);
3355 cli_ctx->sub = NULL;
3356 }
3357 cli_ctx->sub = new_sub (&msg->hash,
3358 msub->sampler_size_est_min, // TODO make api input?
3359 GNUNET_TIME_relative_ntoh (msg->round_interval));
3360 GNUNET_SERVICE_client_continue (cli_ctx->client);
3361}
3362
3363
3364/**
3365 * @brief Destroy the Sub
3366 *
3367 * @param cls Closure - unused
3368 * @param msg Message containing the hash that identifies the Sub
3369 */
3370static void
3371handle_client_stop_sub (void *cls,
3372 const struct GNUNET_RPS_CS_SubStopMessage *msg)
3373{
3374 struct ClientContext *cli_ctx = cls;
3375
3376 GNUNET_assert (NULL != cli_ctx->sub);
3377 if (0 != memcmp (&cli_ctx->sub->hash, &msg->hash, sizeof (struct GNUNET_HashCode)))
3378 {
3379 LOG (GNUNET_ERROR_TYPE_WARNING, "Share of current sub and request differ!\n");
2919 } 3380 }
3381 destroy_sub (cli_ctx->sub);
3382 cli_ctx->sub = NULL;
3383 GNUNET_SERVICE_client_continue (cli_ctx->client);
2920} 3384}
2921 3385
2922 3386
@@ -2926,8 +3390,8 @@ handle_client_stream_cancel (void *cls,
2926 * This does nothing. But without calling #GNUNET_CADET_receive_done() 3390 * This does nothing. But without calling #GNUNET_CADET_receive_done()
2927 * the channel is blocked for all other communication. 3391 * the channel is blocked for all other communication.
2928 * 3392 *
2929 * @param cls Closure 3393 * @param cls Closure - Context of channel
2930 * @param msg The message header 3394 * @param msg Message - unused
2931 */ 3395 */
2932static void 3396static void
2933handle_peer_check (void *cls, 3397handle_peer_check (void *cls,
@@ -2939,22 +3403,26 @@ handle_peer_check (void *cls,
2939 3403
2940 LOG (GNUNET_ERROR_TYPE_DEBUG, 3404 LOG (GNUNET_ERROR_TYPE_DEBUG,
2941 "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer)); 3405 "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
2942 GNUNET_STATISTICS_update (stats, 3406 if (channel_ctx->peer_ctx->sub == msub)
2943 "# pending online checks", 3407 {
2944 -1, 3408 GNUNET_STATISTICS_update (stats,
2945 GNUNET_NO); 3409 "# pending online checks",
3410 -1,
3411 GNUNET_NO);
3412 }
2946 3413
2947 GNUNET_CADET_receive_done (channel_ctx->channel); 3414 GNUNET_CADET_receive_done (channel_ctx->channel);
2948} 3415}
2949 3416
3417
2950/** 3418/**
2951 * Handle a PUSH message from another peer. 3419 * Handle a PUSH message from another peer.
2952 * 3420 *
2953 * Check the proof of work and store the PeerID 3421 * Check the proof of work and store the PeerID
2954 * in the temporary list for pushed PeerIDs. 3422 * in the temporary list for pushed PeerIDs.
2955 * 3423 *
2956 * @param cls Closure 3424 * @param cls Closure - Context of channel
2957 * @param msg The message header 3425 * @param msg Message - unused
2958 */ 3426 */
2959static void 3427static void
2960handle_peer_push (void *cls, 3428handle_peer_push (void *cls,
@@ -2969,7 +3437,10 @@ handle_peer_push (void *cls,
2969 LOG (GNUNET_ERROR_TYPE_DEBUG, 3437 LOG (GNUNET_ERROR_TYPE_DEBUG,
2970 "Received PUSH (%s)\n", 3438 "Received PUSH (%s)\n",
2971 GNUNET_i2s (peer)); 3439 GNUNET_i2s (peer));
2972 GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO); 3440 if (channel_ctx->peer_ctx->sub == msub)
3441 {
3442 GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO);
3443 }
2973 3444
2974 #ifdef ENABLE_MALICIOUS 3445 #ifdef ENABLE_MALICIOUS
2975 struct AttackedPeer *tmp_att_peer; 3446 struct AttackedPeer *tmp_att_peer;
@@ -2981,9 +3452,8 @@ handle_peer_push (void *cls,
2981 tmp_att_peer->peer_id = *peer; 3452 tmp_att_peer->peer_id = *peer;
2982 if (NULL == att_peer_set) 3453 if (NULL == att_peer_set)
2983 att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO); 3454 att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
2984 if (GNUNET_NO == 3455 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
2985 GNUNET_CONTAINER_multipeermap_contains (att_peer_set, 3456 peer))
2986 peer))
2987 { 3457 {
2988 GNUNET_CONTAINER_DLL_insert (att_peers_head, 3458 GNUNET_CONTAINER_DLL_insert (att_peers_head,
2989 att_peers_tail, 3459 att_peers_tail,
@@ -3004,9 +3474,10 @@ handle_peer_push (void *cls,
3004 #endif /* ENABLE_MALICIOUS */ 3474 #endif /* ENABLE_MALICIOUS */
3005 3475
3006 /* Add the sending peer to the push_map */ 3476 /* Add the sending peer to the push_map */
3007 CustomPeerMap_put (mss->push_map, peer); 3477 CustomPeerMap_put (channel_ctx->peer_ctx->sub->push_map, peer);
3008 3478
3009 GNUNET_break_op (check_peer_known (peer)); 3479 GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3480 &channel_ctx->peer_ctx->peer_id));
3010 GNUNET_CADET_receive_done (channel_ctx->channel); 3481 GNUNET_CADET_receive_done (channel_ctx->channel);
3011} 3482}
3012 3483
@@ -3016,41 +3487,59 @@ handle_peer_push (void *cls,
3016 * 3487 *
3017 * Reply with the view of PeerIDs. 3488 * Reply with the view of PeerIDs.
3018 * 3489 *
3019 * @param cls Closure 3490 * @param cls Closure - Context of channel
3020 * @param msg The message header 3491 * @param msg Message - unused
3021 */ 3492 */
3022static void 3493static void
3023handle_peer_pull_request (void *cls, 3494handle_peer_pull_request (void *cls,
3024 const struct GNUNET_MessageHeader *msg) 3495 const struct GNUNET_MessageHeader *msg)
3025{ 3496{
3026 const struct ChannelCtx *channel_ctx = cls; 3497 const struct ChannelCtx *channel_ctx = cls;
3027 const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id; 3498 struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
3499 const struct GNUNET_PeerIdentity *peer = &peer_ctx->peer_id;
3028 const struct GNUNET_PeerIdentity *view_array; 3500 const struct GNUNET_PeerIdentity *view_array;
3029 (void) msg; 3501 (void) msg;
3030 3502
3031 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer)); 3503 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
3032 GNUNET_STATISTICS_update(stats, "# pull request message received", 1, GNUNET_NO); 3504 if (peer_ctx->sub == msub)
3505 {
3506 GNUNET_STATISTICS_update(stats,
3507 "# pull request message received",
3508 1,
3509 GNUNET_NO);
3510 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3511 &peer_ctx->peer_id))
3512 {
3513 GNUNET_STATISTICS_update (stats,
3514 "# pull request message received (multi-hop peer)",
3515 1,
3516 GNUNET_NO);
3517 }
3518 }
3033 3519
3034 #ifdef ENABLE_MALICIOUS 3520 #ifdef ENABLE_MALICIOUS
3035 if (1 == mal_type 3521 if (1 == mal_type
3036 || 3 == mal_type) 3522 || 3 == mal_type)
3037 { /* Try to maximise representation */ 3523 { /* Try to maximise representation */
3038 send_pull_reply (peer, mal_peers, num_mal_peers); 3524 send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3039 } 3525 }
3040 3526
3041 else if (2 == mal_type) 3527 else if (2 == mal_type)
3042 { /* Try to partition network */ 3528 { /* Try to partition network */
3043 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer)) 3529 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
3044 { 3530 {
3045 send_pull_reply (peer, mal_peers, num_mal_peers); 3531 send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3046 } 3532 }
3047 } 3533 }
3048 #endif /* ENABLE_MALICIOUS */ 3534 #endif /* ENABLE_MALICIOUS */
3049 3535
3050 GNUNET_break_op (check_peer_known (peer)); 3536 GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3537 &channel_ctx->peer_ctx->peer_id));
3051 GNUNET_CADET_receive_done (channel_ctx->channel); 3538 GNUNET_CADET_receive_done (channel_ctx->channel);
3052 view_array = View_get_as_array (mss->view); 3539 view_array = View_get_as_array (channel_ctx->peer_ctx->sub->view);
3053 send_pull_reply (peer, view_array, View_size (mss->view)); 3540 send_pull_reply (peer_ctx,
3541 view_array,
3542 View_size (channel_ctx->peer_ctx->sub->view));
3054} 3543}
3055 3544
3056 3545
@@ -3058,8 +3547,8 @@ handle_peer_pull_request (void *cls,
3058 * Check whether we sent a corresponding request and 3547 * Check whether we sent a corresponding request and
3059 * whether this reply is the first one. 3548 * whether this reply is the first one.
3060 * 3549 *
3061 * @param cls Closure 3550 * @param cls Closure - Context of channel
3062 * @param msg The message header 3551 * @param msg Message containing the replied peers
3063 */ 3552 */
3064static int 3553static int
3065check_peer_pull_reply (void *cls, 3554check_peer_pull_reply (void *cls,
@@ -3086,22 +3575,27 @@ check_peer_pull_reply (void *cls,
3086 return GNUNET_SYSERR; 3575 return GNUNET_SYSERR;
3087 } 3576 }
3088 3577
3089 if (GNUNET_YES != check_peer_flag (&sender_ctx->peer_id, 3578 if (GNUNET_YES != check_peer_flag (sender_ctx->sub->peer_map,
3579 &sender_ctx->peer_id,
3090 Peers_PULL_REPLY_PENDING)) 3580 Peers_PULL_REPLY_PENDING))
3091 { 3581 {
3092 LOG (GNUNET_ERROR_TYPE_WARNING, 3582 LOG (GNUNET_ERROR_TYPE_WARNING,
3093 "Received a pull reply from a peer (%s) we didn't request one from!\n", 3583 "Received a pull reply from a peer (%s) we didn't request one from!\n",
3094 GNUNET_i2s (&sender_ctx->peer_id)); 3584 GNUNET_i2s (&sender_ctx->peer_id));
3095 GNUNET_STATISTICS_update (stats, 3585 if (sender_ctx->sub == msub)
3096 "# unrequested pull replies", 3586 {
3097 1, 3587 GNUNET_STATISTICS_update (stats,
3098 GNUNET_NO); 3588 "# unrequested pull replies",
3589 1,
3590 GNUNET_NO);
3591 }
3099 GNUNET_break_op (0); 3592 GNUNET_break_op (0);
3100 return GNUNET_SYSERR; 3593 return GNUNET_SYSERR;
3101 } 3594 }
3102 return GNUNET_OK; 3595 return GNUNET_OK;
3103} 3596}
3104 3597
3598
3105/** 3599/**
3106 * Handle PULL REPLY message from another peer. 3600 * Handle PULL REPLY message from another peer.
3107 * 3601 *
@@ -3115,13 +3609,29 @@ handle_peer_pull_reply (void *cls,
3115 const struct ChannelCtx *channel_ctx = cls; 3609 const struct ChannelCtx *channel_ctx = cls;
3116 const struct GNUNET_PeerIdentity *sender = &channel_ctx->peer_ctx->peer_id; 3610 const struct GNUNET_PeerIdentity *sender = &channel_ctx->peer_ctx->peer_id;
3117 const struct GNUNET_PeerIdentity *peers; 3611 const struct GNUNET_PeerIdentity *peers;
3612 struct Sub *sub = channel_ctx->peer_ctx->sub;
3118 uint32_t i; 3613 uint32_t i;
3119#ifdef ENABLE_MALICIOUS 3614#ifdef ENABLE_MALICIOUS
3120 struct AttackedPeer *tmp_att_peer; 3615 struct AttackedPeer *tmp_att_peer;
3121#endif /* ENABLE_MALICIOUS */ 3616#endif /* ENABLE_MALICIOUS */
3122 3617
3618 sub->pull_delays[sub->num_rounds - channel_ctx->peer_ctx->round_pull_req]++;
3123 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender)); 3619 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
3124 GNUNET_STATISTICS_update(stats, "# pull reply messages received", 1, GNUNET_NO); 3620 if (channel_ctx->peer_ctx->sub == msub)
3621 {
3622 GNUNET_STATISTICS_update (stats,
3623 "# pull reply messages received",
3624 1,
3625 GNUNET_NO);
3626 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3627 &channel_ctx->peer_ctx->peer_id))
3628 {
3629 GNUNET_STATISTICS_update (stats,
3630 "# pull reply messages received (multi-hop peer)",
3631 1,
3632 GNUNET_NO);
3633 }
3634 }
3125 3635
3126 #ifdef ENABLE_MALICIOUS 3636 #ifdef ENABLE_MALICIOUS
3127 // We shouldn't even receive pull replies as we're not sending 3637 // We shouldn't even receive pull replies as we're not sending
@@ -3165,23 +3675,28 @@ handle_peer_pull_reply (void *cls,
3165 } 3675 }
3166 #endif /* ENABLE_MALICIOUS */ 3676 #endif /* ENABLE_MALICIOUS */
3167 /* Make sure we 'know' about this peer */ 3677 /* Make sure we 'know' about this peer */
3168 (void) insert_peer (&peers[i]); 3678 (void) insert_peer (channel_ctx->peer_ctx->sub, &peers[i]);
3169 3679
3170 if (GNUNET_YES == check_peer_valid (&peers[i])) 3680 if (GNUNET_YES == check_peer_valid (channel_ctx->peer_ctx->sub->valid_peers,
3681 &peers[i]))
3171 { 3682 {
3172 CustomPeerMap_put (mss->pull_map, &peers[i]); 3683 CustomPeerMap_put (channel_ctx->peer_ctx->sub->pull_map, &peers[i]);
3173 } 3684 }
3174 else 3685 else
3175 { 3686 {
3176 schedule_operation (&peers[i], insert_in_pull_map); 3687 schedule_operation (channel_ctx->peer_ctx,
3177 (void) issue_peer_online_check (&peers[i]); 3688 insert_in_pull_map,
3689 channel_ctx->peer_ctx->sub); /* cls */
3690 (void) issue_peer_online_check (channel_ctx->peer_ctx->sub, &peers[i]);
3178 } 3691 }
3179 } 3692 }
3180 3693
3181 UNSET_PEER_FLAG (get_peer_ctx (sender), Peers_PULL_REPLY_PENDING); 3694 UNSET_PEER_FLAG (get_peer_ctx (channel_ctx->peer_ctx->sub->peer_map, sender),
3182 clean_peer (sender); 3695 Peers_PULL_REPLY_PENDING);
3696 clean_peer (channel_ctx->peer_ctx->sub, sender);
3183 3697
3184 GNUNET_break_op (check_peer_known (sender)); 3698 GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3699 sender));
3185 GNUNET_CADET_receive_done (channel_ctx->channel); 3700 GNUNET_CADET_receive_done (channel_ctx->channel);
3186} 3701}
3187 3702
@@ -3193,12 +3708,12 @@ handle_peer_pull_reply (void *cls,
3193 * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min)) 3708 * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3194 * It would return a random value between 2 and 6 min. 3709 * It would return a random value between 2 and 6 min.
3195 * 3710 *
3196 * @param mean the mean 3711 * @param mean the mean time until the next round
3197 * @param spread the inverse amount of deviation from the mean 3712 * @param spread the inverse amount of deviation from the mean
3198 */ 3713 */
3199static struct GNUNET_TIME_Relative 3714static struct GNUNET_TIME_Relative
3200compute_rand_delay (struct GNUNET_TIME_Relative mean, 3715compute_rand_delay (struct GNUNET_TIME_Relative mean,
3201 unsigned int spread) 3716 unsigned int spread)
3202{ 3717{
3203 struct GNUNET_TIME_Relative half_interval; 3718 struct GNUNET_TIME_Relative half_interval;
3204 struct GNUNET_TIME_Relative ret; 3719 struct GNUNET_TIME_Relative ret;
@@ -3238,53 +3753,69 @@ compute_rand_delay (struct GNUNET_TIME_Relative mean,
3238/** 3753/**
3239 * Send single pull request 3754 * Send single pull request
3240 * 3755 *
3241 * @param peer_id the peer to send the pull request to. 3756 * @param peer_ctx Context to the peer to send request to
3242 */ 3757 */
3243static void 3758static void
3244send_pull_request (const struct GNUNET_PeerIdentity *peer) 3759send_pull_request (struct PeerContext *peer_ctx)
3245{ 3760{
3246 struct GNUNET_MQ_Envelope *ev; 3761 struct GNUNET_MQ_Envelope *ev;
3247 3762
3248 GNUNET_assert (GNUNET_NO == check_peer_flag (peer, 3763 GNUNET_assert (GNUNET_NO == check_peer_flag (peer_ctx->sub->peer_map,
3249 Peers_PULL_REPLY_PENDING)); 3764 &peer_ctx->peer_id,
3250 SET_PEER_FLAG (get_peer_ctx (peer), Peers_PULL_REPLY_PENDING); 3765 Peers_PULL_REPLY_PENDING));
3766 SET_PEER_FLAG (peer_ctx, Peers_PULL_REPLY_PENDING);
3767 peer_ctx->round_pull_req = peer_ctx->sub->num_rounds;
3251 3768
3252 LOG (GNUNET_ERROR_TYPE_DEBUG, 3769 LOG (GNUNET_ERROR_TYPE_DEBUG,
3253 "Going to send PULL REQUEST to peer %s.\n", 3770 "Going to send PULL REQUEST to peer %s.\n",
3254 GNUNET_i2s (peer)); 3771 GNUNET_i2s (&peer_ctx->peer_id));
3255 3772
3256 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST); 3773 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3257 send_message (peer, ev, "PULL REQUEST"); 3774 send_message (peer_ctx, ev, "PULL REQUEST");
3258 GNUNET_STATISTICS_update(stats, "# pull request send issued", 1, GNUNET_NO); 3775 if (peer_ctx->sub)
3776 {
3777 GNUNET_STATISTICS_update (stats,
3778 "# pull request send issued",
3779 1,
3780 GNUNET_NO);
3781 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3782 &peer_ctx->peer_id))
3783 {
3784 GNUNET_STATISTICS_update (stats,
3785 "# pull request send issued (multi-hop peer)",
3786 1,
3787 GNUNET_NO);
3788 }
3789 }
3259} 3790}
3260 3791
3261 3792
3262/** 3793/**
3263 * Send single push 3794 * Send single push
3264 * 3795 *
3265 * @param peer_id the peer to send the push to. 3796 * @param peer_ctx Context of peer to send push to
3266 */ 3797 */
3267static void 3798static void
3268send_push (const struct GNUNET_PeerIdentity *peer_id) 3799send_push (struct PeerContext *peer_ctx)
3269{ 3800{
3270 struct GNUNET_MQ_Envelope *ev; 3801 struct GNUNET_MQ_Envelope *ev;
3271 3802
3272 LOG (GNUNET_ERROR_TYPE_DEBUG, 3803 LOG (GNUNET_ERROR_TYPE_DEBUG,
3273 "Going to send PUSH to peer %s.\n", 3804 "Going to send PUSH to peer %s.\n",
3274 GNUNET_i2s (peer_id)); 3805 GNUNET_i2s (&peer_ctx->peer_id));
3275 3806
3276 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH); 3807 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3277 send_message (peer_id, ev, "PUSH"); 3808 send_message (peer_ctx, ev, "PUSH");
3278 GNUNET_STATISTICS_update(stats, "# push send issued", 1, GNUNET_NO); 3809 if (peer_ctx->sub)
3810 {
3811 GNUNET_STATISTICS_update (stats,
3812 "# push send issued",
3813 1,
3814 GNUNET_NO);
3815 }
3279} 3816}
3280 3817
3281 3818
3282static void
3283do_round (void *cls);
3284
3285static void
3286do_mal_round (void *cls);
3287
3288#ifdef ENABLE_MALICIOUS 3819#ifdef ENABLE_MALICIOUS
3289 3820
3290 3821
@@ -3334,7 +3865,9 @@ handle_client_act_malicious (void *cls,
3334 struct GNUNET_PeerIdentity *peers; 3865 struct GNUNET_PeerIdentity *peers;
3335 uint32_t num_mal_peers_sent; 3866 uint32_t num_mal_peers_sent;
3336 uint32_t num_mal_peers_old; 3867 uint32_t num_mal_peers_old;
3868 struct Sub *sub = cli_ctx->sub;
3337 3869
3870 if (NULL == sub) sub = msub;
3338 /* Do actual logic */ 3871 /* Do actual logic */
3339 peers = (struct GNUNET_PeerIdentity *) &msg[1]; 3872 peers = (struct GNUNET_PeerIdentity *) &msg[1];
3340 mal_type = ntohl (msg->type); 3873 mal_type = ntohl (msg->type);
@@ -3365,8 +3898,9 @@ handle_client_act_malicious (void *cls,
3365 mal_peer_set); 3898 mal_peer_set);
3366 3899
3367 /* Substitute do_round () with do_mal_round () */ 3900 /* Substitute do_round () with do_mal_round () */
3368 GNUNET_SCHEDULER_cancel (mss->do_round_task); 3901 GNUNET_assert (NULL != sub->do_round_task);
3369 mss->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL); 3902 GNUNET_SCHEDULER_cancel (sub->do_round_task);
3903 sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
3370 } 3904 }
3371 3905
3372 else if ( (2 == mal_type) || 3906 else if ( (2 == mal_type) ||
@@ -3398,9 +3932,9 @@ handle_client_act_malicious (void *cls,
3398 &msg->attacked_peer, 3932 &msg->attacked_peer,
3399 sizeof (struct GNUNET_PeerIdentity)); 3933 sizeof (struct GNUNET_PeerIdentity));
3400 /* Set the flag of the attacked peer to valid to avoid problems */ 3934 /* Set the flag of the attacked peer to valid to avoid problems */
3401 if (GNUNET_NO == check_peer_known (&attacked_peer)) 3935 if (GNUNET_NO == check_peer_known (sub->peer_map, &attacked_peer))
3402 { 3936 {
3403 (void) issue_peer_online_check (&attacked_peer); 3937 (void) issue_peer_online_check (sub, &attacked_peer);
3404 } 3938 }
3405 3939
3406 LOG (GNUNET_ERROR_TYPE_DEBUG, 3940 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -3408,16 +3942,20 @@ handle_client_act_malicious (void *cls,
3408 GNUNET_i2s (&attacked_peer)); 3942 GNUNET_i2s (&attacked_peer));
3409 3943
3410 /* Substitute do_round () with do_mal_round () */ 3944 /* Substitute do_round () with do_mal_round () */
3411 GNUNET_SCHEDULER_cancel (mss->do_round_task); 3945 if (NULL != sub->do_round_task)
3412 mss->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL); 3946 {
3947 /* Probably in shutdown */
3948 GNUNET_SCHEDULER_cancel (sub->do_round_task);
3949 sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
3950 }
3413 } 3951 }
3414 else if (0 == mal_type) 3952 else if (0 == mal_type)
3415 { /* Stop acting malicious */ 3953 { /* Stop acting malicious */
3416 GNUNET_array_grow (mal_peers, num_mal_peers, 0); 3954 GNUNET_array_grow (mal_peers, num_mal_peers, 0);
3417 3955
3418 /* Substitute do_mal_round () with do_round () */ 3956 /* Substitute do_mal_round () with do_round () */
3419 GNUNET_SCHEDULER_cancel (mss->do_round_task); 3957 GNUNET_SCHEDULER_cancel (sub->do_round_task);
3420 mss->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL); 3958 sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
3421 } 3959 }
3422 else 3960 else
3423 { 3961 {
@@ -3432,6 +3970,8 @@ handle_client_act_malicious (void *cls,
3432 * Send out PUSHes and PULLs maliciously. 3970 * Send out PUSHes and PULLs maliciously.
3433 * 3971 *
3434 * This is executed regylary. 3972 * This is executed regylary.
3973 *
3974 * @param cls Closure - Sub
3435 */ 3975 */
3436static void 3976static void
3437do_mal_round (void *cls) 3977do_mal_round (void *cls)
@@ -3440,12 +3980,12 @@ do_mal_round (void *cls)
3440 uint32_t i; 3980 uint32_t i;
3441 struct GNUNET_TIME_Relative time_next_round; 3981 struct GNUNET_TIME_Relative time_next_round;
3442 struct AttackedPeer *tmp_att_peer; 3982 struct AttackedPeer *tmp_att_peer;
3443 (void) cls; 3983 struct Sub *sub = cls;
3444 3984
3445 LOG (GNUNET_ERROR_TYPE_DEBUG, 3985 LOG (GNUNET_ERROR_TYPE_DEBUG,
3446 "Going to execute next round maliciously type %" PRIu32 ".\n", 3986 "Going to execute next round maliciously type %" PRIu32 ".\n",
3447 mal_type); 3987 mal_type);
3448 mss->do_round_task = NULL; 3988 sub->do_round_task = NULL;
3449 GNUNET_assert (mal_type <= 3); 3989 GNUNET_assert (mal_type <= 3);
3450 /* Do malicious actions */ 3990 /* Do malicious actions */
3451 if (1 == mal_type) 3991 if (1 == mal_type)
@@ -3468,7 +4008,7 @@ do_mal_round (void *cls)
3468 else 4008 else
3469 att_peer_index = att_peer_index->next; 4009 att_peer_index = att_peer_index->next;
3470 4010
3471 send_push (&att_peer_index->peer_id); 4011 send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
3472 } 4012 }
3473 4013
3474 /* Send PULLs to some peers to learn about additional peers to attack */ 4014 /* Send PULLs to some peers to learn about additional peers to attack */
@@ -3480,7 +4020,7 @@ do_mal_round (void *cls)
3480 else 4020 else
3481 att_peer_index = tmp_att_peer->next; 4021 att_peer_index = tmp_att_peer->next;
3482 4022
3483 send_pull_request (&tmp_att_peer->peer_id); 4023 send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
3484 } 4024 }
3485 } 4025 }
3486 4026
@@ -3491,9 +4031,11 @@ do_mal_round (void *cls)
3491 * Send as many pushes to the attacked peer as possible 4031 * Send as many pushes to the attacked peer as possible
3492 * That is one push per round as it will ignore more. 4032 * That is one push per round as it will ignore more.
3493 */ 4033 */
3494 (void) issue_peer_online_check (&attacked_peer); 4034 (void) issue_peer_online_check (sub, &attacked_peer);
3495 if (GNUNET_YES == check_peer_flag (&attacked_peer, Peers_ONLINE)) 4035 if (GNUNET_YES == check_peer_flag (sub->peer_map,
3496 send_push (&attacked_peer); 4036 &attacked_peer,
4037 Peers_ONLINE))
4038 send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
3497 } 4039 }
3498 4040
3499 4041
@@ -3501,18 +4043,20 @@ do_mal_round (void *cls)
3501 { /* Combined attack */ 4043 { /* Combined attack */
3502 4044
3503 /* Send PUSH to attacked peers */ 4045 /* Send PUSH to attacked peers */
3504 if (GNUNET_YES == check_peer_known (&attacked_peer)) 4046 if (GNUNET_YES == check_peer_known (sub->peer_map, &attacked_peer))
3505 { 4047 {
3506 (void) issue_peer_online_check (&attacked_peer); 4048 (void) issue_peer_online_check (sub, &attacked_peer);
3507 if (GNUNET_YES == check_peer_flag (&attacked_peer, Peers_ONLINE)) 4049 if (GNUNET_YES == check_peer_flag (sub->peer_map,
4050 &attacked_peer,
4051 Peers_ONLINE))
3508 { 4052 {
3509 LOG (GNUNET_ERROR_TYPE_DEBUG, 4053 LOG (GNUNET_ERROR_TYPE_DEBUG,
3510 "Goding to send push to attacked peer (%s)\n", 4054 "Goding to send push to attacked peer (%s)\n",
3511 GNUNET_i2s (&attacked_peer)); 4055 GNUNET_i2s (&attacked_peer));
3512 send_push (&attacked_peer); 4056 send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
3513 } 4057 }
3514 } 4058 }
3515 (void) issue_peer_online_check (&attacked_peer); 4059 (void) issue_peer_online_check (sub, &attacked_peer);
3516 4060
3517 /* The maximum of pushes we're going to send this round */ 4061 /* The maximum of pushes we're going to send this round */
3518 num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1, 4062 num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
@@ -3530,7 +4074,7 @@ do_mal_round (void *cls)
3530 else 4074 else
3531 att_peer_index = att_peer_index->next; 4075 att_peer_index = att_peer_index->next;
3532 4076
3533 send_push (&att_peer_index->peer_id); 4077 send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
3534 } 4078 }
3535 4079
3536 /* Send PULLs to some peers to learn about additional peers to attack */ 4080 /* Send PULLs to some peers to learn about additional peers to attack */
@@ -3542,26 +4086,27 @@ do_mal_round (void *cls)
3542 else 4086 else
3543 att_peer_index = tmp_att_peer->next; 4087 att_peer_index = tmp_att_peer->next;
3544 4088
3545 send_pull_request (&tmp_att_peer->peer_id); 4089 send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
3546 } 4090 }
3547 } 4091 }
3548 4092
3549 /* Schedule next round */ 4093 /* Schedule next round */
3550 time_next_round = compute_rand_delay (mss->round_interval, 2); 4094 time_next_round = compute_rand_delay (sub->round_interval, 2);
3551 4095
3552 //mss->do_round_task = GNUNET_SCHEDULER_add_delayed (mss->round_interval, &do_mal_round, 4096 GNUNET_assert (NULL == sub->do_round_task);
3553 //NULL); 4097 sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3554 GNUNET_assert (NULL == mss->do_round_task); 4098 &do_mal_round, sub);
3555 mss->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3556 &do_mal_round, NULL);
3557 LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n"); 4099 LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3558} 4100}
3559#endif /* ENABLE_MALICIOUS */ 4101#endif /* ENABLE_MALICIOUS */
3560 4102
4103
3561/** 4104/**
3562 * Send out PUSHes and PULLs, possibly update #view, samplers. 4105 * Send out PUSHes and PULLs, possibly update #view, samplers.
3563 * 4106 *
3564 * This is executed regylary. 4107 * This is executed regylary.
4108 *
4109 * @param cls Closure - Sub
3565 */ 4110 */
3566static void 4111static void
3567do_round (void *cls) 4112do_round (void *cls)
@@ -3575,64 +4120,70 @@ do_round (void *cls)
3575 uint32_t second_border; 4120 uint32_t second_border;
3576 struct GNUNET_PeerIdentity peer; 4121 struct GNUNET_PeerIdentity peer;
3577 struct GNUNET_PeerIdentity *update_peer; 4122 struct GNUNET_PeerIdentity *update_peer;
3578 (void) cls; 4123 struct Sub *sub = cls;
3579 4124
4125 sub->num_rounds++;
3580 LOG (GNUNET_ERROR_TYPE_DEBUG, 4126 LOG (GNUNET_ERROR_TYPE_DEBUG,
3581 "Going to execute next round.\n"); 4127 "Going to execute next round.\n");
3582 GNUNET_STATISTICS_update(stats, "# rounds", 1, GNUNET_NO); 4128 if (sub == msub)
3583 mss->do_round_task = NULL; 4129 {
4130 GNUNET_STATISTICS_update (stats, "# rounds", 1, GNUNET_NO);
4131 }
4132 sub->do_round_task = NULL;
3584 LOG (GNUNET_ERROR_TYPE_DEBUG, 4133 LOG (GNUNET_ERROR_TYPE_DEBUG,
3585 "Printing view:\n"); 4134 "Printing view:\n");
3586 to_file (mss->file_name_view_log, 4135 to_file (sub->file_name_view_log,
3587 "___ new round ___"); 4136 "___ new round ___");
3588 view_array = View_get_as_array (mss->view); 4137 view_array = View_get_as_array (sub->view);
3589 for (i = 0; i < View_size (mss->view); i++) 4138 for (i = 0; i < View_size (sub->view); i++)
3590 { 4139 {
3591 LOG (GNUNET_ERROR_TYPE_DEBUG, 4140 LOG (GNUNET_ERROR_TYPE_DEBUG,
3592 "\t%s\n", GNUNET_i2s (&view_array[i])); 4141 "\t%s\n", GNUNET_i2s (&view_array[i]));
3593 to_file (mss->file_name_view_log, 4142 to_file (sub->file_name_view_log,
3594 "=%s\t(do round)", 4143 "=%s\t(do round)",
3595 GNUNET_i2s_full (&view_array[i])); 4144 GNUNET_i2s_full (&view_array[i]));
3596 } 4145 }
3597 4146
3598 4147
3599 /* Send pushes and pull requests */ 4148 /* Send pushes and pull requests */
3600 if (0 < View_size (mss->view)) 4149 if (0 < View_size (sub->view))
3601 { 4150 {
3602 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, 4151 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3603 View_size (mss->view)); 4152 View_size (sub->view));
3604 4153
3605 /* Send PUSHes */ 4154 /* Send PUSHes */
3606 a_peers = ceil (alpha * View_size (mss->view)); 4155 a_peers = ceil (alpha * View_size (sub->view));
3607 4156
3608 LOG (GNUNET_ERROR_TYPE_DEBUG, 4157 LOG (GNUNET_ERROR_TYPE_DEBUG,
3609 "Going to send pushes to %u (ceil (%f * %u)) peers.\n", 4158 "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
3610 a_peers, alpha, View_size (mss->view)); 4159 a_peers, alpha, View_size (sub->view));
3611 for (i = 0; i < a_peers; i++) 4160 for (i = 0; i < a_peers; i++)
3612 { 4161 {
3613 peer = view_array[permut[i]]; 4162 peer = view_array[permut[i]];
3614 // FIXME if this fails schedule/loop this for later 4163 // FIXME if this fails schedule/loop this for later
3615 send_push (&peer); 4164 send_push (get_peer_ctx (sub->peer_map, &peer));
3616 } 4165 }
3617 4166
3618 /* Send PULL requests */ 4167 /* Send PULL requests */
3619 b_peers = ceil (beta * View_size (mss->view)); 4168 b_peers = ceil (beta * View_size (sub->view));
3620 first_border = a_peers; 4169 first_border = a_peers;
3621 second_border = a_peers + b_peers; 4170 second_border = a_peers + b_peers;
3622 if (second_border > View_size (mss->view)) 4171 if (second_border > View_size (sub->view))
3623 { 4172 {
3624 first_border = View_size (mss->view) - b_peers; 4173 first_border = View_size (sub->view) - b_peers;
3625 second_border = View_size (mss->view); 4174 second_border = View_size (sub->view);
3626 } 4175 }
3627 LOG (GNUNET_ERROR_TYPE_DEBUG, 4176 LOG (GNUNET_ERROR_TYPE_DEBUG,
3628 "Going to send pulls to %u (ceil (%f * %u)) peers.\n", 4177 "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
3629 b_peers, beta, View_size (mss->view)); 4178 b_peers, beta, View_size (sub->view));
3630 for (i = first_border; i < second_border; i++) 4179 for (i = first_border; i < second_border; i++)
3631 { 4180 {
3632 peer = view_array[permut[i]]; 4181 peer = view_array[permut[i]];
3633 if ( GNUNET_NO == check_peer_flag (&peer, Peers_PULL_REPLY_PENDING)) 4182 if ( GNUNET_NO == check_peer_flag (sub->peer_map,
4183 &peer,
4184 Peers_PULL_REPLY_PENDING))
3634 { // FIXME if this fails schedule/loop this for later 4185 { // FIXME if this fails schedule/loop this for later
3635 send_pull_request (&peer); 4186 send_pull_request (get_peer_ctx (sub->peer_map, &peer));
3636 } 4187 }
3637 } 4188 }
3638 4189
@@ -3644,10 +4195,9 @@ do_round (void *cls)
3644 /* Update view */ 4195 /* Update view */
3645 /* TODO see how many peers are in push-/pull- list! */ 4196 /* TODO see how many peers are in push-/pull- list! */
3646 4197
3647 if ((CustomPeerMap_size (mss->push_map) <= alpha * mss->view_size_est_need) && 4198 if ((CustomPeerMap_size (sub->push_map) <= alpha * sub->view_size_est_need) &&
3648 (0 < CustomPeerMap_size (mss->push_map)) && 4199 (0 < CustomPeerMap_size (sub->push_map)) &&
3649 (0 < CustomPeerMap_size (mss->pull_map))) 4200 (0 < CustomPeerMap_size (sub->pull_map)))
3650 //if (GNUNET_YES) // disable blocking temporarily
3651 { /* If conditions for update are fulfilled, update */ 4201 { /* If conditions for update are fulfilled, update */
3652 LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n"); 4202 LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
3653 4203
@@ -3659,23 +4209,23 @@ do_round (void *cls)
3659 peers_to_clean_size = 0; 4209 peers_to_clean_size = 0;
3660 GNUNET_array_grow (peers_to_clean, 4210 GNUNET_array_grow (peers_to_clean,
3661 peers_to_clean_size, 4211 peers_to_clean_size,
3662 View_size (mss->view)); 4212 View_size (sub->view));
3663 GNUNET_memcpy (peers_to_clean, 4213 GNUNET_memcpy (peers_to_clean,
3664 view_array, 4214 view_array,
3665 View_size (mss->view) * sizeof (struct GNUNET_PeerIdentity)); 4215 View_size (sub->view) * sizeof (struct GNUNET_PeerIdentity));
3666 4216
3667 /* Seems like recreating is the easiest way of emptying the peermap */ 4217 /* Seems like recreating is the easiest way of emptying the peermap */
3668 View_clear (mss->view); 4218 View_clear (sub->view);
3669 to_file (mss->file_name_view_log, 4219 to_file (sub->file_name_view_log,
3670 "--- emptied ---"); 4220 "--- emptied ---");
3671 4221
3672 first_border = GNUNET_MIN (ceil (alpha * mss->view_size_est_need), 4222 first_border = GNUNET_MIN (ceil (alpha * sub->view_size_est_need),
3673 CustomPeerMap_size (mss->push_map)); 4223 CustomPeerMap_size (sub->push_map));
3674 second_border = first_border + 4224 second_border = first_border +
3675 GNUNET_MIN (floor (beta * mss->view_size_est_need), 4225 GNUNET_MIN (floor (beta * sub->view_size_est_need),
3676 CustomPeerMap_size (mss->pull_map)); 4226 CustomPeerMap_size (sub->pull_map));
3677 final_size = second_border + 4227 final_size = second_border +
3678 ceil ((1 - (alpha + beta)) * mss->view_size_est_need); 4228 ceil ((1 - (alpha + beta)) * sub->view_size_est_need);
3679 LOG (GNUNET_ERROR_TYPE_DEBUG, 4229 LOG (GNUNET_ERROR_TYPE_DEBUG,
3680 "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n", 4230 "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n",
3681 first_border, 4231 first_border,
@@ -3684,18 +4234,20 @@ do_round (void *cls)
3684 4234
3685 /* Update view with peers received through PUSHes */ 4235 /* Update view with peers received through PUSHes */
3686 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, 4236 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3687 CustomPeerMap_size (mss->push_map)); 4237 CustomPeerMap_size (sub->push_map));
3688 for (i = 0; i < first_border; i++) 4238 for (i = 0; i < first_border; i++)
3689 { 4239 {
3690 int inserted; 4240 int inserted;
3691 inserted = insert_in_view (CustomPeerMap_get_peer_by_index (mss->push_map, 4241 inserted = insert_in_view (sub,
4242 CustomPeerMap_get_peer_by_index (sub->push_map,
3692 permut[i])); 4243 permut[i]));
3693 if (GNUNET_OK == inserted) 4244 if (GNUNET_OK == inserted)
3694 { 4245 {
3695 clients_notify_stream_peer (1, 4246 clients_notify_stream_peer (sub,
3696 CustomPeerMap_get_peer_by_index (mss->push_map, permut[i])); 4247 1,
4248 CustomPeerMap_get_peer_by_index (sub->push_map, permut[i]));
3697 } 4249 }
3698 to_file (mss->file_name_view_log, 4250 to_file (sub->file_name_view_log,
3699 "+%s\t(push list)", 4251 "+%s\t(push list)",
3700 GNUNET_i2s_full (&view_array[i])); 4252 GNUNET_i2s_full (&view_array[i]));
3701 // TODO change the peer_flags accordingly 4253 // TODO change the peer_flags accordingly
@@ -3705,19 +4257,21 @@ do_round (void *cls)
3705 4257
3706 /* Update view with peers received through PULLs */ 4258 /* Update view with peers received through PULLs */
3707 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, 4259 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3708 CustomPeerMap_size (mss->pull_map)); 4260 CustomPeerMap_size (sub->pull_map));
3709 for (i = first_border; i < second_border; i++) 4261 for (i = first_border; i < second_border; i++)
3710 { 4262 {
3711 int inserted; 4263 int inserted;
3712 inserted = insert_in_view (CustomPeerMap_get_peer_by_index (mss->pull_map, 4264 inserted = insert_in_view (sub,
3713 permut[i - first_border])); 4265 CustomPeerMap_get_peer_by_index (sub->pull_map,
4266 permut[i - first_border]));
3714 if (GNUNET_OK == inserted) 4267 if (GNUNET_OK == inserted)
3715 { 4268 {
3716 clients_notify_stream_peer (1, 4269 clients_notify_stream_peer (sub,
3717 CustomPeerMap_get_peer_by_index (mss->pull_map, 4270 1,
4271 CustomPeerMap_get_peer_by_index (sub->pull_map,
3718 permut[i - first_border])); 4272 permut[i - first_border]));
3719 } 4273 }
3720 to_file (mss->file_name_view_log, 4274 to_file (sub->file_name_view_log,
3721 "+%s\t(pull list)", 4275 "+%s\t(pull list)",
3722 GNUNET_i2s_full (&view_array[i])); 4276 GNUNET_i2s_full (&view_array[i]));
3723 // TODO change the peer_flags accordingly 4277 // TODO change the peer_flags accordingly
@@ -3726,106 +4280,116 @@ do_round (void *cls)
3726 permut = NULL; 4280 permut = NULL;
3727 4281
3728 /* Update view with peers from history */ 4282 /* Update view with peers from history */
3729 RPS_sampler_get_n_rand_peers (mss->sampler, 4283 RPS_sampler_get_n_rand_peers (sub->sampler,
3730 final_size - second_border, 4284 final_size - second_border,
3731 hist_update, 4285 hist_update,
3732 NULL); 4286 sub);
3733 // TODO change the peer_flags accordingly 4287 // TODO change the peer_flags accordingly
3734 4288
3735 for (i = 0; i < View_size (mss->view); i++) 4289 for (i = 0; i < View_size (sub->view); i++)
3736 rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]); 4290 rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
3737 4291
3738 /* Clean peers that were removed from the view */ 4292 /* Clean peers that were removed from the view */
3739 for (i = 0; i < peers_to_clean_size; i++) 4293 for (i = 0; i < peers_to_clean_size; i++)
3740 { 4294 {
3741 to_file (mss->file_name_view_log, 4295 to_file (sub->file_name_view_log,
3742 "-%s", 4296 "-%s",
3743 GNUNET_i2s_full (&peers_to_clean[i])); 4297 GNUNET_i2s_full (&peers_to_clean[i]));
3744 clean_peer (&peers_to_clean[i]); 4298 clean_peer (sub, &peers_to_clean[i]);
3745 } 4299 }
3746 4300
3747 GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0); 4301 GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
3748 clients_notify_view_update(); 4302 clients_notify_view_update (sub);
3749 } else { 4303 } else {
3750 LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n"); 4304 LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
3751 GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO); 4305 if (sub == msub)
3752 if (CustomPeerMap_size (mss->push_map) > alpha * View_size (mss->view) && 4306 {
3753 !(0 >= CustomPeerMap_size (mss->pull_map))) 4307 GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
3754 GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO); 4308 if (CustomPeerMap_size (sub->push_map) > alpha * View_size (sub->view) &&
3755 if (CustomPeerMap_size (mss->push_map) > alpha * View_size (mss->view) && 4309 !(0 >= CustomPeerMap_size (sub->pull_map)))
3756 (0 >= CustomPeerMap_size (mss->pull_map))) 4310 GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
3757 GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO); 4311 if (CustomPeerMap_size (sub->push_map) > alpha * View_size (sub->view) &&
3758 if (0 >= CustomPeerMap_size (mss->push_map) && 4312 (0 >= CustomPeerMap_size (sub->pull_map)))
3759 !(0 >= CustomPeerMap_size (mss->pull_map))) 4313 GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
3760 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO); 4314 if (0 >= CustomPeerMap_size (sub->push_map) &&
3761 if (0 >= CustomPeerMap_size (mss->push_map) && 4315 !(0 >= CustomPeerMap_size (sub->pull_map)))
3762 (0 >= CustomPeerMap_size (mss->pull_map))) 4316 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
3763 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO); 4317 if (0 >= CustomPeerMap_size (sub->push_map) &&
3764 if (0 >= CustomPeerMap_size (mss->pull_map) && 4318 (0 >= CustomPeerMap_size (sub->pull_map)))
3765 CustomPeerMap_size (mss->push_map) > alpha * View_size (mss->view) && 4319 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
3766 0 >= CustomPeerMap_size (mss->push_map)) 4320 if (0 >= CustomPeerMap_size (sub->pull_map) &&
3767 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO); 4321 CustomPeerMap_size (sub->push_map) > alpha * View_size (sub->view) &&
4322 0 >= CustomPeerMap_size (sub->push_map))
4323 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
4324 }
3768 } 4325 }
3769 // TODO independent of that also get some peers from CADET_get_peers()? 4326 // TODO independent of that also get some peers from CADET_get_peers()?
3770 GNUNET_STATISTICS_set (stats, 4327 sub->push_recv[CustomPeerMap_size (sub->push_map)]++;
3771 "# peers in push map at end of round", 4328 if (sub == msub)
3772 CustomPeerMap_size (mss->push_map), 4329 {
3773 GNUNET_NO); 4330 GNUNET_STATISTICS_set (stats,
3774 GNUNET_STATISTICS_set (stats, 4331 "# peers in push map at end of round",
3775 "# peers in pull map at end of round", 4332 CustomPeerMap_size (sub->push_map),
3776 CustomPeerMap_size (mss->pull_map), 4333 GNUNET_NO);
3777 GNUNET_NO); 4334 GNUNET_STATISTICS_set (stats,
3778 GNUNET_STATISTICS_set (stats, 4335 "# peers in pull map at end of round",
3779 "# peers in view at end of round", 4336 CustomPeerMap_size (sub->pull_map),
3780 View_size (mss->view), 4337 GNUNET_NO);
3781 GNUNET_NO); 4338 GNUNET_STATISTICS_set (stats,
4339 "# peers in view at end of round",
4340 View_size (sub->view),
4341 GNUNET_NO);
4342 }
3782 4343
3783 LOG (GNUNET_ERROR_TYPE_DEBUG, 4344 LOG (GNUNET_ERROR_TYPE_DEBUG,
3784 "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (mss->view%u) = %.2f)\n", 4345 "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (sub->view%u) = %.2f)\n",
3785 CustomPeerMap_size (mss->push_map), 4346 CustomPeerMap_size (sub->push_map),
3786 CustomPeerMap_size (mss->pull_map), 4347 CustomPeerMap_size (sub->pull_map),
3787 alpha, 4348 alpha,
3788 View_size (mss->view), 4349 View_size (sub->view),
3789 alpha * View_size (mss->view)); 4350 alpha * View_size (sub->view));
3790 4351
3791 /* Update samplers */ 4352 /* Update samplers */
3792 for (i = 0; i < CustomPeerMap_size (mss->push_map); i++) 4353 for (i = 0; i < CustomPeerMap_size (sub->push_map); i++)
3793 { 4354 {
3794 update_peer = CustomPeerMap_get_peer_by_index (mss->push_map, i); 4355 update_peer = CustomPeerMap_get_peer_by_index (sub->push_map, i);
3795 LOG (GNUNET_ERROR_TYPE_DEBUG, 4356 LOG (GNUNET_ERROR_TYPE_DEBUG,
3796 "Updating with peer %s from push list\n", 4357 "Updating with peer %s from push list\n",
3797 GNUNET_i2s (update_peer)); 4358 GNUNET_i2s (update_peer));
3798 insert_in_sampler (NULL, update_peer); 4359 insert_in_sampler (sub, update_peer);
3799 clean_peer (update_peer); /* This cleans only if it is not in the view */ 4360 clean_peer (sub, update_peer); /* This cleans only if it is not in the view */
3800 } 4361 }
3801 4362
3802 for (i = 0; i < CustomPeerMap_size (mss->pull_map); i++) 4363 for (i = 0; i < CustomPeerMap_size (sub->pull_map); i++)
3803 { 4364 {
3804 LOG (GNUNET_ERROR_TYPE_DEBUG, 4365 LOG (GNUNET_ERROR_TYPE_DEBUG,
3805 "Updating with peer %s from pull list\n", 4366 "Updating with peer %s from pull list\n",
3806 GNUNET_i2s (CustomPeerMap_get_peer_by_index (mss->pull_map, i))); 4367 GNUNET_i2s (CustomPeerMap_get_peer_by_index (sub->pull_map, i)));
3807 insert_in_sampler (NULL, CustomPeerMap_get_peer_by_index (mss->pull_map, i)); 4368 insert_in_sampler (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
3808 /* This cleans only if it is not in the view */ 4369 /* This cleans only if it is not in the view */
3809 clean_peer (CustomPeerMap_get_peer_by_index (mss->pull_map, i)); 4370 clean_peer (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
3810 } 4371 }
3811 4372
3812 4373
3813 /* Empty push/pull lists */ 4374 /* Empty push/pull lists */
3814 CustomPeerMap_clear (mss->push_map); 4375 CustomPeerMap_clear (sub->push_map);
3815 CustomPeerMap_clear (mss->pull_map); 4376 CustomPeerMap_clear (sub->pull_map);
3816 4377
3817 GNUNET_STATISTICS_set (stats, 4378 if (sub == msub)
3818 "view size", 4379 {
3819 View_size(mss->view), 4380 GNUNET_STATISTICS_set (stats,
3820 GNUNET_NO); 4381 "view size",
4382 View_size(sub->view),
4383 GNUNET_NO);
4384 }
3821 4385
3822 struct GNUNET_TIME_Relative time_next_round; 4386 struct GNUNET_TIME_Relative time_next_round;
3823 4387
3824 time_next_round = compute_rand_delay (mss->round_interval, 2); 4388 time_next_round = compute_rand_delay (sub->round_interval, 2);
3825 4389
3826 /* Schedule next round */ 4390 /* Schedule next round */
3827 mss->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round, 4391 sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3828 &do_round, NULL); 4392 &do_round, sub);
3829 LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n"); 4393 LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3830} 4394}
3831 4395
@@ -3835,16 +4399,25 @@ do_round (void *cls)
3835 * 4399 *
3836 * It is called on every peer(ID) that cadet somehow has contact with. 4400 * It is called on every peer(ID) that cadet somehow has contact with.
3837 * We use those to initialise the sampler. 4401 * We use those to initialise the sampler.
4402 *
4403 * implements #GNUNET_CADET_PeersCB
4404 *
4405 * @param cls Closure - Sub
4406 * @param peer Peer, or NULL on "EOF".
4407 * @param tunnel Do we have a tunnel towards this peer?
4408 * @param n_paths Number of known paths towards this peer.
4409 * @param best_path How long is the best path?
4410 * (0 = unknown, 1 = ourselves, 2 = neighbor)
3838 */ 4411 */
3839void 4412void
3840init_peer_cb (void *cls, 4413init_peer_cb (void *cls,
3841 const struct GNUNET_PeerIdentity *peer, 4414 const struct GNUNET_PeerIdentity *peer,
3842 int tunnel, // "Do we have a tunnel towards this peer?" 4415 int tunnel, /* "Do we have a tunnel towards this peer?" */
3843 unsigned int n_paths, // "Number of known paths towards this peer" 4416 unsigned int n_paths, /* "Number of known paths towards this peer" */
3844 unsigned int best_path) // "How long is the best path? 4417 unsigned int best_path) /* "How long is the best path?
3845 // (0 = unknown, 1 = ourselves, 2 = neighbor)" 4418 * (0 = unknown, 1 = ourselves, 2 = neighbor)" */
3846{ 4419{
3847 (void) cls; 4420 struct Sub *sub = cls;
3848 (void) tunnel; 4421 (void) tunnel;
3849 (void) n_paths; 4422 (void) n_paths;
3850 (void) best_path; 4423 (void) best_path;
@@ -3854,16 +4427,17 @@ init_peer_cb (void *cls,
3854 LOG (GNUNET_ERROR_TYPE_DEBUG, 4427 LOG (GNUNET_ERROR_TYPE_DEBUG,
3855 "Got peer_id %s from cadet\n", 4428 "Got peer_id %s from cadet\n",
3856 GNUNET_i2s (peer)); 4429 GNUNET_i2s (peer));
3857 got_peer (peer); 4430 got_peer (sub, peer);
3858 } 4431 }
3859} 4432}
3860 4433
4434
3861/** 4435/**
3862 * @brief Iterator function over stored, valid peers. 4436 * @brief Iterator function over stored, valid peers.
3863 * 4437 *
3864 * We initialise the sampler with those. 4438 * We initialise the sampler with those.
3865 * 4439 *
3866 * @param cls the closure 4440 * @param cls Closure - Sub
3867 * @param peer the peer id 4441 * @param peer the peer id
3868 * @return #GNUNET_YES if we should continue to 4442 * @return #GNUNET_YES if we should continue to
3869 * iterate, 4443 * iterate,
@@ -3873,14 +4447,14 @@ static int
3873valid_peers_iterator (void *cls, 4447valid_peers_iterator (void *cls,
3874 const struct GNUNET_PeerIdentity *peer) 4448 const struct GNUNET_PeerIdentity *peer)
3875{ 4449{
3876 (void) cls; 4450 struct Sub *sub = cls;
3877 4451
3878 if (NULL != peer) 4452 if (NULL != peer)
3879 { 4453 {
3880 LOG (GNUNET_ERROR_TYPE_DEBUG, 4454 LOG (GNUNET_ERROR_TYPE_DEBUG,
3881 "Got stored, valid peer %s\n", 4455 "Got stored, valid peer %s\n",
3882 GNUNET_i2s (peer)); 4456 GNUNET_i2s (peer));
3883 got_peer (peer); 4457 got_peer (sub, peer);
3884 } 4458 }
3885 return GNUNET_YES; 4459 return GNUNET_YES;
3886} 4460}
@@ -3889,7 +4463,7 @@ valid_peers_iterator (void *cls,
3889/** 4463/**
3890 * Iterator over peers from peerinfo. 4464 * Iterator over peers from peerinfo.
3891 * 4465 *
3892 * @param cls closure 4466 * @param cls Closure - Sub
3893 * @param peer id of the peer, NULL for last call 4467 * @param peer id of the peer, NULL for last call
3894 * @param hello hello message for the peer (can be NULL) 4468 * @param hello hello message for the peer (can be NULL)
3895 * @param error message 4469 * @param error message
@@ -3900,7 +4474,7 @@ process_peerinfo_peers (void *cls,
3900 const struct GNUNET_HELLO_Message *hello, 4474 const struct GNUNET_HELLO_Message *hello,
3901 const char *err_msg) 4475 const char *err_msg)
3902{ 4476{
3903 (void) cls; 4477 struct Sub *sub = cls;
3904 (void) hello; 4478 (void) hello;
3905 (void) err_msg; 4479 (void) err_msg;
3906 4480
@@ -3909,7 +4483,7 @@ process_peerinfo_peers (void *cls,
3909 LOG (GNUNET_ERROR_TYPE_DEBUG, 4483 LOG (GNUNET_ERROR_TYPE_DEBUG,
3910 "Got peer_id %s from peerinfo\n", 4484 "Got peer_id %s from peerinfo\n",
3911 GNUNET_i2s (peer)); 4485 GNUNET_i2s (peer));
3912 got_peer (peer); 4486 got_peer (sub, peer);
3913 } 4487 }
3914} 4488}
3915 4489
@@ -3917,13 +4491,13 @@ process_peerinfo_peers (void *cls,
3917/** 4491/**
3918 * Task run during shutdown. 4492 * Task run during shutdown.
3919 * 4493 *
3920 * @param cls unused 4494 * @param cls Closure - unused
3921 */ 4495 */
3922static void 4496static void
3923shutdown_task (void *cls) 4497shutdown_task (void *cls)
3924{ 4498{
3925 struct ClientContext *client_ctx;
3926 (void) cls; 4499 (void) cls;
4500 struct ClientContext *client_ctx;
3927 4501
3928 LOG (GNUNET_ERROR_TYPE_DEBUG, 4502 LOG (GNUNET_ERROR_TYPE_DEBUG,
3929 "RPS service is going down\n"); 4503 "RPS service is going down\n");
@@ -3935,42 +4509,40 @@ shutdown_task (void *cls)
3935 { 4509 {
3936 destroy_cli_ctx (client_ctx); 4510 destroy_cli_ctx (client_ctx);
3937 } 4511 }
4512 if (NULL != msub)
4513 {
4514 destroy_sub (msub);
4515 msub = NULL;
4516 }
4517
4518 /* Disconnect from other services */
3938 GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle); 4519 GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
3939 GNUNET_PEERINFO_disconnect (peerinfo_handle); 4520 GNUNET_PEERINFO_disconnect (peerinfo_handle);
3940 peerinfo_handle = NULL; 4521 peerinfo_handle = NULL;
3941 if (NULL != mss->do_round_task) 4522 GNUNET_NSE_disconnect (nse);
4523 if (NULL != map_single_hop)
3942 { 4524 {
3943 GNUNET_SCHEDULER_cancel (mss->do_round_task); 4525 /* core_init was called - core was initialised */
3944 mss->do_round_task = NULL; 4526 /* disconnect first, so no callback tries to access missing peermap */
4527 GNUNET_CORE_disconnect (core_handle);
4528 core_handle = NULL;
4529 GNUNET_CONTAINER_multipeermap_destroy (map_single_hop);
4530 map_single_hop = NULL;
3945 } 4531 }
3946 4532
3947 peers_terminate ();
3948
3949 GNUNET_NSE_disconnect (nse);
3950 RPS_sampler_destroy (mss->sampler);
3951 GNUNET_CADET_close_port (mss->cadet_port);
3952 GNUNET_CADET_disconnect (mss->cadet_handle);
3953 mss->cadet_handle = NULL;
3954 View_destroy (mss->view);
3955 CustomPeerMap_destroy (mss->push_map);
3956 CustomPeerMap_destroy (mss->pull_map);
3957 if (NULL != stats) 4533 if (NULL != stats)
3958 { 4534 {
3959 GNUNET_STATISTICS_destroy (stats, 4535 GNUNET_STATISTICS_destroy (stats,
3960 GNUNET_NO); 4536 GNUNET_NO);
3961 stats = NULL; 4537 stats = NULL;
3962 } 4538 }
4539 GNUNET_CADET_disconnect (cadet_handle);
4540 cadet_handle = NULL;
3963#ifdef ENABLE_MALICIOUS 4541#ifdef ENABLE_MALICIOUS
3964 struct AttackedPeer *tmp_att_peer; 4542 struct AttackedPeer *tmp_att_peer;
3965 /* it is ok to free this const during shutdown: */
3966 GNUNET_free ((char *) mss->file_name_view_log);
3967#ifdef TO_FILE
3968 GNUNET_free ((char *) mss->file_name_observed_log);
3969 GNUNET_CONTAINER_multipeermap_destroy (mss->observed_unique_peers);
3970#endif /* TO_FILE */
3971 GNUNET_array_grow (mal_peers, 4543 GNUNET_array_grow (mal_peers,
3972 num_mal_peers, 4544 num_mal_peers,
3973 0); 4545 0);
3974 if (NULL != mal_peer_set) 4546 if (NULL != mal_peer_set)
3975 GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set); 4547 GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
3976 if (NULL != att_peer_set) 4548 if (NULL != att_peer_set)
@@ -3979,8 +4551,8 @@ shutdown_task (void *cls)
3979 { 4551 {
3980 tmp_att_peer = att_peers_head; 4552 tmp_att_peer = att_peers_head;
3981 GNUNET_CONTAINER_DLL_remove (att_peers_head, 4553 GNUNET_CONTAINER_DLL_remove (att_peers_head,
3982 att_peers_tail, 4554 att_peers_tail,
3983 tmp_att_peer); 4555 tmp_att_peer);
3984 GNUNET_free (tmp_att_peer); 4556 GNUNET_free (tmp_att_peer);
3985 } 4557 }
3986#endif /* ENABLE_MALICIOUS */ 4558#endif /* ENABLE_MALICIOUS */
@@ -3990,7 +4562,7 @@ shutdown_task (void *cls)
3990/** 4562/**
3991 * Handle client connecting to the service. 4563 * Handle client connecting to the service.
3992 * 4564 *
3993 * @param cls NULL 4565 * @param cls unused
3994 * @param client the new client 4566 * @param client the new client
3995 * @param mq the message queue of @a client 4567 * @param mq the message queue of @a client
3996 * @return @a client 4568 * @return @a client
@@ -4060,20 +4632,21 @@ run (void *cls,
4060 const struct GNUNET_CONFIGURATION_Handle *c, 4632 const struct GNUNET_CONFIGURATION_Handle *c,
4061 struct GNUNET_SERVICE_Handle *service) 4633 struct GNUNET_SERVICE_Handle *service)
4062{ 4634{
4063 char *fn_valid_peers;
4064 struct GNUNET_TIME_Relative round_interval; 4635 struct GNUNET_TIME_Relative round_interval;
4065 long long unsigned int sampler_size; 4636 long long unsigned int sampler_size;
4637 char hash_port_string[] = GNUNET_APPLICATION_PORT_RPS;
4638 struct GNUNET_HashCode hash;
4066 4639
4067 (void) cls; 4640 (void) cls;
4068 (void) service; 4641 (void) service;
4069 4642
4070 GNUNET_log_setup ("rps", 4643 GNUNET_log_setup ("rps",
4071 GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG), 4644 GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG),
4072 NULL); 4645 NULL);
4073 cfg = c; 4646 cfg = c;
4074 /* Get own ID */ 4647 /* Get own ID */
4075 GNUNET_CRYPTO_get_peer_identity (cfg, 4648 GNUNET_CRYPTO_get_peer_identity (cfg,
4076 &own_identity); // TODO check return value 4649 &own_identity); // TODO check return value
4077 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 4650 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4078 "STARTING SERVICE (rps) for peer [%s]\n", 4651 "STARTING SERVICE (rps) for peer [%s]\n",
4079 GNUNET_i2s (&own_identity)); 4652 GNUNET_i2s (&own_identity));
@@ -4085,9 +4658,9 @@ run (void *cls,
4085 /* Get time interval from the configuration */ 4658 /* Get time interval from the configuration */
4086 if (GNUNET_OK != 4659 if (GNUNET_OK !=
4087 GNUNET_CONFIGURATION_get_value_time (cfg, 4660 GNUNET_CONFIGURATION_get_value_time (cfg,
4088 "RPS", 4661 "RPS",
4089 "ROUNDINTERVAL", 4662 "ROUNDINTERVAL",
4090 &round_interval)) 4663 &round_interval))
4091 { 4664 {
4092 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, 4665 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4093 "RPS", "ROUNDINTERVAL"); 4666 "RPS", "ROUNDINTERVAL");
@@ -4108,54 +4681,51 @@ run (void *cls,
4108 return; 4681 return;
4109 } 4682 }
4110 4683
4111 if (GNUNET_OK != 4684 cadet_handle = GNUNET_CADET_connect (cfg);
4112 GNUNET_CONFIGURATION_get_value_filename (cfg, 4685 GNUNET_assert (NULL != cadet_handle);
4113 "rps", 4686 core_handle = GNUNET_CORE_connect (cfg,
4114 "FILENAME_VALID_PEERS", 4687 NULL, /* cls */
4115 &fn_valid_peers)) 4688 core_init, /* init */
4116 { 4689 core_connects, /* connects */
4117 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, 4690 core_disconnects, /* disconnects */
4118 "rps", "FILENAME_VALID_PEERS"); 4691 NULL); /* handlers */
4119 } 4692 GNUNET_assert (NULL != core_handle);
4120
4121
4122 /* connect to NSE */
4123 nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4124 4693
4125 4694
4126 alpha = 0.45; 4695 alpha = 0.45;
4127 beta = 0.45; 4696 beta = 0.45;
4128 4697
4129 4698
4130 /* Set up main SubSampler */ 4699 /* Set up main Sub */
4131 mss = new_subsampler ("", /* this is the main sampler - no shared value */ 4700 GNUNET_CRYPTO_hash (hash_port_string,
4132 sampler_size, /* Will be overwritten by config */ 4701 strlen (hash_port_string),
4133 round_interval); 4702 &hash);
4703 msub = new_sub (&hash,
4704 sampler_size, /* Will be overwritten by config */
4705 round_interval);
4134 4706
4135 4707
4136 peerinfo_handle = GNUNET_PEERINFO_connect (cfg); 4708 peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4137 4709
4710 /* connect to NSE */
4711 nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4712
4138 //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n"); 4713 //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4139 //GNUNET_CADET_get_peers (mss.cadet_handle, &init_peer_cb, NULL); 4714 //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, msub);
4140 // TODO send push/pull to each of those peers? 4715 // TODO send push/pull to each of those peers?
4141 // TODO read stored valid peers from last run
4142 LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n"); 4716 LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4143 restore_valid_peers (); 4717 restore_valid_peers (msub);
4144 get_valid_peers (valid_peers_iterator, NULL); 4718 get_valid_peers (msub->valid_peers, valid_peers_iterator, msub);
4145 4719
4146 peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg, 4720 peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4147 GNUNET_NO, 4721 GNUNET_NO,
4148 process_peerinfo_peers, 4722 process_peerinfo_peers,
4149 NULL); 4723 msub);
4150 4724
4151 LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n"); 4725 LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4152 4726
4153 mss->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
4154 LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
4155
4156 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); 4727 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4157 stats = GNUNET_STATISTICS_create ("rps", cfg); 4728 stats = GNUNET_STATISTICS_create ("rps", cfg);
4158
4159} 4729}
4160 4730
4161 4731
@@ -4195,6 +4765,14 @@ GNUNET_SERVICE_MAIN
4195 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL, 4765 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL,
4196 struct GNUNET_MessageHeader, 4766 struct GNUNET_MessageHeader,
4197 NULL), 4767 NULL),
4768 GNUNET_MQ_hd_fixed_size (client_start_sub,
4769 GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START,
4770 struct GNUNET_RPS_CS_SubStartMessage,
4771 NULL),
4772 GNUNET_MQ_hd_fixed_size (client_stop_sub,
4773 GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP,
4774 struct GNUNET_RPS_CS_SubStopMessage,
4775 NULL),
4198 GNUNET_MQ_handler_end()); 4776 GNUNET_MQ_handler_end());
4199 4777
4200/* end of gnunet-service-rps.c */ 4778/* end of gnunet-service-rps.c */