aboutsummaryrefslogtreecommitdiff
path: root/src/rps/gnunet-service-rps.c
diff options
context:
space:
mode:
authorJulius Bünger <buenger@mytum.de>2018-10-04 10:25:52 +0200
committerJulius Bünger <buenger@mytum.de>2018-10-04 10:25:52 +0200
commited07d0c162984ddedc9399744e7736a101d3df2b (patch)
treeca35cd17728bed47230e0cf8804a77bbb533aca5 /src/rps/gnunet-service-rps.c
parent4ac7b35eb3126485ac69c6d930c18a78b7b9a860 (diff)
downloadgnunet-ed07d0c162984ddedc9399744e7736a101d3df2b.tar.gz
gnunet-ed07d0c162984ddedc9399744e7736a101d3df2b.zip
Restructure service towards having subsamplers
Diffstat (limited to 'src/rps/gnunet-service-rps.c')
-rw-r--r--src/rps/gnunet-service-rps.c1025
1 files changed, 583 insertions, 442 deletions
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c
index 07b88ddcd..d1c169239 100644
--- a/src/rps/gnunet-service-rps.c
+++ b/src/rps/gnunet-service-rps.c
@@ -149,6 +149,11 @@ struct ChannelCtx;
149struct PeerContext 149struct PeerContext
150{ 150{
151 /** 151 /**
152 * The SubSampler this context belongs to.
153 */
154 struct SubSampler *ss;
155
156 /**
152 * Message queue open to client 157 * Message queue open to client
153 */ 158 */
154 struct GNUNET_MQ_Handle *mq; 159 struct GNUNET_MQ_Handle *mq;
@@ -533,19 +538,21 @@ static const uint32_t num_valid_peers_max = UINT32_MAX;
533/** 538/**
534 * @brief Get the #PeerContext associated with a peer 539 * @brief Get the #PeerContext associated with a peer
535 * 540 *
541 * @param peer_map The peer map containing the context
536 * @param peer the peer id 542 * @param peer the peer id
537 * 543 *
538 * @return the #PeerContext 544 * @return the #PeerContext
539 */ 545 */
540static struct PeerContext * 546static struct PeerContext *
541get_peer_ctx (const struct GNUNET_PeerIdentity *peer) 547get_peer_ctx (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
548 const struct GNUNET_PeerIdentity *peer)
542{ 549{
543 struct PeerContext *ctx; 550 struct PeerContext *ctx;
544 int ret; 551 int ret;
545 552
546 ret = GNUNET_CONTAINER_multipeermap_contains (mss->peer_map, peer); 553 ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
547 GNUNET_assert (GNUNET_YES == ret); 554 GNUNET_assert (GNUNET_YES == ret);
548 ctx = GNUNET_CONTAINER_multipeermap_get (mss->peer_map, peer); 555 ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
549 GNUNET_assert (NULL != ctx); 556 GNUNET_assert (NULL != ctx);
550 return ctx; 557 return ctx;
551} 558}
@@ -555,18 +562,21 @@ get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
555 * 562 *
556 * FIXME probably deprecated. Make this the new _online. 563 * FIXME probably deprecated. Make this the new _online.
557 * 564 *
565 * @param peer_map The peer map to check for the existence of @a peer
558 * @param peer peer in question 566 * @param peer peer in question
559 * 567 *
560 * @return #GNUNET_YES if peer is known 568 * @return #GNUNET_YES if peer is known
561 * #GNUNET_NO if peer is not knwon 569 * #GNUNET_NO if peer is not knwon
562 */ 570 */
563static int 571static int
564check_peer_known (const struct GNUNET_PeerIdentity *peer) 572check_peer_known (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
573 const struct GNUNET_PeerIdentity *peer)
565{ 574{
566 if (NULL != mss->peer_map) 575 if (NULL != peer_map)
567 { 576 {
568 return GNUNET_CONTAINER_multipeermap_contains (mss->peer_map, peer); 577 return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
569 } else 578 }
579 else
570 { 580 {
571 return GNUNET_NO; 581 return GNUNET_NO;
572 } 582 }
@@ -576,26 +586,29 @@ check_peer_known (const struct GNUNET_PeerIdentity *peer)
576/** 586/**
577 * @brief Create a new #PeerContext and insert it into the peer map 587 * @brief Create a new #PeerContext and insert it into the peer map
578 * 588 *
589 * @param ss The SubSampler this context belongs to.
579 * @param peer the peer to create the #PeerContext for 590 * @param peer the peer to create the #PeerContext for
580 * 591 *
581 * @return the #PeerContext 592 * @return the #PeerContext
582 */ 593 */
583static struct PeerContext * 594static struct PeerContext *
584create_peer_ctx (const struct GNUNET_PeerIdentity *peer) 595create_peer_ctx (struct SubSampler *ss,
596 const struct GNUNET_PeerIdentity *peer)
585{ 597{
586 struct PeerContext *ctx; 598 struct PeerContext *ctx;
587 int ret; 599 int ret;
588 600
589 GNUNET_assert (GNUNET_NO == check_peer_known (peer)); 601 GNUNET_assert (GNUNET_NO == check_peer_known (ss->peer_map, peer));
590 602
591 ctx = GNUNET_new (struct PeerContext); 603 ctx = GNUNET_new (struct PeerContext);
592 ctx->peer_id = *peer; 604 ctx->peer_id = *peer;
593 ret = GNUNET_CONTAINER_multipeermap_put (mss->peer_map, peer, ctx, 605 ctx->ss = ss;
606 ret = GNUNET_CONTAINER_multipeermap_put (ss->peer_map, peer, ctx,
594 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 607 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
595 GNUNET_assert (GNUNET_OK == ret); 608 GNUNET_assert (GNUNET_OK == ret);
596 GNUNET_STATISTICS_set (stats, 609 GNUNET_STATISTICS_set (stats,
597 "# known peers", 610 "# known peers",
598 GNUNET_CONTAINER_multipeermap_size (mss->peer_map), 611 GNUNET_CONTAINER_multipeermap_size (ss->peer_map),
599 GNUNET_NO); 612 GNUNET_NO);
600 return ctx; 613 return ctx;
601} 614}
@@ -604,18 +617,20 @@ create_peer_ctx (const struct GNUNET_PeerIdentity *peer)
604/** 617/**
605 * @brief Create or get a #PeerContext 618 * @brief Create or get a #PeerContext
606 * 619 *
620 * @param ss The SubSampler to which the created context belongs to
607 * @param peer the peer to get the associated context to 621 * @param peer the peer to get the associated context to
608 * 622 *
609 * @return the context 623 * @return the context
610 */ 624 */
611static struct PeerContext * 625static struct PeerContext *
612create_or_get_peer_ctx (const struct GNUNET_PeerIdentity *peer) 626create_or_get_peer_ctx (struct SubSampler *ss,
627 const struct GNUNET_PeerIdentity *peer)
613{ 628{
614 if (GNUNET_NO == check_peer_known (peer)) 629 if (GNUNET_NO == check_peer_known (ss->peer_map, peer))
615 { 630 {
616 return create_peer_ctx (peer); 631 return create_peer_ctx (ss, peer);
617 } 632 }
618 return get_peer_ctx (peer); 633 return get_peer_ctx (ss->peer_map, peer);
619} 634}
620 635
621 636
@@ -624,23 +639,22 @@ create_or_get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
624 * 639 *
625 * Also sets the #Peers_ONLINE flag accordingly 640 * Also sets the #Peers_ONLINE flag accordingly
626 * 641 *
627 * @param peer the peer in question 642 * @param peer_ctx Context of the peer of which connectivity is to be checked
628 * 643 *
629 * @return #GNUNET_YES if we are connected 644 * @return #GNUNET_YES if we are connected
630 * #GNUNET_NO otherwise 645 * #GNUNET_NO otherwise
631 */ 646 */
632static int 647static int
633check_connected (const struct GNUNET_PeerIdentity *peer) 648check_connected (struct PeerContext *peer_ctx)
634{ 649{
635 struct PeerContext *peer_ctx;
636
637 /* If we don't know about this peer we don't know whether it's online */ 650 /* If we don't know about this peer we don't know whether it's online */
638 if (GNUNET_NO == check_peer_known (peer)) 651 if (GNUNET_NO == check_peer_known (peer_ctx->ss->peer_map,
652 &peer_ctx->peer_id))
639 { 653 {
640 return GNUNET_NO; 654 return GNUNET_NO;
641 } 655 }
642 /* Get the context */ 656 /* Get the context */
643 peer_ctx = get_peer_ctx (peer); 657 peer_ctx = get_peer_ctx (peer_ctx->ss->peer_map, &peer_ctx->peer_id);
644 /* If we have no channel to this peer we don't know whether it's online */ 658 /* If we have no channel to this peer we don't know whether it's online */
645 if ( (NULL == peer_ctx->send_channel_ctx) && 659 if ( (NULL == peer_ctx->send_channel_ctx) &&
646 (NULL == peer_ctx->recv_channel_ctx) ) 660 (NULL == peer_ctx->recv_channel_ctx) )
@@ -709,21 +723,22 @@ get_rand_peer_iterator (void *cls,
709/** 723/**
710 * @brief Get a random peer from @a peer_map 724 * @brief Get a random peer from @a peer_map
711 * 725 *
712 * @param peer_map the peer_map to get the peer from 726 * @param valid_peers Peer map containing valid peers from which to select a
727 * random one
713 * 728 *
714 * @return a random peer 729 * @return a random peer
715 */ 730 */
716static const struct GNUNET_PeerIdentity * 731static const struct GNUNET_PeerIdentity *
717get_random_peer_from_peermap (const struct 732get_random_peer_from_peermap (const struct
718 GNUNET_CONTAINER_MultiPeerMap *peer_map) 733 GNUNET_CONTAINER_MultiPeerMap *valid_peers)
719{ 734{
720 struct GetRandPeerIteratorCls *iterator_cls; 735 struct GetRandPeerIteratorCls *iterator_cls;
721 const struct GNUNET_PeerIdentity *ret; 736 const struct GNUNET_PeerIdentity *ret;
722 737
723 iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls); 738 iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
724 iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 739 iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
725 GNUNET_CONTAINER_multipeermap_size (peer_map)); 740 GNUNET_CONTAINER_multipeermap_size (valid_peers));
726 (void) GNUNET_CONTAINER_multipeermap_iterate (mss->valid_peers, 741 (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
727 get_rand_peer_iterator, 742 get_rand_peer_iterator,
728 iterator_cls); 743 iterator_cls);
729 ret = iterator_cls->peer; 744 ret = iterator_cls->peer;
@@ -737,30 +752,33 @@ get_random_peer_from_peermap (const struct
737 * 752 *
738 * If valid peers are already #num_valid_peers_max, delete a peer previously. 753 * If valid peers are already #num_valid_peers_max, delete a peer previously.
739 * 754 *
740 * @param peer the peer that is added to the valid peers. 755 * @param peer The peer that is added to the valid peers.
756 * @param valid_peers Peer map of valid peers to which to add the @a peer
741 * 757 *
742 * @return #GNUNET_YES if no other peer had to be removed 758 * @return #GNUNET_YES if no other peer had to be removed
743 * #GNUNET_NO otherwise 759 * #GNUNET_NO otherwise
744 */ 760 */
745static int 761static int
746add_valid_peer (const struct GNUNET_PeerIdentity *peer) 762add_valid_peer (const struct GNUNET_PeerIdentity *peer,
763 struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
747{ 764{
748 const struct GNUNET_PeerIdentity *rand_peer; 765 const struct GNUNET_PeerIdentity *rand_peer;
749 int ret; 766 int ret;
750 767
751 ret = GNUNET_YES; 768 ret = GNUNET_YES;
752 while (GNUNET_CONTAINER_multipeermap_size ( 769 /* Remove random peers until there is space for a new one */
753 mss->valid_peers) >= num_valid_peers_max) 770 while (num_valid_peers_max <=
771 GNUNET_CONTAINER_multipeermap_size (valid_peers))
754 { 772 {
755 rand_peer = get_random_peer_from_peermap (mss->valid_peers); 773 rand_peer = get_random_peer_from_peermap (valid_peers);
756 GNUNET_CONTAINER_multipeermap_remove_all (mss->valid_peers, rand_peer); 774 GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
757 ret = GNUNET_NO; 775 ret = GNUNET_NO;
758 } 776 }
759 (void) GNUNET_CONTAINER_multipeermap_put (mss->valid_peers, peer, NULL, 777 (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
760 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 778 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
761 GNUNET_STATISTICS_set (stats, 779 GNUNET_STATISTICS_set (stats,
762 "# valid peers", 780 "# valid peers",
763 GNUNET_CONTAINER_multipeermap_size (mss->valid_peers), 781 GNUNET_CONTAINER_multipeermap_size (valid_peers),
764 GNUNET_NO); 782 GNUNET_NO);
765 return ret; 783 return ret;
766} 784}
@@ -799,7 +817,6 @@ set_peer_online (struct PeerContext *peer_ctx)
799 peer_ctx->online_check_pending = NULL; 817 peer_ctx->online_check_pending = NULL;
800 } 818 }
801 819
802 (void) add_valid_peer (peer);
803 SET_PEER_FLAG (peer_ctx, Peers_ONLINE); 820 SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
804 821
805 /* Call pending operations */ 822 /* Call pending operations */
@@ -888,13 +905,12 @@ remove_channel_ctx (struct ChannelCtx *channel_ctx)
888/** 905/**
889 * @brief Get the channel of a peer. If not existing, create. 906 * @brief Get the channel of a peer. If not existing, create.
890 * 907 *
891 * @param peer the peer id 908 * @param peer_ctx Context of the peer of which to get the channel
892 * @return the #GNUNET_CADET_Channel used to send data to @a peer 909 * @return the #GNUNET_CADET_Channel used to send data to @a peer_ctx
893 */ 910 */
894struct GNUNET_CADET_Channel * 911struct GNUNET_CADET_Channel *
895get_channel (const struct GNUNET_PeerIdentity *peer) 912get_channel (struct PeerContext *peer_ctx)
896{ 913{
897 struct PeerContext *peer_ctx;
898 struct GNUNET_PeerIdentity *ctx_peer; 914 struct GNUNET_PeerIdentity *ctx_peer;
899 /* There exists a copy-paste-clone in run() */ 915 /* There exists a copy-paste-clone in run() */
900 struct GNUNET_MQ_MessageHandler cadet_handlers[] = { 916 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
@@ -918,20 +934,19 @@ get_channel (const struct GNUNET_PeerIdentity *peer)
918 }; 934 };
919 935
920 936
921 peer_ctx = get_peer_ctx (peer);
922 if (NULL == peer_ctx->send_channel_ctx) 937 if (NULL == peer_ctx->send_channel_ctx)
923 { 938 {
924 LOG (GNUNET_ERROR_TYPE_DEBUG, 939 LOG (GNUNET_ERROR_TYPE_DEBUG,
925 "Trying to establish channel to peer %s\n", 940 "Trying to establish channel to peer %s\n",
926 GNUNET_i2s (peer)); 941 GNUNET_i2s (&peer_ctx->peer_id));
927 ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity); 942 ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
928 *ctx_peer = *peer; 943 *ctx_peer = peer_ctx->peer_id;
929 peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx); 944 peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx);
930 peer_ctx->send_channel_ctx->channel = 945 peer_ctx->send_channel_ctx->channel =
931 GNUNET_CADET_channel_create (mss->cadet_handle, 946 GNUNET_CADET_channel_create (peer_ctx->ss->cadet_handle,
932 peer_ctx->send_channel_ctx, /* context */ 947 peer_ctx->send_channel_ctx, /* context */
933 peer, 948 &peer_ctx->peer_id,
934 &mss->port, 949 &peer_ctx->ss->port,
935 GNUNET_CADET_OPTION_RELIABLE, 950 GNUNET_CADET_OPTION_RELIABLE,
936 NULL, /* WindowSize handler */ 951 NULL, /* WindowSize handler */
937 &cleanup_destroyed_channel, /* Disconnect handler */ 952 &cleanup_destroyed_channel, /* Disconnect handler */
@@ -949,19 +964,15 @@ get_channel (const struct GNUNET_PeerIdentity *peer)
949 * If we already have a message queue open to this client, 964 * If we already have a message queue open to this client,
950 * simply return it, otherways create one. 965 * simply return it, otherways create one.
951 * 966 *
952 * @param peer the peer to get the mq to 967 * @param peer_ctx Context of the peer of whicht to get the mq
953 * @return the #GNUNET_MQ_Handle 968 * @return the #GNUNET_MQ_Handle
954 */ 969 */
955static struct GNUNET_MQ_Handle * 970static struct GNUNET_MQ_Handle *
956get_mq (const struct GNUNET_PeerIdentity *peer) 971get_mq (struct PeerContext *peer_ctx)
957{ 972{
958 struct PeerContext *peer_ctx;
959
960 peer_ctx = get_peer_ctx (peer);
961
962 if (NULL == peer_ctx->mq) 973 if (NULL == peer_ctx->mq)
963 { 974 {
964 peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer)); 975 peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer_ctx));
965 } 976 }
966 return peer_ctx->mq; 977 return peer_ctx->mq;
967} 978}
@@ -969,20 +980,18 @@ get_mq (const struct GNUNET_PeerIdentity *peer)
969/** 980/**
970 * @brief Add an envelope to a message passed to mq to list of pending messages 981 * @brief Add an envelope to a message passed to mq to list of pending messages
971 * 982 *
972 * @param peer peer the message was sent to 983 * @param peer_ctx Context of the peer for which to insert the envelope
973 * @param ev envelope to the message 984 * @param ev envelope to the message
974 * @param type type of the message to be sent 985 * @param type type of the message to be sent
975 * @return pointer to pending message 986 * @return pointer to pending message
976 */ 987 */
977static struct PendingMessage * 988static struct PendingMessage *
978insert_pending_message (const struct GNUNET_PeerIdentity *peer, 989insert_pending_message (struct PeerContext *peer_ctx,
979 struct GNUNET_MQ_Envelope *ev, 990 struct GNUNET_MQ_Envelope *ev,
980 const char *type) 991 const char *type)
981{ 992{
982 struct PendingMessage *pending_msg; 993 struct PendingMessage *pending_msg;
983 struct PeerContext *peer_ctx;
984 994
985 peer_ctx = get_peer_ctx (peer);
986 pending_msg = GNUNET_new (struct PendingMessage); 995 pending_msg = GNUNET_new (struct PendingMessage);
987 pending_msg->ev = ev; 996 pending_msg->ev = ev;
988 pending_msg->peer_ctx = peer_ctx; 997 pending_msg->peer_ctx = peer_ctx;
@@ -1039,6 +1048,7 @@ mq_online_check_successful (void *cls)
1039 remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES); 1048 remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
1040 peer_ctx->online_check_pending = NULL; 1049 peer_ctx->online_check_pending = NULL;
1041 set_peer_online (peer_ctx); 1050 set_peer_online (peer_ctx);
1051 (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->ss->valid_peers);
1042 } 1052 }
1043} 1053}
1044 1054
@@ -1059,8 +1069,8 @@ check_peer_online (struct PeerContext *peer_ctx)
1059 1069
1060 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE); 1070 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
1061 peer_ctx->online_check_pending = 1071 peer_ctx->online_check_pending =
1062 insert_pending_message (&peer_ctx->peer_id, ev, "Check online"); 1072 insert_pending_message (peer_ctx, ev, "Check online");
1063 mq = get_mq (&peer_ctx->peer_id); 1073 mq = get_mq (peer_ctx);
1064 GNUNET_MQ_notify_sent (ev, 1074 GNUNET_MQ_notify_sent (ev,
1065 mq_online_check_successful, 1075 mq_online_check_successful,
1066 peer_ctx); 1076 peer_ctx);
@@ -1078,20 +1088,18 @@ check_peer_online (struct PeerContext *peer_ctx)
1078 * The array with pending operations will probably never grow really big, so 1088 * The array with pending operations will probably never grow really big, so
1079 * iterating over it should be ok. 1089 * iterating over it should be ok.
1080 * 1090 *
1081 * @param peer the peer to check 1091 * @param peer_ctx Context of the peer to check for the operation
1082 * @param peer_op the operation (#PeerOp) on the peer 1092 * @param peer_op the operation (#PeerOp) on the peer
1083 * 1093 *
1084 * @return #GNUNET_YES if this operation is scheduled on that peer 1094 * @return #GNUNET_YES if this operation is scheduled on that peer
1085 * #GNUNET_NO otherwise 1095 * #GNUNET_NO otherwise
1086 */ 1096 */
1087static int 1097static int
1088check_operation_scheduled (const struct GNUNET_PeerIdentity *peer, 1098check_operation_scheduled (const struct PeerContext *peer_ctx,
1089 const PeerOp peer_op) 1099 const PeerOp peer_op)
1090{ 1100{
1091 const struct PeerContext *peer_ctx;
1092 unsigned int i; 1101 unsigned int i;
1093 1102
1094 peer_ctx = get_peer_ctx (peer);
1095 for (i = 0; i < peer_ctx->num_pending_ops; i++) 1103 for (i = 0; i < peer_ctx->num_pending_ops; i++)
1096 if (peer_op == peer_ctx->pending_ops[i].op) 1104 if (peer_op == peer_ctx->pending_ops[i].op)
1097 return GNUNET_YES; 1105 return GNUNET_YES;
@@ -1165,7 +1173,13 @@ schedule_channel_destruction (struct ChannelCtx *channel_ctx)
1165/** 1173/**
1166 * @brief Remove peer 1174 * @brief Remove peer
1167 * 1175 *
1168 * @param peer the peer to clean 1176 * - Empties the list with pending operations
1177 * - Empties the list with pending messages
1178 * - Cancels potentially existing online check
1179 * - Schedules closing of send and recv channels
1180 * - Removes peer from peer map
1181 *
1182 * @param peer_ctx Context of the peer to be destroyed
1169 * @return #GNUNET_YES if peer was removed 1183 * @return #GNUNET_YES if peer was removed
1170 * #GNUNET_NO otherwise 1184 * #GNUNET_NO otherwise
1171 */ 1185 */
@@ -1173,9 +1187,9 @@ static int
1173destroy_peer (struct PeerContext *peer_ctx) 1187destroy_peer (struct PeerContext *peer_ctx)
1174{ 1188{
1175 GNUNET_assert (NULL != peer_ctx); 1189 GNUNET_assert (NULL != peer_ctx);
1176 GNUNET_assert (NULL != mss->peer_map); 1190 GNUNET_assert (NULL != peer_ctx->ss->peer_map);
1177 if (GNUNET_NO == 1191 if (GNUNET_NO ==
1178 GNUNET_CONTAINER_multipeermap_contains (mss->peer_map, 1192 GNUNET_CONTAINER_multipeermap_contains (peer_ctx->ss->peer_map,
1179 &peer_ctx->peer_id)) 1193 &peer_ctx->peer_id))
1180 { 1194 {
1181 return GNUNET_NO; 1195 return GNUNET_NO;
@@ -1245,15 +1259,15 @@ destroy_peer (struct PeerContext *peer_ctx)
1245 } 1259 }
1246 1260
1247 if (GNUNET_YES != 1261 if (GNUNET_YES !=
1248 GNUNET_CONTAINER_multipeermap_remove_all (mss->peer_map, 1262 GNUNET_CONTAINER_multipeermap_remove_all (peer_ctx->ss->peer_map,
1249 &peer_ctx->peer_id)) 1263 &peer_ctx->peer_id))
1250 { 1264 {
1251 LOG (GNUNET_ERROR_TYPE_WARNING, 1265 LOG (GNUNET_ERROR_TYPE_WARNING,
1252 "removing peer from mss->peer_map failed\n"); 1266 "removing peer from peer_ctx->ss->peer_map failed\n");
1253 } 1267 }
1254 GNUNET_STATISTICS_set (stats, 1268 GNUNET_STATISTICS_set (stats,
1255 "# known peers", 1269 "# known peers",
1256 GNUNET_CONTAINER_multipeermap_size (mss->peer_map), 1270 GNUNET_CONTAINER_multipeermap_size (peer_ctx->ss->peer_map),
1257 GNUNET_NO); 1271 GNUNET_NO);
1258 GNUNET_free (peer_ctx); 1272 GNUNET_free (peer_ctx);
1259 return GNUNET_YES; 1273 return GNUNET_YES;
@@ -1274,9 +1288,10 @@ peermap_clear_iterator (void *cls,
1274 const struct GNUNET_PeerIdentity *key, 1288 const struct GNUNET_PeerIdentity *key,
1275 void *value) 1289 void *value)
1276{ 1290{
1277 (void) cls; 1291 struct SubSampler *ss = cls;
1278 (void) value; 1292 (void) value;
1279 destroy_peer (get_peer_ctx (key)); 1293
1294 destroy_peer (get_peer_ctx (ss->peer_map, key));
1280 return GNUNET_YES; 1295 return GNUNET_YES;
1281} 1296}
1282 1297
@@ -1350,35 +1365,37 @@ store_peer_presistently_iterator (void *cls,
1350 1365
1351/** 1366/**
1352 * @brief Store the peers currently in #valid_peers to disk. 1367 * @brief Store the peers currently in #valid_peers to disk.
1368 *
1369 * @param ss SubSampler for which to store the valid peers
1353 */ 1370 */
1354static void 1371static void
1355store_valid_peers () 1372store_valid_peers (const struct SubSampler *ss)
1356{ 1373{
1357 struct GNUNET_DISK_FileHandle *fh; 1374 struct GNUNET_DISK_FileHandle *fh;
1358 uint32_t number_written_peers; 1375 uint32_t number_written_peers;
1359 int ret; 1376 int ret;
1360 1377
1361 if (0 == strncmp ("DISABLE", mss->filename_valid_peers, 7)) 1378 if (0 == strncmp ("DISABLE", ss->filename_valid_peers, 7))
1362 { 1379 {
1363 return; 1380 return;
1364 } 1381 }
1365 1382
1366 ret = GNUNET_DISK_directory_create_for_file (mss->filename_valid_peers); 1383 ret = GNUNET_DISK_directory_create_for_file (ss->filename_valid_peers);
1367 if (GNUNET_SYSERR == ret) 1384 if (GNUNET_SYSERR == ret)
1368 { 1385 {
1369 LOG (GNUNET_ERROR_TYPE_WARNING, 1386 LOG (GNUNET_ERROR_TYPE_WARNING,
1370 "Not able to create directory for file `%s'\n", 1387 "Not able to create directory for file `%s'\n",
1371 mss->filename_valid_peers); 1388 ss->filename_valid_peers);
1372 GNUNET_break (0); 1389 GNUNET_break (0);
1373 } 1390 }
1374 else if (GNUNET_NO == ret) 1391 else if (GNUNET_NO == ret)
1375 { 1392 {
1376 LOG (GNUNET_ERROR_TYPE_WARNING, 1393 LOG (GNUNET_ERROR_TYPE_WARNING,
1377 "Directory for file `%s' exists but is not writable for us\n", 1394 "Directory for file `%s' exists but is not writable for us\n",
1378 mss->filename_valid_peers); 1395 ss->filename_valid_peers);
1379 GNUNET_break (0); 1396 GNUNET_break (0);
1380 } 1397 }
1381 fh = GNUNET_DISK_file_open (mss->filename_valid_peers, 1398 fh = GNUNET_DISK_file_open (ss->filename_valid_peers,
1382 GNUNET_DISK_OPEN_WRITE | 1399 GNUNET_DISK_OPEN_WRITE |
1383 GNUNET_DISK_OPEN_CREATE, 1400 GNUNET_DISK_OPEN_CREATE,
1384 GNUNET_DISK_PERM_USER_READ | 1401 GNUNET_DISK_PERM_USER_READ |
@@ -1387,19 +1404,19 @@ store_valid_peers ()
1387 { 1404 {
1388 LOG (GNUNET_ERROR_TYPE_WARNING, 1405 LOG (GNUNET_ERROR_TYPE_WARNING,
1389 "Not able to write valid peers to file `%s'\n", 1406 "Not able to write valid peers to file `%s'\n",
1390 mss->filename_valid_peers); 1407 ss->filename_valid_peers);
1391 return; 1408 return;
1392 } 1409 }
1393 LOG (GNUNET_ERROR_TYPE_DEBUG, 1410 LOG (GNUNET_ERROR_TYPE_DEBUG,
1394 "Writing %u valid peers to disk\n", 1411 "Writing %u valid peers to disk\n",
1395 GNUNET_CONTAINER_multipeermap_size (mss->valid_peers)); 1412 GNUNET_CONTAINER_multipeermap_size (ss->valid_peers));
1396 number_written_peers = 1413 number_written_peers =
1397 GNUNET_CONTAINER_multipeermap_iterate (mss->valid_peers, 1414 GNUNET_CONTAINER_multipeermap_iterate (ss->valid_peers,
1398 store_peer_presistently_iterator, 1415 store_peer_presistently_iterator,
1399 fh); 1416 fh);
1400 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh)); 1417 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1401 GNUNET_assert (number_written_peers == 1418 GNUNET_assert (number_written_peers ==
1402 GNUNET_CONTAINER_multipeermap_size (mss->valid_peers)); 1419 GNUNET_CONTAINER_multipeermap_size (ss->valid_peers));
1403} 1420}
1404 1421
1405 1422
@@ -1451,9 +1468,11 @@ s2i_full (const char *string_repr)
1451 1468
1452/** 1469/**
1453 * @brief Restore the peers on disk to #valid_peers. 1470 * @brief Restore the peers on disk to #valid_peers.
1471 *
1472 * @param ss SubSampler for which to restore the valid peers
1454 */ 1473 */
1455static void 1474static void
1456restore_valid_peers () 1475restore_valid_peers (const struct SubSampler *ss)
1457{ 1476{
1458 off_t file_size; 1477 off_t file_size;
1459 uint32_t num_peers; 1478 uint32_t num_peers;
@@ -1464,16 +1483,16 @@ restore_valid_peers ()
1464 char *str_repr; 1483 char *str_repr;
1465 const struct GNUNET_PeerIdentity *peer; 1484 const struct GNUNET_PeerIdentity *peer;
1466 1485
1467 if (0 == strncmp ("DISABLE", mss->filename_valid_peers, 7)) 1486 if (0 == strncmp ("DISABLE", ss->filename_valid_peers, 7))
1468 { 1487 {
1469 return; 1488 return;
1470 } 1489 }
1471 1490
1472 if (GNUNET_OK != GNUNET_DISK_file_test (mss->filename_valid_peers)) 1491 if (GNUNET_OK != GNUNET_DISK_file_test (ss->filename_valid_peers))
1473 { 1492 {
1474 return; 1493 return;
1475 } 1494 }
1476 fh = GNUNET_DISK_file_open (mss->filename_valid_peers, 1495 fh = GNUNET_DISK_file_open (ss->filename_valid_peers,
1477 GNUNET_DISK_OPEN_READ, 1496 GNUNET_DISK_OPEN_READ,
1478 GNUNET_DISK_PERM_NONE); 1497 GNUNET_DISK_PERM_NONE);
1479 GNUNET_assert (NULL != fh); 1498 GNUNET_assert (NULL != fh);
@@ -1485,13 +1504,13 @@ restore_valid_peers ()
1485 LOG (GNUNET_ERROR_TYPE_DEBUG, 1504 LOG (GNUNET_ERROR_TYPE_DEBUG,
1486 "Restoring %" PRIu32 " peers from file `%s'\n", 1505 "Restoring %" PRIu32 " peers from file `%s'\n",
1487 num_peers, 1506 num_peers,
1488 mss->filename_valid_peers); 1507 ss->filename_valid_peers);
1489 for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53) 1508 for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1490 { 1509 {
1491 str_repr = GNUNET_strndup (iter_buf, 53); 1510 str_repr = GNUNET_strndup (iter_buf, 53);
1492 peer = s2i_full (str_repr); 1511 peer = s2i_full (str_repr);
1493 GNUNET_free (str_repr); 1512 GNUNET_free (str_repr);
1494 add_valid_peer (peer); 1513 add_valid_peer (peer, ss->valid_peers);
1495 LOG (GNUNET_ERROR_TYPE_DEBUG, 1514 LOG (GNUNET_ERROR_TYPE_DEBUG,
1496 "Restored valid peer %s from disk\n", 1515 "Restored valid peer %s from disk\n",
1497 GNUNET_i2s_full (peer)); 1516 GNUNET_i2s_full (peer));
@@ -1499,10 +1518,10 @@ restore_valid_peers ()
1499 iter_buf = NULL; 1518 iter_buf = NULL;
1500 GNUNET_free (buf); 1519 GNUNET_free (buf);
1501 LOG (GNUNET_ERROR_TYPE_DEBUG, 1520 LOG (GNUNET_ERROR_TYPE_DEBUG,
1502 "num_peers: %" PRIu32 ", _size (mss->valid_peers): %u\n", 1521 "num_peers: %" PRIu32 ", _size (ss->valid_peers): %u\n",
1503 num_peers, 1522 num_peers,
1504 GNUNET_CONTAINER_multipeermap_size (mss->valid_peers)); 1523 GNUNET_CONTAINER_multipeermap_size (ss->valid_peers));
1505 if (num_peers != GNUNET_CONTAINER_multipeermap_size (mss->valid_peers)) 1524 if (num_peers != GNUNET_CONTAINER_multipeermap_size (ss->valid_peers))
1506 { 1525 {
1507 LOG (GNUNET_ERROR_TYPE_WARNING, 1526 LOG (GNUNET_ERROR_TYPE_WARNING,
1508 "Number of restored peers does not match file size. Have probably duplicates.\n"); 1527 "Number of restored peers does not match file size. Have probably duplicates.\n");
@@ -1510,38 +1529,40 @@ restore_valid_peers ()
1510 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh)); 1529 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1511 LOG (GNUNET_ERROR_TYPE_DEBUG, 1530 LOG (GNUNET_ERROR_TYPE_DEBUG,
1512 "Restored %u valid peers from disk\n", 1531 "Restored %u valid peers from disk\n",
1513 GNUNET_CONTAINER_multipeermap_size (mss->valid_peers)); 1532 GNUNET_CONTAINER_multipeermap_size (ss->valid_peers));
1514} 1533}
1515 1534
1516 1535
1517/** 1536/**
1518 * @brief Delete storage of peers that was created with #initialise_peers () 1537 * @brief Delete storage of peers that was created with #initialise_peers ()
1538 *
1539 * @param ss SubSampler for which the storage is deleted
1519 */ 1540 */
1520static void 1541static void
1521peers_terminate () 1542peers_terminate (struct SubSampler *ss)
1522{ 1543{
1523 if (GNUNET_SYSERR == 1544 if (GNUNET_SYSERR ==
1524 GNUNET_CONTAINER_multipeermap_iterate (mss->peer_map, 1545 GNUNET_CONTAINER_multipeermap_iterate (ss->peer_map,
1525 &peermap_clear_iterator, 1546 &peermap_clear_iterator,
1526 NULL)) 1547 ss))
1527 { 1548 {
1528 LOG (GNUNET_ERROR_TYPE_WARNING, 1549 LOG (GNUNET_ERROR_TYPE_WARNING,
1529 "Iteration destroying peers was aborted.\n"); 1550 "Iteration destroying peers was aborted.\n");
1530 } 1551 }
1531 GNUNET_CONTAINER_multipeermap_destroy (mss->peer_map); 1552 GNUNET_CONTAINER_multipeermap_destroy (ss->peer_map);
1532 mss->peer_map = NULL; 1553 ss->peer_map = NULL;
1533 store_valid_peers (); 1554 store_valid_peers (ss);
1534 GNUNET_free (mss->filename_valid_peers); 1555 GNUNET_free (ss->filename_valid_peers);
1535 mss->filename_valid_peers = NULL; 1556 ss->filename_valid_peers = NULL;
1536 GNUNET_CONTAINER_multipeermap_destroy (mss->valid_peers); 1557 GNUNET_CONTAINER_multipeermap_destroy (ss->valid_peers);
1537 mss->valid_peers = NULL; 1558 ss->valid_peers = NULL;
1538} 1559}
1539 1560
1540 1561
1541/** 1562/**
1542 * Iterator over #valid_peers hash map entries. 1563 * Iterator over #valid_peers hash map entries.
1543 * 1564 *
1544 * @param cls closure - unused 1565 * @param cls Closure that contains iterator function and closure
1545 * @param peer current peer id 1566 * @param peer current peer id
1546 * @param value value in the hash map - unused 1567 * @param value value in the hash map - unused
1547 * @return #GNUNET_YES if we should continue to 1568 * @return #GNUNET_YES if we should continue to
@@ -1556,21 +1577,22 @@ valid_peer_iterator (void *cls,
1556 struct PeersIteratorCls *it_cls = cls; 1577 struct PeersIteratorCls *it_cls = cls;
1557 (void) value; 1578 (void) value;
1558 1579
1559 return it_cls->iterator (it_cls->cls, 1580 return it_cls->iterator (it_cls->cls, peer);
1560 peer);
1561} 1581}
1562 1582
1563 1583
1564/** 1584/**
1565 * @brief Get all currently known, valid peer ids. 1585 * @brief Get all currently known, valid peer ids.
1566 * 1586 *
1567 * @param it function to call on each peer id 1587 * @param valid_peers Peer map containing the valid peers in question
1568 * @param it_cls extra argument to @a it 1588 * @param iterator function to call on each peer id
1589 * @param it_cls extra argument to @a iterator
1569 * @return the number of key value pairs processed, 1590 * @return the number of key value pairs processed,
1570 * #GNUNET_SYSERR if it aborted iteration 1591 * #GNUNET_SYSERR if it aborted iteration
1571 */ 1592 */
1572static int 1593static int
1573get_valid_peers (PeersIterator iterator, 1594get_valid_peers (const struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1595 PeersIterator iterator,
1574 void *it_cls) 1596 void *it_cls)
1575{ 1597{
1576 struct PeersIteratorCls *cls; 1598 struct PeersIteratorCls *cls;
@@ -1579,7 +1601,7 @@ get_valid_peers (PeersIterator iterator,
1579 cls = GNUNET_new (struct PeersIteratorCls); 1601 cls = GNUNET_new (struct PeersIteratorCls);
1580 cls->iterator = iterator; 1602 cls->iterator = iterator;
1581 cls->cls = it_cls; 1603 cls->cls = it_cls;
1582 ret = GNUNET_CONTAINER_multipeermap_iterate (mss->valid_peers, 1604 ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1583 valid_peer_iterator, 1605 valid_peer_iterator,
1584 cls); 1606 cls);
1585 GNUNET_free (cls); 1607 GNUNET_free (cls);
@@ -1593,19 +1615,21 @@ get_valid_peers (PeersIterator iterator,
1593 * This function is called on new peer_ids from 'external' sources 1615 * This function is called on new peer_ids from 'external' sources
1594 * (client seed, cadet get_peers(), ...) 1616 * (client seed, cadet get_peers(), ...)
1595 * 1617 *
1618 * @param ss SubSampler with the peer map that the @a peer will be added to
1596 * @param peer the new #GNUNET_PeerIdentity 1619 * @param peer the new #GNUNET_PeerIdentity
1597 * 1620 *
1598 * @return #GNUNET_YES if peer was inserted 1621 * @return #GNUNET_YES if peer was inserted
1599 * #GNUNET_NO otherwise 1622 * #GNUNET_NO otherwise
1600 */ 1623 */
1601static int 1624static int
1602insert_peer (const struct GNUNET_PeerIdentity *peer) 1625insert_peer (struct SubSampler *ss,
1626 const struct GNUNET_PeerIdentity *peer)
1603{ 1627{
1604 if (GNUNET_YES == check_peer_known (peer)) 1628 if (GNUNET_YES == check_peer_known (ss->peer_map, peer))
1605 { 1629 {
1606 return GNUNET_NO; /* We already know this peer - nothing to do */ 1630 return GNUNET_NO; /* We already know this peer - nothing to do */
1607 } 1631 }
1608 (void) create_peer_ctx (peer); 1632 (void) create_peer_ctx (ss, peer);
1609 return GNUNET_YES; 1633 return GNUNET_YES;
1610} 1634}
1611 1635
@@ -1613,6 +1637,7 @@ insert_peer (const struct GNUNET_PeerIdentity *peer)
1613/** 1637/**
1614 * @brief Check whether flags on a peer are set. 1638 * @brief Check whether flags on a peer are set.
1615 * 1639 *
1640 * @param peer_map Peer map that is expected to contain the @a peer
1616 * @param peer the peer to check the flag of 1641 * @param peer the peer to check the flag of
1617 * @param flags the flags to check 1642 * @param flags the flags to check
1618 * 1643 *
@@ -1621,16 +1646,17 @@ insert_peer (const struct GNUNET_PeerIdentity *peer)
1621 * #GNUNET_NO otherwise 1646 * #GNUNET_NO otherwise
1622 */ 1647 */
1623static int 1648static int
1624check_peer_flag (const struct GNUNET_PeerIdentity *peer, 1649check_peer_flag (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
1650 const struct GNUNET_PeerIdentity *peer,
1625 enum Peers_PeerFlags flags) 1651 enum Peers_PeerFlags flags)
1626{ 1652{
1627 struct PeerContext *peer_ctx; 1653 struct PeerContext *peer_ctx;
1628 1654
1629 if (GNUNET_NO == check_peer_known (peer)) 1655 if (GNUNET_NO == check_peer_known (peer_map, peer))
1630 { 1656 {
1631 return GNUNET_SYSERR; 1657 return GNUNET_SYSERR;
1632 } 1658 }
1633 peer_ctx = get_peer_ctx (peer); 1659 peer_ctx = get_peer_ctx (peer_map, peer);
1634 return check_peer_flag_set (peer_ctx, flags); 1660 return check_peer_flag_set (peer_ctx, flags);
1635} 1661}
1636 1662
@@ -1639,18 +1665,20 @@ check_peer_flag (const struct GNUNET_PeerIdentity *peer,
1639 * 1665 *
1640 * If not known yet, insert into known peers 1666 * If not known yet, insert into known peers
1641 * 1667 *
1668 * @param ss SubSampler which would contain the @a peer
1642 * @param peer the peer whose online is to be checked 1669 * @param peer the peer whose online is to be checked
1643 * @return #GNUNET_YES if the check was issued 1670 * @return #GNUNET_YES if the check was issued
1644 * #GNUNET_NO otherwise 1671 * #GNUNET_NO otherwise
1645 */ 1672 */
1646static int 1673static int
1647issue_peer_online_check (const struct GNUNET_PeerIdentity *peer) 1674issue_peer_online_check (struct SubSampler *ss,
1675 const struct GNUNET_PeerIdentity *peer)
1648{ 1676{
1649 struct PeerContext *peer_ctx; 1677 struct PeerContext *peer_ctx;
1650 1678
1651 (void) insert_peer (peer); 1679 (void) insert_peer (ss, peer); // TODO even needed?
1652 peer_ctx = get_peer_ctx (peer); 1680 peer_ctx = get_peer_ctx (ss->peer_map, peer);
1653 if ( (GNUNET_NO == check_peer_flag (peer, Peers_ONLINE)) && 1681 if ( (GNUNET_NO == check_peer_flag (ss->peer_map, peer, Peers_ONLINE)) &&
1654 (NULL == peer_ctx->online_check_pending) ) 1682 (NULL == peer_ctx->online_check_pending) )
1655 { 1683 {
1656 check_peer_online (peer_ctx); 1684 check_peer_online (peer_ctx);
@@ -1668,22 +1696,20 @@ issue_peer_online_check (const struct GNUNET_PeerIdentity *peer)
1668 * - there are pending messages 1696 * - there are pending messages
1669 * - there is no pending pull reply 1697 * - there is no pending pull reply
1670 * 1698 *
1671 * @param peer the peer in question 1699 * @param peer_ctx Context of the peer in question
1672 * @return #GNUNET_YES if peer is removable 1700 * @return #GNUNET_YES if peer is removable
1673 * #GNUNET_NO if peer is NOT removable 1701 * #GNUNET_NO if peer is NOT removable
1674 * #GNUNET_SYSERR if peer is not known 1702 * #GNUNET_SYSERR if peer is not known
1675 */ 1703 */
1676static int 1704static int
1677check_removable (const struct GNUNET_PeerIdentity *peer) 1705check_removable (const struct PeerContext *peer_ctx)
1678{ 1706{
1679 struct PeerContext *peer_ctx; 1707 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_ctx->ss->peer_map,
1680 1708 &peer_ctx->peer_id))
1681 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mss->peer_map, peer))
1682 { 1709 {
1683 return GNUNET_SYSERR; 1710 return GNUNET_SYSERR;
1684 } 1711 }
1685 1712
1686 peer_ctx = get_peer_ctx (peer);
1687 if ( (NULL != peer_ctx->recv_channel_ctx) || 1713 if ( (NULL != peer_ctx->recv_channel_ctx) ||
1688 (NULL != peer_ctx->pending_messages_head) || 1714 (NULL != peer_ctx->pending_messages_head) ||
1689 (GNUNET_NO == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) ) 1715 (GNUNET_NO == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) )
@@ -1699,15 +1725,17 @@ check_removable (const struct GNUNET_PeerIdentity *peer)
1699 * 1725 *
1700 * A valid peer is a peer that we know exists eg. we were connected to once. 1726 * A valid peer is a peer that we know exists eg. we were connected to once.
1701 * 1727 *
1728 * @param valid_peers Peer map that would contain the @a peer
1702 * @param peer peer in question 1729 * @param peer peer in question
1703 * 1730 *
1704 * @return #GNUNET_YES if peer is valid 1731 * @return #GNUNET_YES if peer is valid
1705 * #GNUNET_NO if peer is not valid 1732 * #GNUNET_NO if peer is not valid
1706 */ 1733 */
1707static int 1734static int
1708check_peer_valid (const struct GNUNET_PeerIdentity *peer) 1735check_peer_valid (const struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1736 const struct GNUNET_PeerIdentity *peer)
1709{ 1737{
1710 return GNUNET_CONTAINER_multipeermap_contains (mss->valid_peers, peer); 1738 return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1711} 1739}
1712 1740
1713 1741
@@ -1716,13 +1744,14 @@ check_peer_valid (const struct GNUNET_PeerIdentity *peer)
1716 * 1744 *
1717 * This establishes a sending channel 1745 * This establishes a sending channel
1718 * 1746 *
1719 * @param peer the peer to establish channel to 1747 * @param peer_ctx Context of the target peer
1720 */ 1748 */
1721static void 1749static void
1722indicate_sending_intention (const struct GNUNET_PeerIdentity *peer) 1750indicate_sending_intention (struct PeerContext *peer_ctx)
1723{ 1751{
1724 GNUNET_assert (GNUNET_YES == check_peer_known (peer)); 1752 GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->ss->peer_map,
1725 (void) get_channel (peer); 1753 &peer_ctx->peer_id));
1754 (void) get_channel (peer_ctx);
1726} 1755}
1727 1756
1728 1757
@@ -1730,17 +1759,14 @@ indicate_sending_intention (const struct GNUNET_PeerIdentity *peer)
1730 * @brief Check whether other peer has the intention to send/opened channel 1759 * @brief Check whether other peer has the intention to send/opened channel
1731 * towars us 1760 * towars us
1732 * 1761 *
1733 * @param peer the peer in question 1762 * @param peer_ctx Context of the peer in question
1734 * 1763 *
1735 * @return #GNUNET_YES if peer has the intention to send 1764 * @return #GNUNET_YES if peer has the intention to send
1736 * #GNUNET_NO otherwise 1765 * #GNUNET_NO otherwise
1737 */ 1766 */
1738static int 1767static int
1739check_peer_send_intention (const struct GNUNET_PeerIdentity *peer) 1768check_peer_send_intention (const struct PeerContext *peer_ctx)
1740{ 1769{
1741 const struct PeerContext *peer_ctx;
1742
1743 peer_ctx = get_peer_ctx (peer);
1744 if (NULL != peer_ctx->recv_channel_ctx) 1770 if (NULL != peer_ctx->recv_channel_ctx)
1745 { 1771 {
1746 return GNUNET_YES; 1772 return GNUNET_YES;
@@ -1752,7 +1778,7 @@ check_peer_send_intention (const struct GNUNET_PeerIdentity *peer)
1752/** 1778/**
1753 * Handle the channel a peer opens to us. 1779 * Handle the channel a peer opens to us.
1754 * 1780 *
1755 * @param cls The closure 1781 * @param cls The closure - SubSampler
1756 * @param channel The channel the peer wants to establish 1782 * @param channel The channel the peer wants to establish
1757 * @param initiator The peer's peer ID 1783 * @param initiator The peer's peer ID
1758 * 1784 *
@@ -1767,21 +1793,23 @@ handle_inbound_channel (void *cls,
1767 struct PeerContext *peer_ctx; 1793 struct PeerContext *peer_ctx;
1768 struct GNUNET_PeerIdentity *ctx_peer; 1794 struct GNUNET_PeerIdentity *ctx_peer;
1769 struct ChannelCtx *channel_ctx; 1795 struct ChannelCtx *channel_ctx;
1770 (void) cls; 1796 struct SubSampler *ss = cls;
1771 1797
1772 LOG (GNUNET_ERROR_TYPE_DEBUG, 1798 LOG (GNUNET_ERROR_TYPE_DEBUG,
1773 "New channel was established to us (Peer %s).\n", 1799 "New channel was established to us (Peer %s).\n",
1774 GNUNET_i2s (initiator)); 1800 GNUNET_i2s (initiator));
1775 GNUNET_assert (NULL != channel); /* according to cadet API */ 1801 GNUNET_assert (NULL != channel); /* according to cadet API */
1776 /* Make sure we 'know' about this peer */ 1802 /* Make sure we 'know' about this peer */
1777 peer_ctx = create_or_get_peer_ctx (initiator); 1803 peer_ctx = create_or_get_peer_ctx (ss, initiator);
1778 set_peer_online (peer_ctx); 1804 set_peer_online (peer_ctx);
1805 (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->ss->valid_peers);
1779 ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity); 1806 ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
1780 *ctx_peer = *initiator; 1807 *ctx_peer = *initiator;
1781 channel_ctx = add_channel_ctx (peer_ctx); 1808 channel_ctx = add_channel_ctx (peer_ctx);
1782 channel_ctx->channel = channel; 1809 channel_ctx->channel = channel;
1783 /* We only accept one incoming channel per peer */ 1810 /* We only accept one incoming channel per peer */
1784 if (GNUNET_YES == check_peer_send_intention (initiator)) 1811 if (GNUNET_YES == check_peer_send_intention (get_peer_ctx (ss->peer_map,
1812 initiator)))
1785 { 1813 {
1786 LOG (GNUNET_ERROR_TYPE_WARNING, 1814 LOG (GNUNET_ERROR_TYPE_WARNING,
1787 "Already got one receive channel. Destroying old one.\n"); 1815 "Already got one receive channel. Destroying old one.\n");
@@ -1799,21 +1827,19 @@ handle_inbound_channel (void *cls,
1799/** 1827/**
1800 * @brief Check whether a sending channel towards the given peer exists 1828 * @brief Check whether a sending channel towards the given peer exists
1801 * 1829 *
1802 * @param peer the peer to check for 1830 * @param peer_ctx Context of the peer in question
1803 * 1831 *
1804 * @return #GNUNET_YES if a sending channel towards that peer exists 1832 * @return #GNUNET_YES if a sending channel towards that peer exists
1805 * #GNUNET_NO otherwise 1833 * #GNUNET_NO otherwise
1806 */ 1834 */
1807static int 1835static int
1808check_sending_channel_exists (const struct GNUNET_PeerIdentity *peer) 1836check_sending_channel_exists (const struct PeerContext *peer_ctx)
1809{ 1837{
1810 struct PeerContext *peer_ctx; 1838 if (GNUNET_NO == check_peer_known (peer_ctx->ss->peer_map,
1811 1839 &peer_ctx->peer_id))
1812 if (GNUNET_NO == check_peer_known (peer))
1813 { /* If no such peer exists, there is no channel */ 1840 { /* If no such peer exists, there is no channel */
1814 return GNUNET_NO; 1841 return GNUNET_NO;
1815 } 1842 }
1816 peer_ctx = get_peer_ctx (peer);
1817 if (NULL == peer_ctx->send_channel_ctx) 1843 if (NULL == peer_ctx->send_channel_ctx)
1818 { 1844 {
1819 return GNUNET_NO; 1845 return GNUNET_NO;
@@ -1826,24 +1852,22 @@ check_sending_channel_exists (const struct GNUNET_PeerIdentity *peer)
1826 * @brief Destroy the send channel of a peer e.g. stop indicating a sending 1852 * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1827 * intention to another peer 1853 * intention to another peer
1828 * 1854 *
1829 * @peer the peer identity of the peer whose sending channel to destroy 1855 * @param peer_ctx Context to the peer
1830 * @return #GNUNET_YES if channel was destroyed 1856 * @return #GNUNET_YES if channel was destroyed
1831 * #GNUNET_NO otherwise 1857 * #GNUNET_NO otherwise
1832 */ 1858 */
1833static int 1859static int
1834destroy_sending_channel (const struct GNUNET_PeerIdentity *peer) 1860destroy_sending_channel (struct PeerContext *peer_ctx)
1835{ 1861{
1836 struct PeerContext *peer_ctx; 1862 if (GNUNET_NO == check_peer_known (peer_ctx->ss->peer_map,
1837 1863 &peer_ctx->peer_id))
1838 if (GNUNET_NO == check_peer_known (peer))
1839 { 1864 {
1840 return GNUNET_NO; 1865 return GNUNET_NO;
1841 } 1866 }
1842 peer_ctx = get_peer_ctx (peer);
1843 if (NULL != peer_ctx->send_channel_ctx) 1867 if (NULL != peer_ctx->send_channel_ctx)
1844 { 1868 {
1845 destroy_channel (peer_ctx->send_channel_ctx); 1869 destroy_channel (peer_ctx->send_channel_ctx);
1846 (void) check_connected (peer); 1870 (void) check_connected (peer_ctx);
1847 return GNUNET_YES; 1871 return GNUNET_YES;
1848 } 1872 }
1849 return GNUNET_NO; 1873 return GNUNET_NO;
@@ -1855,12 +1879,12 @@ destroy_sending_channel (const struct GNUNET_PeerIdentity *peer)
1855 * Keeps track about pending messages so they can be properly removed when the 1879 * Keeps track about pending messages so they can be properly removed when the
1856 * peer is destroyed. 1880 * peer is destroyed.
1857 * 1881 *
1858 * @param peer receeiver of the message 1882 * @param peer_ctx Context of the peer to which the message is to be sent
1859 * @param ev envelope of the message 1883 * @param ev envelope of the message
1860 * @param type type of the message 1884 * @param type type of the message
1861 */ 1885 */
1862static void 1886static void
1863send_message (const struct GNUNET_PeerIdentity *peer, 1887send_message (struct PeerContext *peer_ctx,
1864 struct GNUNET_MQ_Envelope *ev, 1888 struct GNUNET_MQ_Envelope *ev,
1865 const char *type) 1889 const char *type)
1866{ 1890{
@@ -1869,10 +1893,10 @@ send_message (const struct GNUNET_PeerIdentity *peer,
1869 1893
1870 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1894 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1871 "Sending message to %s of type %s\n", 1895 "Sending message to %s of type %s\n",
1872 GNUNET_i2s (peer), 1896 GNUNET_i2s (&peer_ctx->peer_id),
1873 type); 1897 type);
1874 pending_msg = insert_pending_message (peer, ev, type); 1898 pending_msg = insert_pending_message (peer_ctx, ev, type);
1875 mq = get_mq (peer); 1899 mq = get_mq (peer_ctx);
1876 GNUNET_MQ_notify_sent (ev, 1900 GNUNET_MQ_notify_sent (ev,
1877 mq_notify_sent_cb, 1901 mq_notify_sent_cb,
1878 pending_msg); 1902 pending_msg);
@@ -1884,28 +1908,29 @@ send_message (const struct GNUNET_PeerIdentity *peer,
1884 * 1908 *
1885 * Avoids scheduling an operation twice. 1909 * Avoids scheduling an operation twice.
1886 * 1910 *
1887 * @param peer the peer we want to schedule the operation for once it gets 1911 * @param peer_ctx Context of the peer for which to schedule the operation
1888 * online 1912 * @param peer_op the operation to schedule
1913 * @param cls Closure to @a peer_op
1889 * 1914 *
1890 * @return #GNUNET_YES if the operation was scheduled 1915 * @return #GNUNET_YES if the operation was scheduled
1891 * #GNUNET_NO otherwise 1916 * #GNUNET_NO otherwise
1892 */ 1917 */
1893static int 1918static int
1894schedule_operation (const struct GNUNET_PeerIdentity *peer, 1919schedule_operation (struct PeerContext *peer_ctx,
1895 const PeerOp peer_op) 1920 const PeerOp peer_op,
1921 void *cls)
1896{ 1922{
1897 struct PeerPendingOp pending_op; 1923 struct PeerPendingOp pending_op;
1898 struct PeerContext *peer_ctx;
1899 1924
1900 GNUNET_assert (GNUNET_YES == check_peer_known (peer)); 1925 GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->ss->peer_map,
1926 &peer_ctx->peer_id));
1901 1927
1902 //TODO if ONLINE execute immediately 1928 //TODO if ONLINE execute immediately
1903 1929
1904 if (GNUNET_NO == check_operation_scheduled (peer, peer_op)) 1930 if (GNUNET_NO == check_operation_scheduled (peer_ctx, peer_op))
1905 { 1931 {
1906 peer_ctx = get_peer_ctx (peer);
1907 pending_op.op = peer_op; 1932 pending_op.op = peer_op;
1908 pending_op.op_cls = NULL; 1933 pending_op.op_cls = cls;
1909 GNUNET_array_append (peer_ctx->pending_ops, 1934 GNUNET_array_append (peer_ctx->pending_ops,
1910 peer_ctx->num_pending_ops, 1935 peer_ctx->num_pending_ops,
1911 pending_op); 1936 pending_op);
@@ -1983,6 +2008,11 @@ struct ClientContext
1983 * The client handle to send the reply to 2008 * The client handle to send the reply to
1984 */ 2009 */
1985 struct GNUNET_SERVICE_Client *client; 2010 struct GNUNET_SERVICE_Client *client;
2011
2012 /**
2013 * The #SubSampler this context belongs to
2014 */
2015 struct SubSampler *ss;
1986}; 2016};
1987 2017
1988/** 2018/**
@@ -2079,27 +2109,36 @@ insert_in_view_op (void *cls,
2079 * 2109 *
2080 * Called once we know a peer is online. 2110 * Called once we know a peer is online.
2081 * 2111 *
2112 * @param ss SubSampler in with the view to insert in
2113 * @param peer the peer to insert
2114 *
2082 * @return GNUNET_OK if peer was actually inserted 2115 * @return GNUNET_OK if peer was actually inserted
2083 * GNUNET_NO if peer was not inserted 2116 * GNUNET_NO if peer was not inserted
2084 */ 2117 */
2085static int 2118static int
2086insert_in_view (const struct GNUNET_PeerIdentity *peer) 2119insert_in_view (struct SubSampler *ss,
2120 const struct GNUNET_PeerIdentity *peer)
2087{ 2121{
2122 struct PeerContext *peer_ctx;
2088 int online; 2123 int online;
2089 int ret; 2124 int ret;
2090 2125
2091 online = check_peer_flag (peer, Peers_ONLINE); 2126 online = check_peer_flag (ss->peer_map, peer, Peers_ONLINE);
2127 peer_ctx = get_peer_ctx (ss->peer_map, peer); // TODO indirection needed?
2092 if ( (GNUNET_NO == online) || 2128 if ( (GNUNET_NO == online) ||
2093 (GNUNET_SYSERR == online) ) /* peer is not even known */ 2129 (GNUNET_SYSERR == online) ) /* peer is not even known */
2094 { 2130 {
2095 (void) issue_peer_online_check (peer); 2131 (void) issue_peer_online_check (ss, peer);
2096 (void) schedule_operation (peer, insert_in_view_op); 2132 (void) schedule_operation (peer_ctx, insert_in_view_op, NULL);
2097 return GNUNET_NO; 2133 return GNUNET_NO;
2098 } 2134 }
2099 /* Open channel towards peer to keep connection open */ 2135 /* Open channel towards peer to keep connection open */
2100 indicate_sending_intention (peer); 2136 indicate_sending_intention (peer_ctx);
2101 ret = View_put (mss->view, peer); 2137 ret = View_put (ss->view, peer);
2102 GNUNET_STATISTICS_set (stats, "view size", View_size(mss->view), GNUNET_NO); 2138 GNUNET_STATISTICS_set (stats,
2139 "view size",
2140 View_size (peer_ctx->ss->view),
2141 GNUNET_NO);
2103 return ret; 2142 return ret;
2104} 2143}
2105 2144
@@ -2111,7 +2150,7 @@ insert_in_view (const struct GNUNET_PeerIdentity *peer)
2111 * @param view_array the peerids of the view as array (can be empty) 2150 * @param view_array the peerids of the view as array (can be empty)
2112 * @param view_size the size of the view array (can be 0) 2151 * @param view_size the size of the view array (can be 0)
2113 */ 2152 */
2114void 2153static void
2115send_view (const struct ClientContext *cli_ctx, 2154send_view (const struct ClientContext *cli_ctx,
2116 const struct GNUNET_PeerIdentity *view_array, 2155 const struct GNUNET_PeerIdentity *view_array,
2117 uint64_t view_size) 2156 uint64_t view_size)
@@ -2121,8 +2160,8 @@ send_view (const struct ClientContext *cli_ctx,
2121 2160
2122 if (NULL == view_array) 2161 if (NULL == view_array)
2123 { 2162 {
2124 view_size = View_size (mss->view); 2163 view_size = View_size (cli_ctx->ss->view);
2125 view_array = View_get_as_array(mss->view); 2164 view_array = View_get_as_array (cli_ctx->ss->view);
2126 } 2165 }
2127 2166
2128 ev = GNUNET_MQ_msg_extra (out_msg, 2167 ev = GNUNET_MQ_msg_extra (out_msg,
@@ -2146,7 +2185,7 @@ send_view (const struct ClientContext *cli_ctx,
2146 * @param view_array the peerids of the view as array (can be empty) 2185 * @param view_array the peerids of the view as array (can be empty)
2147 * @param view_size the size of the view array (can be 0) 2186 * @param view_size the size of the view array (can be 0)
2148 */ 2187 */
2149void 2188static void
2150send_stream_peers (const struct ClientContext *cli_ctx, 2189send_stream_peers (const struct ClientContext *cli_ctx,
2151 uint64_t num_peers, 2190 uint64_t num_peers,
2152 const struct GNUNET_PeerIdentity *peers) 2191 const struct GNUNET_PeerIdentity *peers)
@@ -2170,16 +2209,18 @@ send_stream_peers (const struct ClientContext *cli_ctx,
2170 2209
2171/** 2210/**
2172 * @brief sends updates to clients that are interested 2211 * @brief sends updates to clients that are interested
2212 *
2213 * @param ss Subsampler for which to notify clients
2173 */ 2214 */
2174static void 2215static void
2175clients_notify_view_update (void) 2216clients_notify_view_update (const struct SubSampler *ss)
2176{ 2217{
2177 struct ClientContext *cli_ctx_iter; 2218 struct ClientContext *cli_ctx_iter;
2178 uint64_t num_peers; 2219 uint64_t num_peers;
2179 const struct GNUNET_PeerIdentity *view_array; 2220 const struct GNUNET_PeerIdentity *view_array;
2180 2221
2181 num_peers = View_size (mss->view); 2222 num_peers = View_size (ss->view);
2182 view_array = View_get_as_array(mss->view); 2223 view_array = View_get_as_array(ss->view);
2183 /* check size of view is small enough */ 2224 /* check size of view is small enough */
2184 if (GNUNET_MAX_MESSAGE_SIZE < num_peers) 2225 if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2185 { 2226 {
@@ -2214,6 +2255,9 @@ clients_notify_view_update (void)
2214 2255
2215/** 2256/**
2216 * @brief sends updates to clients that are interested 2257 * @brief sends updates to clients that are interested
2258 *
2259 * @param num_peers Number of peers to send
2260 * @param peers the array of peers to send
2217 */ 2261 */
2218static void 2262static void
2219clients_notify_stream_peer (uint64_t num_peers, 2263clients_notify_stream_peer (uint64_t num_peers,
@@ -2237,8 +2281,13 @@ clients_notify_stream_peer (uint64_t num_peers,
2237 } 2281 }
2238} 2282}
2239 2283
2284
2240/** 2285/**
2241 * Put random peer from sampler into the view as history update. 2286 * Put random peer from sampler into the view as history update.
2287 *
2288 * @param ids Array of Peers to insert into view
2289 * @param num_peers Number of peers to insert
2290 * @param cls Closure - The SubSampler for which this is to be done
2242 */ 2291 */
2243static void 2292static void
2244hist_update (const struct GNUNET_PeerIdentity *ids, 2293hist_update (const struct GNUNET_PeerIdentity *ids,
@@ -2246,21 +2295,21 @@ hist_update (const struct GNUNET_PeerIdentity *ids,
2246 void *cls) 2295 void *cls)
2247{ 2296{
2248 unsigned int i; 2297 unsigned int i;
2249 (void) cls; 2298 struct SubSampler *ss = cls;
2250 2299
2251 for (i = 0; i < num_peers; i++) 2300 for (i = 0; i < num_peers; i++)
2252 { 2301 {
2253 int inserted; 2302 int inserted;
2254 inserted = insert_in_view (&ids[i]); 2303 inserted = insert_in_view (ss, &ids[i]);
2255 if (GNUNET_OK == inserted) 2304 if (GNUNET_OK == inserted)
2256 { 2305 {
2257 clients_notify_stream_peer (1, &ids[i]); 2306 clients_notify_stream_peer (1, &ids[i]);
2258 } 2307 }
2259 to_file (mss->file_name_view_log, 2308 to_file (ss->file_name_view_log,
2260 "+%s\t(hist)", 2309 "+%s\t(hist)",
2261 GNUNET_i2s_full (ids)); 2310 GNUNET_i2s_full (ids));
2262 } 2311 }
2263 clients_notify_view_update(); 2312 clients_notify_view_update (ss);
2264} 2313}
2265 2314
2266 2315
@@ -2269,6 +2318,9 @@ hist_update (const struct GNUNET_PeerIdentity *ids,
2269 * 2318 *
2270 * If we do not have enough sampler elements, double current sampler size 2319 * If we do not have enough sampler elements, double current sampler size
2271 * If we have more than enough sampler elements, halv current sampler size 2320 * If we have more than enough sampler elements, halv current sampler size
2321 *
2322 * @param sampler The sampler to resize
2323 * @param new_size New size to which to resize
2272 */ 2324 */
2273static void 2325static void
2274resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size) 2326resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
@@ -2327,12 +2379,12 @@ add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2327/** 2379/**
2328 * Send a PULL REPLY to @a peer_id 2380 * Send a PULL REPLY to @a peer_id
2329 * 2381 *
2330 * @param peer_id the peer to send the reply to. 2382 * @param peer_ctx Context of the peer to send the reply to
2331 * @param peer_ids the peers to send to @a peer_id 2383 * @param peer_ids the peers to send to @a peer_id
2332 * @param num_peer_ids the number of peers to send to @a peer_id 2384 * @param num_peer_ids the number of peers to send to @a peer_id
2333 */ 2385 */
2334static void 2386static void
2335send_pull_reply (const struct GNUNET_PeerIdentity *peer_id, 2387send_pull_reply (struct PeerContext *peer_ctx,
2336 const struct GNUNET_PeerIdentity *peer_ids, 2388 const struct GNUNET_PeerIdentity *peer_ids,
2337 unsigned int num_peer_ids) 2389 unsigned int num_peer_ids)
2338{ 2390{
@@ -2358,7 +2410,7 @@ send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
2358 2410
2359 LOG (GNUNET_ERROR_TYPE_DEBUG, 2411 LOG (GNUNET_ERROR_TYPE_DEBUG,
2360 "Going to send PULL REPLY with %u peers to %s\n", 2412 "Going to send PULL REPLY with %u peers to %s\n",
2361 send_size, GNUNET_i2s (peer_id)); 2413 send_size, GNUNET_i2s (&peer_ctx->peer_id));
2362 2414
2363 ev = GNUNET_MQ_msg_extra (out_msg, 2415 ev = GNUNET_MQ_msg_extra (out_msg,
2364 send_size * sizeof (struct GNUNET_PeerIdentity), 2416 send_size * sizeof (struct GNUNET_PeerIdentity),
@@ -2367,7 +2419,7 @@ send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
2367 GNUNET_memcpy (&out_msg[1], peer_ids, 2419 GNUNET_memcpy (&out_msg[1], peer_ids,
2368 send_size * sizeof (struct GNUNET_PeerIdentity)); 2420 send_size * sizeof (struct GNUNET_PeerIdentity));
2369 2421
2370 send_message (peer_id, ev, "PULL REPLY"); 2422 send_message (peer_ctx, ev, "PULL REPLY");
2371 GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO); 2423 GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2372 // TODO check with send intention: as send_channel is used/opened we indicate 2424 // TODO check with send intention: as send_channel is used/opened we indicate
2373 // a sending intention without intending it. 2425 // a sending intention without intending it.
@@ -2380,13 +2432,17 @@ send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
2380 * Insert PeerID in #pull_map 2432 * Insert PeerID in #pull_map
2381 * 2433 *
2382 * Called once we know a peer is online. 2434 * Called once we know a peer is online.
2435 *
2436 * @param cls Closure - SubSampler with the pull map to insert into
2437 * @param peer Peer to insert
2383 */ 2438 */
2384static void 2439static void
2385insert_in_pull_map (void *cls, 2440insert_in_pull_map (void *cls,
2386 const struct GNUNET_PeerIdentity *peer) 2441 const struct GNUNET_PeerIdentity *peer)
2387{ 2442{
2388 (void) cls; 2443 struct SubSampler *ss = cls;
2389 CustomPeerMap_put (mss->pull_map, peer); 2444
2445 CustomPeerMap_put (ss->pull_map, peer);
2390} 2446}
2391 2447
2392 2448
@@ -2395,15 +2451,18 @@ insert_in_pull_map (void *cls,
2395 * 2451 *
2396 * Called once we know a peer is online. 2452 * Called once we know a peer is online.
2397 * Implements #PeerOp 2453 * Implements #PeerOp
2454 *
2455 * @param cls Closure - SubSampler with view to insert peer into
2456 * @param peer the peer to insert
2398 */ 2457 */
2399static void 2458static void
2400insert_in_view_op (void *cls, 2459insert_in_view_op (void *cls,
2401 const struct GNUNET_PeerIdentity *peer) 2460 const struct GNUNET_PeerIdentity *peer)
2402{ 2461{
2403 (void) cls; 2462 struct SubSampler *ss = cls;
2404 int inserted; 2463 int inserted;
2405 2464
2406 inserted = insert_in_view (peer); 2465 inserted = insert_in_view (ss, peer);
2407 if (GNUNET_OK == inserted) 2466 if (GNUNET_OK == inserted)
2408 { 2467 {
2409 clients_notify_stream_peer (1, peer); 2468 clients_notify_stream_peer (1, peer);
@@ -2414,41 +2473,46 @@ insert_in_view_op (void *cls,
2414/** 2473/**
2415 * Update sampler with given PeerID. 2474 * Update sampler with given PeerID.
2416 * Implements #PeerOp 2475 * Implements #PeerOp
2476 *
2477 * @param cls Closure - SubSampler containing the sampler to insert into
2478 * @param peer Peer to insert
2417 */ 2479 */
2418static void 2480static void
2419insert_in_sampler (void *cls, 2481insert_in_sampler (void *cls,
2420 const struct GNUNET_PeerIdentity *peer) 2482 const struct GNUNET_PeerIdentity *peer)
2421{ 2483{
2422 (void) cls; 2484 struct SubSampler *ss = cls;
2485
2423 LOG (GNUNET_ERROR_TYPE_DEBUG, 2486 LOG (GNUNET_ERROR_TYPE_DEBUG,
2424 "Updating samplers with peer %s from insert_in_sampler()\n", 2487 "Updating samplers with peer %s from insert_in_sampler()\n",
2425 GNUNET_i2s (peer)); 2488 GNUNET_i2s (peer));
2426 RPS_sampler_update (mss->sampler, peer); 2489 RPS_sampler_update (ss->sampler, peer);
2427 if (0 < RPS_sampler_count_id (mss->sampler, peer)) 2490 if (0 < RPS_sampler_count_id (ss->sampler, peer))
2428 { 2491 {
2429 /* Make sure we 'know' about this peer */ 2492 /* Make sure we 'know' about this peer */
2430 (void) issue_peer_online_check (peer); 2493 (void) issue_peer_online_check (ss, peer);
2431 /* Establish a channel towards that peer to indicate we are going to send 2494 /* Establish a channel towards that peer to indicate we are going to send
2432 * messages to it */ 2495 * messages to it */
2433 //indicate_sending_intention (peer); 2496 //indicate_sending_intention (peer);
2434 } 2497 }
2435 #ifdef TO_FILE 2498 #ifdef TO_FILE
2436 mss->num_observed_peers++; 2499 ss->num_observed_peers++;
2437 GNUNET_CONTAINER_multipeermap_put 2500 GNUNET_CONTAINER_multipeermap_put
2438 (mss->observed_unique_peers, 2501 (ss->observed_unique_peers,
2439 peer, 2502 peer,
2440 NULL, 2503 NULL,
2441 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 2504 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2442 uint32_t num_observed_unique_peers = 2505 uint32_t num_observed_unique_peers =
2443 GNUNET_CONTAINER_multipeermap_size (mss->observed_unique_peers); 2506 GNUNET_CONTAINER_multipeermap_size (ss->observed_unique_peers);
2444 to_file (mss->file_name_observed_log, 2507 to_file (ss->file_name_observed_log,
2445 "%" PRIu32 " %" PRIu32 " %f\n", 2508 "%" PRIu32 " %" PRIu32 " %f\n",
2446 mss->num_observed_peers, 2509 ss->num_observed_peers,
2447 num_observed_unique_peers, 2510 num_observed_unique_peers,
2448 1.0*num_observed_unique_peers/mss->num_observed_peers) 2511 1.0*num_observed_unique_peers/ss->num_observed_peers)
2449 #endif /* TO_FILE */ 2512 #endif /* TO_FILE */
2450} 2513}
2451 2514
2515
2452/** 2516/**
2453 * @brief This is called on peers from external sources (cadet, peerinfo, ...) 2517 * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2454 * If the peer is not known, online check is issued and it is 2518 * If the peer is not known, online check is issued and it is
@@ -2456,16 +2520,20 @@ insert_in_sampler (void *cls,
2456 * 2520 *
2457 * "External sources" refer to every source except the gossip. 2521 * "External sources" refer to every source except the gossip.
2458 * 2522 *
2459 * @param peer peer to insert 2523 * @param ss SubSampler for which @a peer was received
2524 * @param peer peer to insert/peer received
2460 */ 2525 */
2461static void 2526static void
2462got_peer (const struct GNUNET_PeerIdentity *peer) 2527got_peer (struct SubSampler *ss,
2528 const struct GNUNET_PeerIdentity *peer)
2463{ 2529{
2464 /* If we did not know this peer already, insert it into sampler and view */ 2530 /* If we did not know this peer already, insert it into sampler and view */
2465 if (GNUNET_YES == issue_peer_online_check (peer)) 2531 if (GNUNET_YES == issue_peer_online_check (ss, peer))
2466 { 2532 {
2467 schedule_operation (peer, insert_in_sampler); 2533 schedule_operation (get_peer_ctx (ss->peer_map, peer),
2468 schedule_operation (peer, insert_in_view_op); 2534 &insert_in_sampler, ss);
2535 schedule_operation (get_peer_ctx (ss->peer_map, peer),
2536 &insert_in_view_op, ss);
2469 } 2537 }
2470 GNUNET_STATISTICS_update (stats, 2538 GNUNET_STATISTICS_update (stats,
2471 "# learnd peers", 2539 "# learnd peers",
@@ -2473,28 +2541,36 @@ got_peer (const struct GNUNET_PeerIdentity *peer)
2473 GNUNET_NO); 2541 GNUNET_NO);
2474} 2542}
2475 2543
2544
2476/** 2545/**
2477 * @brief Checks if there is a sending channel and if it is needed 2546 * @brief Checks if there is a sending channel and if it is needed
2478 * 2547 *
2479 * @param peer the peer whose sending channel is checked 2548 * @param peer_ctx Context of the peer to check
2480 * @return GNUNET_YES if sending channel exists and is still needed 2549 * @return GNUNET_YES if sending channel exists and is still needed
2481 * GNUNET_NO otherwise 2550 * GNUNET_NO otherwise
2482 */ 2551 */
2483static int 2552static int
2484check_sending_channel_needed (const struct GNUNET_PeerIdentity *peer) 2553check_sending_channel_needed (const struct PeerContext *peer_ctx)
2485{ 2554{
2486 /* struct GNUNET_CADET_Channel *channel; */ 2555 /* struct GNUNET_CADET_Channel *channel; */
2487 if (GNUNET_NO == check_peer_known (peer)) 2556 if (GNUNET_NO == check_peer_known (peer_ctx->ss->peer_map,
2557 &peer_ctx->peer_id))
2488 { 2558 {
2489 return GNUNET_NO; 2559 return GNUNET_NO;
2490 } 2560 }
2491 if (GNUNET_YES == check_sending_channel_exists (peer)) 2561 if (GNUNET_YES == check_sending_channel_exists (peer_ctx))
2492 { 2562 {
2493 if ( (0 < RPS_sampler_count_id (mss->sampler, peer)) || 2563 if ( (0 < RPS_sampler_count_id (peer_ctx->ss->sampler,
2494 (GNUNET_YES == View_contains_peer (mss->view, peer)) || 2564 &peer_ctx->peer_id)) ||
2495 (GNUNET_YES == CustomPeerMap_contains_peer (mss->push_map, peer)) || 2565 (GNUNET_YES == View_contains_peer (peer_ctx->ss->view,
2496 (GNUNET_YES == CustomPeerMap_contains_peer (mss->pull_map, peer)) || 2566 &peer_ctx->peer_id)) ||
2497 (GNUNET_YES == check_peer_flag (peer, Peers_PULL_REPLY_PENDING))) 2567 (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->ss->push_map,
2568 &peer_ctx->peer_id)) ||
2569 (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->ss->pull_map,
2570 &peer_ctx->peer_id)) ||
2571 (GNUNET_YES == check_peer_flag (peer_ctx->ss->peer_map,
2572 &peer_ctx->peer_id,
2573 Peers_PULL_REPLY_PENDING)))
2498 { /* If we want to keep the connection to peer open */ 2574 { /* If we want to keep the connection to peer open */
2499 return GNUNET_YES; 2575 return GNUNET_YES;
2500 } 2576 }
@@ -2503,20 +2579,23 @@ check_sending_channel_needed (const struct GNUNET_PeerIdentity *peer)
2503 return GNUNET_NO; 2579 return GNUNET_NO;
2504} 2580}
2505 2581
2582
2506/** 2583/**
2507 * @brief remove peer from our knowledge, the view, push and pull maps and 2584 * @brief remove peer from our knowledge, the view, push and pull maps and
2508 * samplers. 2585 * samplers.
2509 * 2586 *
2587 * @param ss SubSampler with the data structures the peer is to be removed from
2510 * @param peer the peer to remove 2588 * @param peer the peer to remove
2511 */ 2589 */
2512static void 2590static void
2513remove_peer (const struct GNUNET_PeerIdentity *peer) 2591remove_peer (struct SubSampler *ss,
2592 const struct GNUNET_PeerIdentity *peer)
2514{ 2593{
2515 (void) View_remove_peer (mss->view, peer); 2594 (void) View_remove_peer (ss->view, peer);
2516 CustomPeerMap_remove_peer (mss->pull_map, peer); 2595 CustomPeerMap_remove_peer (ss->pull_map, peer);
2517 CustomPeerMap_remove_peer (mss->push_map, peer); 2596 CustomPeerMap_remove_peer (ss->push_map, peer);
2518 RPS_sampler_reinitialise_by_value (mss->sampler, peer); 2597 RPS_sampler_reinitialise_by_value (ss->sampler, peer);
2519 destroy_peer (get_peer_ctx (peer)); 2598 destroy_peer (get_peer_ctx (ss->peer_map, peer));
2520} 2599}
2521 2600
2522 2601
@@ -2525,35 +2604,39 @@ remove_peer (const struct GNUNET_PeerIdentity *peer)
2525 * 2604 *
2526 * If the sending channel is no longer needed it is destroyed. 2605 * If the sending channel is no longer needed it is destroyed.
2527 * 2606 *
2607 * @param ss SubSamper in which the current peer is to be cleaned
2528 * @param peer the peer whose data is about to be cleaned 2608 * @param peer the peer whose data is about to be cleaned
2529 */ 2609 */
2530static void 2610static void
2531clean_peer (const struct GNUNET_PeerIdentity *peer) 2611clean_peer (struct SubSampler *ss,
2612 const struct GNUNET_PeerIdentity *peer)
2532{ 2613{
2533 if (GNUNET_NO == check_sending_channel_needed (peer)) 2614 if (GNUNET_NO == check_sending_channel_needed (get_peer_ctx (ss->peer_map,
2615 peer)))
2534 { 2616 {
2535 LOG (GNUNET_ERROR_TYPE_DEBUG, 2617 LOG (GNUNET_ERROR_TYPE_DEBUG,
2536 "Going to remove send channel to peer %s\n", 2618 "Going to remove send channel to peer %s\n",
2537 GNUNET_i2s (peer)); 2619 GNUNET_i2s (peer));
2538 #ifdef ENABLE_MALICIOUS 2620 #ifdef ENABLE_MALICIOUS
2539 if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer)) 2621 if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
2540 (void) destroy_sending_channel (peer); 2622 (void) destroy_sending_channel (get_peer_ctx (ss->peer_map, peer));
2541 #else /* ENABLE_MALICIOUS */ 2623 #else /* ENABLE_MALICIOUS */
2542 (void) destroy_sending_channel (peer); 2624 (void) destroy_sending_channel (get_peer_ctx (ss->peer_map, peer));
2543 #endif /* ENABLE_MALICIOUS */ 2625 #endif /* ENABLE_MALICIOUS */
2544 } 2626 }
2545 2627
2546 if ( (GNUNET_NO == check_peer_send_intention (peer)) && 2628 if ( (GNUNET_NO == check_peer_send_intention (get_peer_ctx (ss->peer_map,
2547 (GNUNET_NO == View_contains_peer (mss->view, peer)) && 2629 peer))) &&
2548 (GNUNET_NO == CustomPeerMap_contains_peer (mss->push_map, peer)) && 2630 (GNUNET_NO == View_contains_peer (ss->view, peer)) &&
2549 (GNUNET_NO == CustomPeerMap_contains_peer (mss->push_map, peer)) && 2631 (GNUNET_NO == CustomPeerMap_contains_peer (ss->push_map, peer)) &&
2550 (0 == RPS_sampler_count_id (mss->sampler, peer)) && 2632 (GNUNET_NO == CustomPeerMap_contains_peer (ss->push_map, peer)) &&
2551 (GNUNET_NO != check_removable (peer)) ) 2633 (0 == RPS_sampler_count_id (ss->sampler, peer)) &&
2634 (GNUNET_NO != check_removable (get_peer_ctx (ss->peer_map, peer))) )
2552 { /* We can safely remove this peer */ 2635 { /* We can safely remove this peer */
2553 LOG (GNUNET_ERROR_TYPE_DEBUG, 2636 LOG (GNUNET_ERROR_TYPE_DEBUG,
2554 "Going to remove peer %s\n", 2637 "Going to remove peer %s\n",
2555 GNUNET_i2s (peer)); 2638 GNUNET_i2s (peer));
2556 remove_peer (peer); 2639 remove_peer (ss, peer);
2557 return; 2640 return;
2558 } 2641 }
2559} 2642}
@@ -2567,9 +2650,8 @@ clean_peer (const struct GNUNET_PeerIdentity *peer)
2567 * Also check if the knowledge about this peer is still needed. 2650 * Also check if the knowledge about this peer is still needed.
2568 * If not, remove this peer from our knowledge. 2651 * If not, remove this peer from our knowledge.
2569 * 2652 *
2570 * @param cls The closure 2653 * @param cls The closure - Context to the channel
2571 * @param channel The channel being closed 2654 * @param channel The channel being closed
2572 * @param channel_ctx The context associated with this channel
2573 */ 2655 */
2574static void 2656static void
2575cleanup_destroyed_channel (void *cls, 2657cleanup_destroyed_channel (void *cls,
@@ -2577,7 +2659,6 @@ cleanup_destroyed_channel (void *cls,
2577{ 2659{
2578 struct ChannelCtx *channel_ctx = cls; 2660 struct ChannelCtx *channel_ctx = cls;
2579 struct PeerContext *peer_ctx = channel_ctx->peer_ctx; 2661 struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
2580 (void) cls;
2581 (void) channel; 2662 (void) channel;
2582 2663
2583 channel_ctx->channel = NULL; 2664 channel_ctx->channel = NULL;
@@ -2585,7 +2666,7 @@ cleanup_destroyed_channel (void *cls,
2585 if (NULL != peer_ctx && 2666 if (NULL != peer_ctx &&
2586 peer_ctx->send_channel_ctx == channel_ctx) 2667 peer_ctx->send_channel_ctx == channel_ctx)
2587 { 2668 {
2588 remove_peer (&peer_ctx->peer_id); 2669 remove_peer (peer_ctx->ss, &peer_ctx->peer_id);
2589 } 2670 }
2590} 2671}
2591 2672
@@ -2599,6 +2680,16 @@ cleanup_destroyed_channel (void *cls,
2599 * SubSampler 2680 * SubSampler
2600***********************************************************************/ 2681***********************************************************************/
2601 2682
2683/**
2684 * @brief Create a new SUbSampler
2685 *
2686 * @param shared_value Value shared among rps instances on other hosts that
2687 * defines a subgroup to sample from.
2688 * @param sampler_size Size of the sampler
2689 * @param round_interval Interval (in average) between two rounds
2690 *
2691 * @return SubSampler
2692 */
2602struct SubSampler * 2693struct SubSampler *
2603new_subsampler (const char *shared_value, 2694new_subsampler (const char *shared_value,
2604 uint32_t sampler_size, 2695 uint32_t sampler_size,
@@ -2640,7 +2731,7 @@ new_subsampler (const char *shared_value,
2640 GNUNET_CADET_open_port (ss->cadet_handle, 2731 GNUNET_CADET_open_port (ss->cadet_handle,
2641 &ss->port, 2732 &ss->port,
2642 &handle_inbound_channel, /* Connect handler */ 2733 &handle_inbound_channel, /* Connect handler */
2643 NULL, /* cls */ 2734 ss, /* cls */
2644 NULL, /* WindowSize handler */ 2735 NULL, /* WindowSize handler */
2645 &cleanup_destroyed_channel, /* Disconnect handler */ 2736 &cleanup_destroyed_channel, /* Disconnect handler */
2646 cadet_handlers); 2737 cadet_handlers);
@@ -2703,6 +2794,11 @@ new_subsampler (const char *shared_value,
2703***********************************************************************/ 2794***********************************************************************/
2704 2795
2705 2796
2797/**
2798 * @brief Destroy the context for a (connected) client
2799 *
2800 * @param cli_ctx Context to destroy
2801 */
2706static void 2802static void
2707destroy_cli_ctx (struct ClientContext *cli_ctx) 2803destroy_cli_ctx (struct ClientContext *cli_ctx)
2708{ 2804{
@@ -2719,6 +2815,13 @@ destroy_cli_ctx (struct ClientContext *cli_ctx)
2719 * 2815 *
2720 * Updates sizes of sampler list and view and adapt those lists 2816 * Updates sizes of sampler list and view and adapt those lists
2721 * accordingly. 2817 * accordingly.
2818 *
2819 * implements #GNUNET_NSE_Callback
2820 *
2821 * @param cls Closure - SubSampler
2822 * @param timestamp time when the estimate was received from the server (or created by the server)
2823 * @param logestimate the log(Base 2) value of the current network size estimate
2824 * @param std_dev standard deviation for the estimate
2722 */ 2825 */
2723static void 2826static void
2724nse_callback (void *cls, 2827nse_callback (void *cls,
@@ -2727,34 +2830,34 @@ nse_callback (void *cls,
2727{ 2830{
2728 double estimate; 2831 double estimate;
2729 //double scale; // TODO this might go gloabal/config 2832 //double scale; // TODO this might go gloabal/config
2730 (void) cls; 2833 struct SubSampler *ss = cls;
2731 (void) timestamp; 2834 (void) timestamp;
2732 2835
2733 LOG (GNUNET_ERROR_TYPE_DEBUG, 2836 LOG (GNUNET_ERROR_TYPE_DEBUG,
2734 "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n", 2837 "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
2735 logestimate, std_dev, RPS_sampler_get_size (mss->sampler)); 2838 logestimate, std_dev, RPS_sampler_get_size (ss->sampler));
2736 //scale = .01; 2839 //scale = .01;
2737 estimate = GNUNET_NSE_log_estimate_to_n (logestimate); 2840 estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
2738 // GNUNET_NSE_log_estimate_to_n (logestimate); 2841 // GNUNET_NSE_log_estimate_to_n (logestimate);
2739 estimate = pow (estimate, 1.0 / 3); 2842 estimate = pow (estimate, 1.0 / 3);
2740 // TODO add if std_dev is a number 2843 // TODO add if std_dev is a number
2741 // estimate += (std_dev * scale); 2844 // estimate += (std_dev * scale);
2742 if (mss->view_size_est_min < ceil (estimate)) 2845 if (ss->view_size_est_min < ceil (estimate))
2743 { 2846 {
2744 LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate); 2847 LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
2745 mss->sampler_size_est_need = estimate; 2848 ss->sampler_size_est_need = estimate;
2746 mss->view_size_est_need = estimate; 2849 ss->view_size_est_need = estimate;
2747 } else 2850 } else
2748 { 2851 {
2749 LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate); 2852 LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
2750 //mss->sampler_size_est_need = mss->view_size_est_min; 2853 //ss->sampler_size_est_need = ss->view_size_est_min;
2751 mss->view_size_est_need = mss->view_size_est_min; 2854 ss->view_size_est_need = ss->view_size_est_min;
2752 } 2855 }
2753 GNUNET_STATISTICS_set (stats, "view size aim", mss->view_size_est_need, GNUNET_NO); 2856 GNUNET_STATISTICS_set (stats, "view size aim", ss->view_size_est_need, GNUNET_NO);
2754 2857
2755 /* If the NSE has changed adapt the lists accordingly */ 2858 /* If the NSE has changed adapt the lists accordingly */
2756 resize_wrapper (mss->sampler, mss->sampler_size_est_need); 2859 resize_wrapper (ss->sampler, ss->sampler_size_est_need);
2757 View_change_len (mss->view, mss->view_size_est_need); 2860 View_change_len (ss->view, ss->view_size_est_need);
2758} 2861}
2759 2862
2760 2863
@@ -2765,6 +2868,7 @@ nse_callback (void *cls,
2765 * @param cls the closure (#ClientContext) 2868 * @param cls the closure (#ClientContext)
2766 * @param msg the message 2869 * @param msg the message
2767 * @return #GNUNET_OK if @a msg is well-formed 2870 * @return #GNUNET_OK if @a msg is well-formed
2871 * #GNUNET_SYSERR otherwise
2768 */ 2872 */
2769static int 2873static int
2770check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg) 2874check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
@@ -2814,7 +2918,7 @@ handle_client_seed (void *cls,
2814 i, 2918 i,
2815 GNUNET_i2s (&peers[i])); 2919 GNUNET_i2s (&peers[i]));
2816 2920
2817 got_peer (&peers[i]); 2921 got_peer (cli_ctx->ss, &peers[i]);
2818 } 2922 }
2819 GNUNET_SERVICE_client_continue (cli_ctx->client); 2923 GNUNET_SERVICE_client_continue (cli_ctx->client);
2820} 2924}
@@ -2824,7 +2928,8 @@ handle_client_seed (void *cls,
2824 * Handle RPS request from the client. 2928 * Handle RPS request from the client.
2825 * 2929 *
2826 * @param cls Client context 2930 * @param cls Client context
2827 * @param message unused 2931 * @param message Message containing the numer of updates the client wants to
2932 * receive
2828 */ 2933 */
2829static void 2934static void
2830handle_client_view_request (void *cls, 2935handle_client_view_request (void *cls,
@@ -2926,8 +3031,8 @@ handle_client_stream_cancel (void *cls,
2926 * This does nothing. But without calling #GNUNET_CADET_receive_done() 3031 * This does nothing. But without calling #GNUNET_CADET_receive_done()
2927 * the channel is blocked for all other communication. 3032 * the channel is blocked for all other communication.
2928 * 3033 *
2929 * @param cls Closure 3034 * @param cls Closure - Context of channel
2930 * @param msg The message header 3035 * @param msg Message - unused
2931 */ 3036 */
2932static void 3037static void
2933handle_peer_check (void *cls, 3038handle_peer_check (void *cls,
@@ -2947,14 +3052,15 @@ handle_peer_check (void *cls,
2947 GNUNET_CADET_receive_done (channel_ctx->channel); 3052 GNUNET_CADET_receive_done (channel_ctx->channel);
2948} 3053}
2949 3054
3055
2950/** 3056/**
2951 * Handle a PUSH message from another peer. 3057 * Handle a PUSH message from another peer.
2952 * 3058 *
2953 * Check the proof of work and store the PeerID 3059 * Check the proof of work and store the PeerID
2954 * in the temporary list for pushed PeerIDs. 3060 * in the temporary list for pushed PeerIDs.
2955 * 3061 *
2956 * @param cls Closure 3062 * @param cls Closure - Context of channel
2957 * @param msg The message header 3063 * @param msg Message - unused
2958 */ 3064 */
2959static void 3065static void
2960handle_peer_push (void *cls, 3066handle_peer_push (void *cls,
@@ -2981,9 +3087,8 @@ handle_peer_push (void *cls,
2981 tmp_att_peer->peer_id = *peer; 3087 tmp_att_peer->peer_id = *peer;
2982 if (NULL == att_peer_set) 3088 if (NULL == att_peer_set)
2983 att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO); 3089 att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
2984 if (GNUNET_NO == 3090 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
2985 GNUNET_CONTAINER_multipeermap_contains (att_peer_set, 3091 peer))
2986 peer))
2987 { 3092 {
2988 GNUNET_CONTAINER_DLL_insert (att_peers_head, 3093 GNUNET_CONTAINER_DLL_insert (att_peers_head,
2989 att_peers_tail, 3094 att_peers_tail,
@@ -3004,9 +3109,10 @@ handle_peer_push (void *cls,
3004 #endif /* ENABLE_MALICIOUS */ 3109 #endif /* ENABLE_MALICIOUS */
3005 3110
3006 /* Add the sending peer to the push_map */ 3111 /* Add the sending peer to the push_map */
3007 CustomPeerMap_put (mss->push_map, peer); 3112 CustomPeerMap_put (channel_ctx->peer_ctx->ss->push_map, peer);
3008 3113
3009 GNUNET_break_op (check_peer_known (peer)); 3114 GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->ss->peer_map,
3115 &channel_ctx->peer_ctx->peer_id));
3010 GNUNET_CADET_receive_done (channel_ctx->channel); 3116 GNUNET_CADET_receive_done (channel_ctx->channel);
3011} 3117}
3012 3118
@@ -3016,15 +3122,16 @@ handle_peer_push (void *cls,
3016 * 3122 *
3017 * Reply with the view of PeerIDs. 3123 * Reply with the view of PeerIDs.
3018 * 3124 *
3019 * @param cls Closure 3125 * @param cls Closure - Context of channel
3020 * @param msg The message header 3126 * @param msg Message - unused
3021 */ 3127 */
3022static void 3128static void
3023handle_peer_pull_request (void *cls, 3129handle_peer_pull_request (void *cls,
3024 const struct GNUNET_MessageHeader *msg) 3130 const struct GNUNET_MessageHeader *msg)
3025{ 3131{
3026 const struct ChannelCtx *channel_ctx = cls; 3132 const struct ChannelCtx *channel_ctx = cls;
3027 const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id; 3133 struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
3134 const struct GNUNET_PeerIdentity *peer = &peer_ctx->peer_id;
3028 const struct GNUNET_PeerIdentity *view_array; 3135 const struct GNUNET_PeerIdentity *view_array;
3029 (void) msg; 3136 (void) msg;
3030 3137
@@ -3035,22 +3142,25 @@ handle_peer_pull_request (void *cls,
3035 if (1 == mal_type 3142 if (1 == mal_type
3036 || 3 == mal_type) 3143 || 3 == mal_type)
3037 { /* Try to maximise representation */ 3144 { /* Try to maximise representation */
3038 send_pull_reply (peer, mal_peers, num_mal_peers); 3145 send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3039 } 3146 }
3040 3147
3041 else if (2 == mal_type) 3148 else if (2 == mal_type)
3042 { /* Try to partition network */ 3149 { /* Try to partition network */
3043 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer)) 3150 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
3044 { 3151 {
3045 send_pull_reply (peer, mal_peers, num_mal_peers); 3152 send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3046 } 3153 }
3047 } 3154 }
3048 #endif /* ENABLE_MALICIOUS */ 3155 #endif /* ENABLE_MALICIOUS */
3049 3156
3050 GNUNET_break_op (check_peer_known (peer)); 3157 GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->ss->peer_map,
3158 &channel_ctx->peer_ctx->peer_id));
3051 GNUNET_CADET_receive_done (channel_ctx->channel); 3159 GNUNET_CADET_receive_done (channel_ctx->channel);
3052 view_array = View_get_as_array (mss->view); 3160 view_array = View_get_as_array (channel_ctx->peer_ctx->ss->view);
3053 send_pull_reply (peer, view_array, View_size (mss->view)); 3161 send_pull_reply (peer_ctx,
3162 view_array,
3163 View_size (channel_ctx->peer_ctx->ss->view));
3054} 3164}
3055 3165
3056 3166
@@ -3058,8 +3168,8 @@ handle_peer_pull_request (void *cls,
3058 * Check whether we sent a corresponding request and 3168 * Check whether we sent a corresponding request and
3059 * whether this reply is the first one. 3169 * whether this reply is the first one.
3060 * 3170 *
3061 * @param cls Closure 3171 * @param cls Closure - Context of channel
3062 * @param msg The message header 3172 * @param msg Message containing the replied peers
3063 */ 3173 */
3064static int 3174static int
3065check_peer_pull_reply (void *cls, 3175check_peer_pull_reply (void *cls,
@@ -3086,7 +3196,8 @@ check_peer_pull_reply (void *cls,
3086 return GNUNET_SYSERR; 3196 return GNUNET_SYSERR;
3087 } 3197 }
3088 3198
3089 if (GNUNET_YES != check_peer_flag (&sender_ctx->peer_id, 3199 if (GNUNET_YES != check_peer_flag (sender_ctx->ss->peer_map,
3200 &sender_ctx->peer_id,
3090 Peers_PULL_REPLY_PENDING)) 3201 Peers_PULL_REPLY_PENDING))
3091 { 3202 {
3092 LOG (GNUNET_ERROR_TYPE_WARNING, 3203 LOG (GNUNET_ERROR_TYPE_WARNING,
@@ -3102,6 +3213,7 @@ check_peer_pull_reply (void *cls,
3102 return GNUNET_OK; 3213 return GNUNET_OK;
3103} 3214}
3104 3215
3216
3105/** 3217/**
3106 * Handle PULL REPLY message from another peer. 3218 * Handle PULL REPLY message from another peer.
3107 * 3219 *
@@ -3165,23 +3277,28 @@ handle_peer_pull_reply (void *cls,
3165 } 3277 }
3166 #endif /* ENABLE_MALICIOUS */ 3278 #endif /* ENABLE_MALICIOUS */
3167 /* Make sure we 'know' about this peer */ 3279 /* Make sure we 'know' about this peer */
3168 (void) insert_peer (&peers[i]); 3280 (void) insert_peer (channel_ctx->peer_ctx->ss, &peers[i]);
3169 3281
3170 if (GNUNET_YES == check_peer_valid (&peers[i])) 3282 if (GNUNET_YES == check_peer_valid (channel_ctx->peer_ctx->ss->valid_peers,
3283 &peers[i]))
3171 { 3284 {
3172 CustomPeerMap_put (mss->pull_map, &peers[i]); 3285 CustomPeerMap_put (channel_ctx->peer_ctx->ss->pull_map, &peers[i]);
3173 } 3286 }
3174 else 3287 else
3175 { 3288 {
3176 schedule_operation (&peers[i], insert_in_pull_map); 3289 schedule_operation (channel_ctx->peer_ctx,
3177 (void) issue_peer_online_check (&peers[i]); 3290 insert_in_pull_map,
3291 channel_ctx->peer_ctx->ss); /* cls */
3292 (void) issue_peer_online_check (channel_ctx->peer_ctx->ss, &peers[i]);
3178 } 3293 }
3179 } 3294 }
3180 3295
3181 UNSET_PEER_FLAG (get_peer_ctx (sender), Peers_PULL_REPLY_PENDING); 3296 UNSET_PEER_FLAG (get_peer_ctx (channel_ctx->peer_ctx->ss->peer_map, sender),
3182 clean_peer (sender); 3297 Peers_PULL_REPLY_PENDING);
3298 clean_peer (channel_ctx->peer_ctx->ss, sender);
3183 3299
3184 GNUNET_break_op (check_peer_known (sender)); 3300 GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->ss->peer_map,
3301 sender));
3185 GNUNET_CADET_receive_done (channel_ctx->channel); 3302 GNUNET_CADET_receive_done (channel_ctx->channel);
3186} 3303}
3187 3304
@@ -3193,12 +3310,12 @@ handle_peer_pull_reply (void *cls,
3193 * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min)) 3310 * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3194 * It would return a random value between 2 and 6 min. 3311 * It would return a random value between 2 and 6 min.
3195 * 3312 *
3196 * @param mean the mean 3313 * @param mean the mean time until the next round
3197 * @param spread the inverse amount of deviation from the mean 3314 * @param spread the inverse amount of deviation from the mean
3198 */ 3315 */
3199static struct GNUNET_TIME_Relative 3316static struct GNUNET_TIME_Relative
3200compute_rand_delay (struct GNUNET_TIME_Relative mean, 3317compute_rand_delay (struct GNUNET_TIME_Relative mean,
3201 unsigned int spread) 3318 unsigned int spread)
3202{ 3319{
3203 struct GNUNET_TIME_Relative half_interval; 3320 struct GNUNET_TIME_Relative half_interval;
3204 struct GNUNET_TIME_Relative ret; 3321 struct GNUNET_TIME_Relative ret;
@@ -3238,23 +3355,24 @@ compute_rand_delay (struct GNUNET_TIME_Relative mean,
3238/** 3355/**
3239 * Send single pull request 3356 * Send single pull request
3240 * 3357 *
3241 * @param peer_id the peer to send the pull request to. 3358 * @param peer_ctx Context to the peer to send request to
3242 */ 3359 */
3243static void 3360static void
3244send_pull_request (const struct GNUNET_PeerIdentity *peer) 3361send_pull_request (struct PeerContext *peer_ctx)
3245{ 3362{
3246 struct GNUNET_MQ_Envelope *ev; 3363 struct GNUNET_MQ_Envelope *ev;
3247 3364
3248 GNUNET_assert (GNUNET_NO == check_peer_flag (peer, 3365 GNUNET_assert (GNUNET_NO == check_peer_flag (peer_ctx->ss->peer_map,
3249 Peers_PULL_REPLY_PENDING)); 3366 &peer_ctx->peer_id,
3250 SET_PEER_FLAG (get_peer_ctx (peer), Peers_PULL_REPLY_PENDING); 3367 Peers_PULL_REPLY_PENDING));
3368 SET_PEER_FLAG (peer_ctx, Peers_PULL_REPLY_PENDING);
3251 3369
3252 LOG (GNUNET_ERROR_TYPE_DEBUG, 3370 LOG (GNUNET_ERROR_TYPE_DEBUG,
3253 "Going to send PULL REQUEST to peer %s.\n", 3371 "Going to send PULL REQUEST to peer %s.\n",
3254 GNUNET_i2s (peer)); 3372 GNUNET_i2s (&peer_ctx->peer_id));
3255 3373
3256 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST); 3374 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3257 send_message (peer, ev, "PULL REQUEST"); 3375 send_message (peer_ctx, ev, "PULL REQUEST");
3258 GNUNET_STATISTICS_update(stats, "# pull request send issued", 1, GNUNET_NO); 3376 GNUNET_STATISTICS_update(stats, "# pull request send issued", 1, GNUNET_NO);
3259} 3377}
3260 3378
@@ -3262,19 +3380,19 @@ send_pull_request (const struct GNUNET_PeerIdentity *peer)
3262/** 3380/**
3263 * Send single push 3381 * Send single push
3264 * 3382 *
3265 * @param peer_id the peer to send the push to. 3383 * @param peer_ctx Context of peer to send push to
3266 */ 3384 */
3267static void 3385static void
3268send_push (const struct GNUNET_PeerIdentity *peer_id) 3386send_push (struct PeerContext *peer_ctx)
3269{ 3387{
3270 struct GNUNET_MQ_Envelope *ev; 3388 struct GNUNET_MQ_Envelope *ev;
3271 3389
3272 LOG (GNUNET_ERROR_TYPE_DEBUG, 3390 LOG (GNUNET_ERROR_TYPE_DEBUG,
3273 "Going to send PUSH to peer %s.\n", 3391 "Going to send PUSH to peer %s.\n",
3274 GNUNET_i2s (peer_id)); 3392 GNUNET_i2s (&peer_ctx->peer_id));
3275 3393
3276 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH); 3394 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3277 send_message (peer_id, ev, "PUSH"); 3395 send_message (peer_ctx, ev, "PUSH");
3278 GNUNET_STATISTICS_update(stats, "# push send issued", 1, GNUNET_NO); 3396 GNUNET_STATISTICS_update(stats, "# push send issued", 1, GNUNET_NO);
3279} 3397}
3280 3398
@@ -3334,6 +3452,7 @@ handle_client_act_malicious (void *cls,
3334 struct GNUNET_PeerIdentity *peers; 3452 struct GNUNET_PeerIdentity *peers;
3335 uint32_t num_mal_peers_sent; 3453 uint32_t num_mal_peers_sent;
3336 uint32_t num_mal_peers_old; 3454 uint32_t num_mal_peers_old;
3455 struct SubSampler *ss = cli_ctx->ss;
3337 3456
3338 /* Do actual logic */ 3457 /* Do actual logic */
3339 peers = (struct GNUNET_PeerIdentity *) &msg[1]; 3458 peers = (struct GNUNET_PeerIdentity *) &msg[1];
@@ -3365,8 +3484,8 @@ handle_client_act_malicious (void *cls,
3365 mal_peer_set); 3484 mal_peer_set);
3366 3485
3367 /* Substitute do_round () with do_mal_round () */ 3486 /* Substitute do_round () with do_mal_round () */
3368 GNUNET_SCHEDULER_cancel (mss->do_round_task); 3487 GNUNET_SCHEDULER_cancel (ss->do_round_task);
3369 mss->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL); 3488 ss->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, ss);
3370 } 3489 }
3371 3490
3372 else if ( (2 == mal_type) || 3491 else if ( (2 == mal_type) ||
@@ -3398,9 +3517,9 @@ handle_client_act_malicious (void *cls,
3398 &msg->attacked_peer, 3517 &msg->attacked_peer,
3399 sizeof (struct GNUNET_PeerIdentity)); 3518 sizeof (struct GNUNET_PeerIdentity));
3400 /* Set the flag of the attacked peer to valid to avoid problems */ 3519 /* Set the flag of the attacked peer to valid to avoid problems */
3401 if (GNUNET_NO == check_peer_known (&attacked_peer)) 3520 if (GNUNET_NO == check_peer_known (ss->peer_map, &attacked_peer))
3402 { 3521 {
3403 (void) issue_peer_online_check (&attacked_peer); 3522 (void) issue_peer_online_check (ss, &attacked_peer);
3404 } 3523 }
3405 3524
3406 LOG (GNUNET_ERROR_TYPE_DEBUG, 3525 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -3408,16 +3527,16 @@ handle_client_act_malicious (void *cls,
3408 GNUNET_i2s (&attacked_peer)); 3527 GNUNET_i2s (&attacked_peer));
3409 3528
3410 /* Substitute do_round () with do_mal_round () */ 3529 /* Substitute do_round () with do_mal_round () */
3411 GNUNET_SCHEDULER_cancel (mss->do_round_task); 3530 GNUNET_SCHEDULER_cancel (ss->do_round_task);
3412 mss->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL); 3531 ss->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, ss);
3413 } 3532 }
3414 else if (0 == mal_type) 3533 else if (0 == mal_type)
3415 { /* Stop acting malicious */ 3534 { /* Stop acting malicious */
3416 GNUNET_array_grow (mal_peers, num_mal_peers, 0); 3535 GNUNET_array_grow (mal_peers, num_mal_peers, 0);
3417 3536
3418 /* Substitute do_mal_round () with do_round () */ 3537 /* Substitute do_mal_round () with do_round () */
3419 GNUNET_SCHEDULER_cancel (mss->do_round_task); 3538 GNUNET_SCHEDULER_cancel (ss->do_round_task);
3420 mss->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL); 3539 ss->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, ss);
3421 } 3540 }
3422 else 3541 else
3423 { 3542 {
@@ -3432,6 +3551,8 @@ handle_client_act_malicious (void *cls,
3432 * Send out PUSHes and PULLs maliciously. 3551 * Send out PUSHes and PULLs maliciously.
3433 * 3552 *
3434 * This is executed regylary. 3553 * This is executed regylary.
3554 *
3555 * @param cls Closure - SubSamper
3435 */ 3556 */
3436static void 3557static void
3437do_mal_round (void *cls) 3558do_mal_round (void *cls)
@@ -3440,12 +3561,12 @@ do_mal_round (void *cls)
3440 uint32_t i; 3561 uint32_t i;
3441 struct GNUNET_TIME_Relative time_next_round; 3562 struct GNUNET_TIME_Relative time_next_round;
3442 struct AttackedPeer *tmp_att_peer; 3563 struct AttackedPeer *tmp_att_peer;
3443 (void) cls; 3564 struct SubSampler *ss = cls;
3444 3565
3445 LOG (GNUNET_ERROR_TYPE_DEBUG, 3566 LOG (GNUNET_ERROR_TYPE_DEBUG,
3446 "Going to execute next round maliciously type %" PRIu32 ".\n", 3567 "Going to execute next round maliciously type %" PRIu32 ".\n",
3447 mal_type); 3568 mal_type);
3448 mss->do_round_task = NULL; 3569 ss->do_round_task = NULL;
3449 GNUNET_assert (mal_type <= 3); 3570 GNUNET_assert (mal_type <= 3);
3450 /* Do malicious actions */ 3571 /* Do malicious actions */
3451 if (1 == mal_type) 3572 if (1 == mal_type)
@@ -3468,7 +3589,7 @@ do_mal_round (void *cls)
3468 else 3589 else
3469 att_peer_index = att_peer_index->next; 3590 att_peer_index = att_peer_index->next;
3470 3591
3471 send_push (&att_peer_index->peer_id); 3592 send_push (get_peer_ctx (ss->peer_map, &att_peer_index->peer_id));
3472 } 3593 }
3473 3594
3474 /* Send PULLs to some peers to learn about additional peers to attack */ 3595 /* Send PULLs to some peers to learn about additional peers to attack */
@@ -3480,7 +3601,7 @@ do_mal_round (void *cls)
3480 else 3601 else
3481 att_peer_index = tmp_att_peer->next; 3602 att_peer_index = tmp_att_peer->next;
3482 3603
3483 send_pull_request (&tmp_att_peer->peer_id); 3604 send_pull_request (get_peer_ctx (ss->peer_map, &tmp_att_peer->peer_id));
3484 } 3605 }
3485 } 3606 }
3486 3607
@@ -3491,9 +3612,11 @@ do_mal_round (void *cls)
3491 * Send as many pushes to the attacked peer as possible 3612 * Send as many pushes to the attacked peer as possible
3492 * That is one push per round as it will ignore more. 3613 * That is one push per round as it will ignore more.
3493 */ 3614 */
3494 (void) issue_peer_online_check (&attacked_peer); 3615 (void) issue_peer_online_check (ss, &attacked_peer);
3495 if (GNUNET_YES == check_peer_flag (&attacked_peer, Peers_ONLINE)) 3616 if (GNUNET_YES == check_peer_flag (ss->peer_map,
3496 send_push (&attacked_peer); 3617 &attacked_peer,
3618 Peers_ONLINE))
3619 send_push (get_peer_ctx (ss->peer_map, &attacked_peer));
3497 } 3620 }
3498 3621
3499 3622
@@ -3501,18 +3624,20 @@ do_mal_round (void *cls)
3501 { /* Combined attack */ 3624 { /* Combined attack */
3502 3625
3503 /* Send PUSH to attacked peers */ 3626 /* Send PUSH to attacked peers */
3504 if (GNUNET_YES == check_peer_known (&attacked_peer)) 3627 if (GNUNET_YES == check_peer_known (ss->peer_map, &attacked_peer))
3505 { 3628 {
3506 (void) issue_peer_online_check (&attacked_peer); 3629 (void) issue_peer_online_check (ss, &attacked_peer);
3507 if (GNUNET_YES == check_peer_flag (&attacked_peer, Peers_ONLINE)) 3630 if (GNUNET_YES == check_peer_flag (ss->peer_map,
3631 &attacked_peer,
3632 Peers_ONLINE))
3508 { 3633 {
3509 LOG (GNUNET_ERROR_TYPE_DEBUG, 3634 LOG (GNUNET_ERROR_TYPE_DEBUG,
3510 "Goding to send push to attacked peer (%s)\n", 3635 "Goding to send push to attacked peer (%s)\n",
3511 GNUNET_i2s (&attacked_peer)); 3636 GNUNET_i2s (&attacked_peer));
3512 send_push (&attacked_peer); 3637 send_push (get_peer_ctx (ss->peer_map, &attacked_peer));
3513 } 3638 }
3514 } 3639 }
3515 (void) issue_peer_online_check (&attacked_peer); 3640 (void) issue_peer_online_check (ss, &attacked_peer);
3516 3641
3517 /* The maximum of pushes we're going to send this round */ 3642 /* The maximum of pushes we're going to send this round */
3518 num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1, 3643 num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
@@ -3530,7 +3655,7 @@ do_mal_round (void *cls)
3530 else 3655 else
3531 att_peer_index = att_peer_index->next; 3656 att_peer_index = att_peer_index->next;
3532 3657
3533 send_push (&att_peer_index->peer_id); 3658 send_push (get_peer_ctx (ss->peer_map, &att_peer_index->peer_id));
3534 } 3659 }
3535 3660
3536 /* Send PULLs to some peers to learn about additional peers to attack */ 3661 /* Send PULLs to some peers to learn about additional peers to attack */
@@ -3542,18 +3667,18 @@ do_mal_round (void *cls)
3542 else 3667 else
3543 att_peer_index = tmp_att_peer->next; 3668 att_peer_index = tmp_att_peer->next;
3544 3669
3545 send_pull_request (&tmp_att_peer->peer_id); 3670 send_pull_request (get_peer_ctx (ss->peer_map, &tmp_att_peer->peer_id));
3546 } 3671 }
3547 } 3672 }
3548 3673
3549 /* Schedule next round */ 3674 /* Schedule next round */
3550 time_next_round = compute_rand_delay (mss->round_interval, 2); 3675 time_next_round = compute_rand_delay (ss->round_interval, 2);
3551 3676
3552 //mss->do_round_task = GNUNET_SCHEDULER_add_delayed (mss->round_interval, &do_mal_round, 3677 //ss->do_round_task = GNUNET_SCHEDULER_add_delayed (ss->round_interval, &do_mal_round,
3553 //NULL); 3678 //NULL);
3554 GNUNET_assert (NULL == mss->do_round_task); 3679 GNUNET_assert (NULL == ss->do_round_task);
3555 mss->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round, 3680 ss->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3556 &do_mal_round, NULL); 3681 &do_mal_round, ss);
3557 LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n"); 3682 LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3558} 3683}
3559#endif /* ENABLE_MALICIOUS */ 3684#endif /* ENABLE_MALICIOUS */
@@ -3562,6 +3687,8 @@ do_mal_round (void *cls)
3562 * Send out PUSHes and PULLs, possibly update #view, samplers. 3687 * Send out PUSHes and PULLs, possibly update #view, samplers.
3563 * 3688 *
3564 * This is executed regylary. 3689 * This is executed regylary.
3690 *
3691 * @param cls Closure - SubSampler
3565 */ 3692 */
3566static void 3693static void
3567do_round (void *cls) 3694do_round (void *cls)
@@ -3575,64 +3702,66 @@ do_round (void *cls)
3575 uint32_t second_border; 3702 uint32_t second_border;
3576 struct GNUNET_PeerIdentity peer; 3703 struct GNUNET_PeerIdentity peer;
3577 struct GNUNET_PeerIdentity *update_peer; 3704 struct GNUNET_PeerIdentity *update_peer;
3578 (void) cls; 3705 struct SubSampler *ss = cls;
3579 3706
3580 LOG (GNUNET_ERROR_TYPE_DEBUG, 3707 LOG (GNUNET_ERROR_TYPE_DEBUG,
3581 "Going to execute next round.\n"); 3708 "Going to execute next round.\n");
3582 GNUNET_STATISTICS_update(stats, "# rounds", 1, GNUNET_NO); 3709 GNUNET_STATISTICS_update(stats, "# rounds", 1, GNUNET_NO);
3583 mss->do_round_task = NULL; 3710 ss->do_round_task = NULL;
3584 LOG (GNUNET_ERROR_TYPE_DEBUG, 3711 LOG (GNUNET_ERROR_TYPE_DEBUG,
3585 "Printing view:\n"); 3712 "Printing view:\n");
3586 to_file (mss->file_name_view_log, 3713 to_file (ss->file_name_view_log,
3587 "___ new round ___"); 3714 "___ new round ___");
3588 view_array = View_get_as_array (mss->view); 3715 view_array = View_get_as_array (ss->view);
3589 for (i = 0; i < View_size (mss->view); i++) 3716 for (i = 0; i < View_size (ss->view); i++)
3590 { 3717 {
3591 LOG (GNUNET_ERROR_TYPE_DEBUG, 3718 LOG (GNUNET_ERROR_TYPE_DEBUG,
3592 "\t%s\n", GNUNET_i2s (&view_array[i])); 3719 "\t%s\n", GNUNET_i2s (&view_array[i]));
3593 to_file (mss->file_name_view_log, 3720 to_file (ss->file_name_view_log,
3594 "=%s\t(do round)", 3721 "=%s\t(do round)",
3595 GNUNET_i2s_full (&view_array[i])); 3722 GNUNET_i2s_full (&view_array[i]));
3596 } 3723 }
3597 3724
3598 3725
3599 /* Send pushes and pull requests */ 3726 /* Send pushes and pull requests */
3600 if (0 < View_size (mss->view)) 3727 if (0 < View_size (ss->view))
3601 { 3728 {
3602 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, 3729 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3603 View_size (mss->view)); 3730 View_size (ss->view));
3604 3731
3605 /* Send PUSHes */ 3732 /* Send PUSHes */
3606 a_peers = ceil (alpha * View_size (mss->view)); 3733 a_peers = ceil (alpha * View_size (ss->view));
3607 3734
3608 LOG (GNUNET_ERROR_TYPE_DEBUG, 3735 LOG (GNUNET_ERROR_TYPE_DEBUG,
3609 "Going to send pushes to %u (ceil (%f * %u)) peers.\n", 3736 "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
3610 a_peers, alpha, View_size (mss->view)); 3737 a_peers, alpha, View_size (ss->view));
3611 for (i = 0; i < a_peers; i++) 3738 for (i = 0; i < a_peers; i++)
3612 { 3739 {
3613 peer = view_array[permut[i]]; 3740 peer = view_array[permut[i]];
3614 // FIXME if this fails schedule/loop this for later 3741 // FIXME if this fails schedule/loop this for later
3615 send_push (&peer); 3742 send_push (get_peer_ctx (ss->peer_map, &peer));
3616 } 3743 }
3617 3744
3618 /* Send PULL requests */ 3745 /* Send PULL requests */
3619 b_peers = ceil (beta * View_size (mss->view)); 3746 b_peers = ceil (beta * View_size (ss->view));
3620 first_border = a_peers; 3747 first_border = a_peers;
3621 second_border = a_peers + b_peers; 3748 second_border = a_peers + b_peers;
3622 if (second_border > View_size (mss->view)) 3749 if (second_border > View_size (ss->view))
3623 { 3750 {
3624 first_border = View_size (mss->view) - b_peers; 3751 first_border = View_size (ss->view) - b_peers;
3625 second_border = View_size (mss->view); 3752 second_border = View_size (ss->view);
3626 } 3753 }
3627 LOG (GNUNET_ERROR_TYPE_DEBUG, 3754 LOG (GNUNET_ERROR_TYPE_DEBUG,
3628 "Going to send pulls to %u (ceil (%f * %u)) peers.\n", 3755 "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
3629 b_peers, beta, View_size (mss->view)); 3756 b_peers, beta, View_size (ss->view));
3630 for (i = first_border; i < second_border; i++) 3757 for (i = first_border; i < second_border; i++)
3631 { 3758 {
3632 peer = view_array[permut[i]]; 3759 peer = view_array[permut[i]];
3633 if ( GNUNET_NO == check_peer_flag (&peer, Peers_PULL_REPLY_PENDING)) 3760 if ( GNUNET_NO == check_peer_flag (ss->peer_map,
3761 &peer,
3762 Peers_PULL_REPLY_PENDING))
3634 { // FIXME if this fails schedule/loop this for later 3763 { // FIXME if this fails schedule/loop this for later
3635 send_pull_request (&peer); 3764 send_pull_request (get_peer_ctx (ss->peer_map, &peer));
3636 } 3765 }
3637 } 3766 }
3638 3767
@@ -3644,9 +3773,9 @@ do_round (void *cls)
3644 /* Update view */ 3773 /* Update view */
3645 /* TODO see how many peers are in push-/pull- list! */ 3774 /* TODO see how many peers are in push-/pull- list! */
3646 3775
3647 if ((CustomPeerMap_size (mss->push_map) <= alpha * mss->view_size_est_need) && 3776 if ((CustomPeerMap_size (ss->push_map) <= alpha * ss->view_size_est_need) &&
3648 (0 < CustomPeerMap_size (mss->push_map)) && 3777 (0 < CustomPeerMap_size (ss->push_map)) &&
3649 (0 < CustomPeerMap_size (mss->pull_map))) 3778 (0 < CustomPeerMap_size (ss->pull_map)))
3650 //if (GNUNET_YES) // disable blocking temporarily 3779 //if (GNUNET_YES) // disable blocking temporarily
3651 { /* If conditions for update are fulfilled, update */ 3780 { /* If conditions for update are fulfilled, update */
3652 LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n"); 3781 LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
@@ -3659,23 +3788,23 @@ do_round (void *cls)
3659 peers_to_clean_size = 0; 3788 peers_to_clean_size = 0;
3660 GNUNET_array_grow (peers_to_clean, 3789 GNUNET_array_grow (peers_to_clean,
3661 peers_to_clean_size, 3790 peers_to_clean_size,
3662 View_size (mss->view)); 3791 View_size (ss->view));
3663 GNUNET_memcpy (peers_to_clean, 3792 GNUNET_memcpy (peers_to_clean,
3664 view_array, 3793 view_array,
3665 View_size (mss->view) * sizeof (struct GNUNET_PeerIdentity)); 3794 View_size (ss->view) * sizeof (struct GNUNET_PeerIdentity));
3666 3795
3667 /* Seems like recreating is the easiest way of emptying the peermap */ 3796 /* Seems like recreating is the easiest way of emptying the peermap */
3668 View_clear (mss->view); 3797 View_clear (ss->view);
3669 to_file (mss->file_name_view_log, 3798 to_file (ss->file_name_view_log,
3670 "--- emptied ---"); 3799 "--- emptied ---");
3671 3800
3672 first_border = GNUNET_MIN (ceil (alpha * mss->view_size_est_need), 3801 first_border = GNUNET_MIN (ceil (alpha * ss->view_size_est_need),
3673 CustomPeerMap_size (mss->push_map)); 3802 CustomPeerMap_size (ss->push_map));
3674 second_border = first_border + 3803 second_border = first_border +
3675 GNUNET_MIN (floor (beta * mss->view_size_est_need), 3804 GNUNET_MIN (floor (beta * ss->view_size_est_need),
3676 CustomPeerMap_size (mss->pull_map)); 3805 CustomPeerMap_size (ss->pull_map));
3677 final_size = second_border + 3806 final_size = second_border +
3678 ceil ((1 - (alpha + beta)) * mss->view_size_est_need); 3807 ceil ((1 - (alpha + beta)) * ss->view_size_est_need);
3679 LOG (GNUNET_ERROR_TYPE_DEBUG, 3808 LOG (GNUNET_ERROR_TYPE_DEBUG,
3680 "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n", 3809 "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n",
3681 first_border, 3810 first_border,
@@ -3684,18 +3813,19 @@ do_round (void *cls)
3684 3813
3685 /* Update view with peers received through PUSHes */ 3814 /* Update view with peers received through PUSHes */
3686 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, 3815 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3687 CustomPeerMap_size (mss->push_map)); 3816 CustomPeerMap_size (ss->push_map));
3688 for (i = 0; i < first_border; i++) 3817 for (i = 0; i < first_border; i++)
3689 { 3818 {
3690 int inserted; 3819 int inserted;
3691 inserted = insert_in_view (CustomPeerMap_get_peer_by_index (mss->push_map, 3820 inserted = insert_in_view (ss,
3821 CustomPeerMap_get_peer_by_index (ss->push_map,
3692 permut[i])); 3822 permut[i]));
3693 if (GNUNET_OK == inserted) 3823 if (GNUNET_OK == inserted)
3694 { 3824 {
3695 clients_notify_stream_peer (1, 3825 clients_notify_stream_peer (1,
3696 CustomPeerMap_get_peer_by_index (mss->push_map, permut[i])); 3826 CustomPeerMap_get_peer_by_index (ss->push_map, permut[i]));
3697 } 3827 }
3698 to_file (mss->file_name_view_log, 3828 to_file (ss->file_name_view_log,
3699 "+%s\t(push list)", 3829 "+%s\t(push list)",
3700 GNUNET_i2s_full (&view_array[i])); 3830 GNUNET_i2s_full (&view_array[i]));
3701 // TODO change the peer_flags accordingly 3831 // TODO change the peer_flags accordingly
@@ -3705,19 +3835,20 @@ do_round (void *cls)
3705 3835
3706 /* Update view with peers received through PULLs */ 3836 /* Update view with peers received through PULLs */
3707 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, 3837 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3708 CustomPeerMap_size (mss->pull_map)); 3838 CustomPeerMap_size (ss->pull_map));
3709 for (i = first_border; i < second_border; i++) 3839 for (i = first_border; i < second_border; i++)
3710 { 3840 {
3711 int inserted; 3841 int inserted;
3712 inserted = insert_in_view (CustomPeerMap_get_peer_by_index (mss->pull_map, 3842 inserted = insert_in_view (ss,
3713 permut[i - first_border])); 3843 CustomPeerMap_get_peer_by_index (ss->pull_map,
3844 permut[i - first_border]));
3714 if (GNUNET_OK == inserted) 3845 if (GNUNET_OK == inserted)
3715 { 3846 {
3716 clients_notify_stream_peer (1, 3847 clients_notify_stream_peer (1,
3717 CustomPeerMap_get_peer_by_index (mss->pull_map, 3848 CustomPeerMap_get_peer_by_index (ss->pull_map,
3718 permut[i - first_border])); 3849 permut[i - first_border]));
3719 } 3850 }
3720 to_file (mss->file_name_view_log, 3851 to_file (ss->file_name_view_log,
3721 "+%s\t(pull list)", 3852 "+%s\t(pull list)",
3722 GNUNET_i2s_full (&view_array[i])); 3853 GNUNET_i2s_full (&view_array[i]));
3723 // TODO change the peer_flags accordingly 3854 // TODO change the peer_flags accordingly
@@ -3726,106 +3857,106 @@ do_round (void *cls)
3726 permut = NULL; 3857 permut = NULL;
3727 3858
3728 /* Update view with peers from history */ 3859 /* Update view with peers from history */
3729 RPS_sampler_get_n_rand_peers (mss->sampler, 3860 RPS_sampler_get_n_rand_peers (ss->sampler,
3730 final_size - second_border, 3861 final_size - second_border,
3731 hist_update, 3862 hist_update,
3732 NULL); 3863 ss);
3733 // TODO change the peer_flags accordingly 3864 // TODO change the peer_flags accordingly
3734 3865
3735 for (i = 0; i < View_size (mss->view); i++) 3866 for (i = 0; i < View_size (ss->view); i++)
3736 rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]); 3867 rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
3737 3868
3738 /* Clean peers that were removed from the view */ 3869 /* Clean peers that were removed from the view */
3739 for (i = 0; i < peers_to_clean_size; i++) 3870 for (i = 0; i < peers_to_clean_size; i++)
3740 { 3871 {
3741 to_file (mss->file_name_view_log, 3872 to_file (ss->file_name_view_log,
3742 "-%s", 3873 "-%s",
3743 GNUNET_i2s_full (&peers_to_clean[i])); 3874 GNUNET_i2s_full (&peers_to_clean[i]));
3744 clean_peer (&peers_to_clean[i]); 3875 clean_peer (ss, &peers_to_clean[i]);
3745 } 3876 }
3746 3877
3747 GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0); 3878 GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
3748 clients_notify_view_update(); 3879 clients_notify_view_update (ss);
3749 } else { 3880 } else {
3750 LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n"); 3881 LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
3751 GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO); 3882 GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
3752 if (CustomPeerMap_size (mss->push_map) > alpha * View_size (mss->view) && 3883 if (CustomPeerMap_size (ss->push_map) > alpha * View_size (ss->view) &&
3753 !(0 >= CustomPeerMap_size (mss->pull_map))) 3884 !(0 >= CustomPeerMap_size (ss->pull_map)))
3754 GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO); 3885 GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
3755 if (CustomPeerMap_size (mss->push_map) > alpha * View_size (mss->view) && 3886 if (CustomPeerMap_size (ss->push_map) > alpha * View_size (ss->view) &&
3756 (0 >= CustomPeerMap_size (mss->pull_map))) 3887 (0 >= CustomPeerMap_size (ss->pull_map)))
3757 GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO); 3888 GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
3758 if (0 >= CustomPeerMap_size (mss->push_map) && 3889 if (0 >= CustomPeerMap_size (ss->push_map) &&
3759 !(0 >= CustomPeerMap_size (mss->pull_map))) 3890 !(0 >= CustomPeerMap_size (ss->pull_map)))
3760 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO); 3891 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
3761 if (0 >= CustomPeerMap_size (mss->push_map) && 3892 if (0 >= CustomPeerMap_size (ss->push_map) &&
3762 (0 >= CustomPeerMap_size (mss->pull_map))) 3893 (0 >= CustomPeerMap_size (ss->pull_map)))
3763 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO); 3894 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
3764 if (0 >= CustomPeerMap_size (mss->pull_map) && 3895 if (0 >= CustomPeerMap_size (ss->pull_map) &&
3765 CustomPeerMap_size (mss->push_map) > alpha * View_size (mss->view) && 3896 CustomPeerMap_size (ss->push_map) > alpha * View_size (ss->view) &&
3766 0 >= CustomPeerMap_size (mss->push_map)) 3897 0 >= CustomPeerMap_size (ss->push_map))
3767 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO); 3898 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
3768 } 3899 }
3769 // TODO independent of that also get some peers from CADET_get_peers()? 3900 // TODO independent of that also get some peers from CADET_get_peers()?
3770 GNUNET_STATISTICS_set (stats, 3901 GNUNET_STATISTICS_set (stats,
3771 "# peers in push map at end of round", 3902 "# peers in push map at end of round",
3772 CustomPeerMap_size (mss->push_map), 3903 CustomPeerMap_size (ss->push_map),
3773 GNUNET_NO); 3904 GNUNET_NO);
3774 GNUNET_STATISTICS_set (stats, 3905 GNUNET_STATISTICS_set (stats,
3775 "# peers in pull map at end of round", 3906 "# peers in pull map at end of round",
3776 CustomPeerMap_size (mss->pull_map), 3907 CustomPeerMap_size (ss->pull_map),
3777 GNUNET_NO); 3908 GNUNET_NO);
3778 GNUNET_STATISTICS_set (stats, 3909 GNUNET_STATISTICS_set (stats,
3779 "# peers in view at end of round", 3910 "# peers in view at end of round",
3780 View_size (mss->view), 3911 View_size (ss->view),
3781 GNUNET_NO); 3912 GNUNET_NO);
3782 3913
3783 LOG (GNUNET_ERROR_TYPE_DEBUG, 3914 LOG (GNUNET_ERROR_TYPE_DEBUG,
3784 "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (mss->view%u) = %.2f)\n", 3915 "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (ss->view%u) = %.2f)\n",
3785 CustomPeerMap_size (mss->push_map), 3916 CustomPeerMap_size (ss->push_map),
3786 CustomPeerMap_size (mss->pull_map), 3917 CustomPeerMap_size (ss->pull_map),
3787 alpha, 3918 alpha,
3788 View_size (mss->view), 3919 View_size (ss->view),
3789 alpha * View_size (mss->view)); 3920 alpha * View_size (ss->view));
3790 3921
3791 /* Update samplers */ 3922 /* Update samplers */
3792 for (i = 0; i < CustomPeerMap_size (mss->push_map); i++) 3923 for (i = 0; i < CustomPeerMap_size (ss->push_map); i++)
3793 { 3924 {
3794 update_peer = CustomPeerMap_get_peer_by_index (mss->push_map, i); 3925 update_peer = CustomPeerMap_get_peer_by_index (ss->push_map, i);
3795 LOG (GNUNET_ERROR_TYPE_DEBUG, 3926 LOG (GNUNET_ERROR_TYPE_DEBUG,
3796 "Updating with peer %s from push list\n", 3927 "Updating with peer %s from push list\n",
3797 GNUNET_i2s (update_peer)); 3928 GNUNET_i2s (update_peer));
3798 insert_in_sampler (NULL, update_peer); 3929 insert_in_sampler (ss, update_peer);
3799 clean_peer (update_peer); /* This cleans only if it is not in the view */ 3930 clean_peer (ss, update_peer); /* This cleans only if it is not in the view */
3800 } 3931 }
3801 3932
3802 for (i = 0; i < CustomPeerMap_size (mss->pull_map); i++) 3933 for (i = 0; i < CustomPeerMap_size (ss->pull_map); i++)
3803 { 3934 {
3804 LOG (GNUNET_ERROR_TYPE_DEBUG, 3935 LOG (GNUNET_ERROR_TYPE_DEBUG,
3805 "Updating with peer %s from pull list\n", 3936 "Updating with peer %s from pull list\n",
3806 GNUNET_i2s (CustomPeerMap_get_peer_by_index (mss->pull_map, i))); 3937 GNUNET_i2s (CustomPeerMap_get_peer_by_index (ss->pull_map, i)));
3807 insert_in_sampler (NULL, CustomPeerMap_get_peer_by_index (mss->pull_map, i)); 3938 insert_in_sampler (ss, CustomPeerMap_get_peer_by_index (ss->pull_map, i));
3808 /* This cleans only if it is not in the view */ 3939 /* This cleans only if it is not in the view */
3809 clean_peer (CustomPeerMap_get_peer_by_index (mss->pull_map, i)); 3940 clean_peer (ss, CustomPeerMap_get_peer_by_index (ss->pull_map, i));
3810 } 3941 }
3811 3942
3812 3943
3813 /* Empty push/pull lists */ 3944 /* Empty push/pull lists */
3814 CustomPeerMap_clear (mss->push_map); 3945 CustomPeerMap_clear (ss->push_map);
3815 CustomPeerMap_clear (mss->pull_map); 3946 CustomPeerMap_clear (ss->pull_map);
3816 3947
3817 GNUNET_STATISTICS_set (stats, 3948 GNUNET_STATISTICS_set (stats,
3818 "view size", 3949 "view size",
3819 View_size(mss->view), 3950 View_size(ss->view),
3820 GNUNET_NO); 3951 GNUNET_NO);
3821 3952
3822 struct GNUNET_TIME_Relative time_next_round; 3953 struct GNUNET_TIME_Relative time_next_round;
3823 3954
3824 time_next_round = compute_rand_delay (mss->round_interval, 2); 3955 time_next_round = compute_rand_delay (ss->round_interval, 2);
3825 3956
3826 /* Schedule next round */ 3957 /* Schedule next round */
3827 mss->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round, 3958 ss->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3828 &do_round, NULL); 3959 &do_round, ss);
3829 LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n"); 3960 LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3830} 3961}
3831 3962
@@ -3835,6 +3966,15 @@ do_round (void *cls)
3835 * 3966 *
3836 * It is called on every peer(ID) that cadet somehow has contact with. 3967 * It is called on every peer(ID) that cadet somehow has contact with.
3837 * We use those to initialise the sampler. 3968 * We use those to initialise the sampler.
3969 *
3970 * implements #GNUNET_CADET_PeersCB
3971 *
3972 * @param cls Closure - SubSampler
3973 * @param peer Peer, or NULL on "EOF".
3974 * @param tunnel Do we have a tunnel towards this peer?
3975 * @param n_paths Number of known paths towards this peer.
3976 * @param best_path How long is the best path?
3977 * (0 = unknown, 1 = ourselves, 2 = neighbor)
3838 */ 3978 */
3839void 3979void
3840init_peer_cb (void *cls, 3980init_peer_cb (void *cls,
@@ -3844,7 +3984,7 @@ init_peer_cb (void *cls,
3844 unsigned int best_path) // "How long is the best path? 3984 unsigned int best_path) // "How long is the best path?
3845 // (0 = unknown, 1 = ourselves, 2 = neighbor)" 3985 // (0 = unknown, 1 = ourselves, 2 = neighbor)"
3846{ 3986{
3847 (void) cls; 3987 struct SubSampler *ss = cls;
3848 (void) tunnel; 3988 (void) tunnel;
3849 (void) n_paths; 3989 (void) n_paths;
3850 (void) best_path; 3990 (void) best_path;
@@ -3854,16 +3994,17 @@ init_peer_cb (void *cls,
3854 LOG (GNUNET_ERROR_TYPE_DEBUG, 3994 LOG (GNUNET_ERROR_TYPE_DEBUG,
3855 "Got peer_id %s from cadet\n", 3995 "Got peer_id %s from cadet\n",
3856 GNUNET_i2s (peer)); 3996 GNUNET_i2s (peer));
3857 got_peer (peer); 3997 got_peer (ss, peer);
3858 } 3998 }
3859} 3999}
3860 4000
4001
3861/** 4002/**
3862 * @brief Iterator function over stored, valid peers. 4003 * @brief Iterator function over stored, valid peers.
3863 * 4004 *
3864 * We initialise the sampler with those. 4005 * We initialise the sampler with those.
3865 * 4006 *
3866 * @param cls the closure 4007 * @param cls Closure - SubSampler
3867 * @param peer the peer id 4008 * @param peer the peer id
3868 * @return #GNUNET_YES if we should continue to 4009 * @return #GNUNET_YES if we should continue to
3869 * iterate, 4010 * iterate,
@@ -3873,14 +4014,14 @@ static int
3873valid_peers_iterator (void *cls, 4014valid_peers_iterator (void *cls,
3874 const struct GNUNET_PeerIdentity *peer) 4015 const struct GNUNET_PeerIdentity *peer)
3875{ 4016{
3876 (void) cls; 4017 struct SubSampler *ss = cls;
3877 4018
3878 if (NULL != peer) 4019 if (NULL != peer)
3879 { 4020 {
3880 LOG (GNUNET_ERROR_TYPE_DEBUG, 4021 LOG (GNUNET_ERROR_TYPE_DEBUG,
3881 "Got stored, valid peer %s\n", 4022 "Got stored, valid peer %s\n",
3882 GNUNET_i2s (peer)); 4023 GNUNET_i2s (peer));
3883 got_peer (peer); 4024 got_peer (ss, peer);
3884 } 4025 }
3885 return GNUNET_YES; 4026 return GNUNET_YES;
3886} 4027}
@@ -3889,7 +4030,7 @@ valid_peers_iterator (void *cls,
3889/** 4030/**
3890 * Iterator over peers from peerinfo. 4031 * Iterator over peers from peerinfo.
3891 * 4032 *
3892 * @param cls closure 4033 * @param cls Closure - SubSampler
3893 * @param peer id of the peer, NULL for last call 4034 * @param peer id of the peer, NULL for last call
3894 * @param hello hello message for the peer (can be NULL) 4035 * @param hello hello message for the peer (can be NULL)
3895 * @param error message 4036 * @param error message
@@ -3900,7 +4041,7 @@ process_peerinfo_peers (void *cls,
3900 const struct GNUNET_HELLO_Message *hello, 4041 const struct GNUNET_HELLO_Message *hello,
3901 const char *err_msg) 4042 const char *err_msg)
3902{ 4043{
3903 (void) cls; 4044 struct SubSampler *ss = cls;
3904 (void) hello; 4045 (void) hello;
3905 (void) err_msg; 4046 (void) err_msg;
3906 4047
@@ -3909,7 +4050,7 @@ process_peerinfo_peers (void *cls,
3909 LOG (GNUNET_ERROR_TYPE_DEBUG, 4050 LOG (GNUNET_ERROR_TYPE_DEBUG,
3910 "Got peer_id %s from peerinfo\n", 4051 "Got peer_id %s from peerinfo\n",
3911 GNUNET_i2s (peer)); 4052 GNUNET_i2s (peer));
3912 got_peer (peer); 4053 got_peer (ss, peer);
3913 } 4054 }
3914} 4055}
3915 4056
@@ -3917,13 +4058,14 @@ process_peerinfo_peers (void *cls,
3917/** 4058/**
3918 * Task run during shutdown. 4059 * Task run during shutdown.
3919 * 4060 *
3920 * @param cls unused 4061 * @param cls Closure - SubSampler containing all datastructures to clean
3921 */ 4062 */
3922static void 4063static void
3923shutdown_task (void *cls) 4064shutdown_task (void *cls)
3924{ 4065{
3925 struct ClientContext *client_ctx; 4066 struct ClientContext *client_ctx;
3926 (void) cls; 4067 (void) cls;
4068 struct SubSampler *ss = cls;
3927 4069
3928 LOG (GNUNET_ERROR_TYPE_DEBUG, 4070 LOG (GNUNET_ERROR_TYPE_DEBUG,
3929 "RPS service is going down\n"); 4071 "RPS service is going down\n");
@@ -3938,39 +4080,39 @@ shutdown_task (void *cls)
3938 GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle); 4080 GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
3939 GNUNET_PEERINFO_disconnect (peerinfo_handle); 4081 GNUNET_PEERINFO_disconnect (peerinfo_handle);
3940 peerinfo_handle = NULL; 4082 peerinfo_handle = NULL;
3941 if (NULL != mss->do_round_task) 4083 if (NULL != ss->do_round_task)
3942 { 4084 {
3943 GNUNET_SCHEDULER_cancel (mss->do_round_task); 4085 GNUNET_SCHEDULER_cancel (ss->do_round_task);
3944 mss->do_round_task = NULL; 4086 ss->do_round_task = NULL;
3945 } 4087 }
3946 4088
3947 peers_terminate (); 4089 peers_terminate (ss);
3948 4090
3949 GNUNET_NSE_disconnect (nse); 4091 GNUNET_NSE_disconnect (nse);
3950 RPS_sampler_destroy (mss->sampler); 4092 RPS_sampler_destroy (ss->sampler);
3951 GNUNET_CADET_close_port (mss->cadet_port); 4093 GNUNET_CADET_close_port (ss->cadet_port);
3952 GNUNET_CADET_disconnect (mss->cadet_handle); 4094 GNUNET_CADET_disconnect (ss->cadet_handle);
3953 mss->cadet_handle = NULL; 4095 ss->cadet_handle = NULL;
3954 View_destroy (mss->view); 4096 View_destroy (ss->view);
3955 CustomPeerMap_destroy (mss->push_map); 4097 CustomPeerMap_destroy (ss->push_map);
3956 CustomPeerMap_destroy (mss->pull_map); 4098 CustomPeerMap_destroy (ss->pull_map);
3957 if (NULL != stats) 4099 if (NULL != stats)
3958 { 4100 {
3959 GNUNET_STATISTICS_destroy (stats, 4101 GNUNET_STATISTICS_destroy (stats,
3960 GNUNET_NO); 4102 GNUNET_NO);
3961 stats = NULL; 4103 stats = NULL;
3962 } 4104 }
3963#ifdef ENABLE_MALICIOUS 4105#ifdef ENABLE_MALICIOUS
3964 struct AttackedPeer *tmp_att_peer; 4106 struct AttackedPeer *tmp_att_peer;
3965 /* it is ok to free this const during shutdown: */ 4107 /* it is ok to free this const during shutdown: */
3966 GNUNET_free ((char *) mss->file_name_view_log); 4108 GNUNET_free ((char *) ss->file_name_view_log);
3967#ifdef TO_FILE 4109#ifdef TO_FILE
3968 GNUNET_free ((char *) mss->file_name_observed_log); 4110 GNUNET_free ((char *) ss->file_name_observed_log);
3969 GNUNET_CONTAINER_multipeermap_destroy (mss->observed_unique_peers); 4111 GNUNET_CONTAINER_multipeermap_destroy (ss->observed_unique_peers);
3970#endif /* TO_FILE */ 4112#endif /* TO_FILE */
3971 GNUNET_array_grow (mal_peers, 4113 GNUNET_array_grow (mal_peers,
3972 num_mal_peers, 4114 num_mal_peers,
3973 0); 4115 0);
3974 if (NULL != mal_peer_set) 4116 if (NULL != mal_peer_set)
3975 GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set); 4117 GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
3976 if (NULL != att_peer_set) 4118 if (NULL != att_peer_set)
@@ -3979,8 +4121,8 @@ shutdown_task (void *cls)
3979 { 4121 {
3980 tmp_att_peer = att_peers_head; 4122 tmp_att_peer = att_peers_head;
3981 GNUNET_CONTAINER_DLL_remove (att_peers_head, 4123 GNUNET_CONTAINER_DLL_remove (att_peers_head,
3982 att_peers_tail, 4124 att_peers_tail,
3983 tmp_att_peer); 4125 tmp_att_peer);
3984 GNUNET_free (tmp_att_peer); 4126 GNUNET_free (tmp_att_peer);
3985 } 4127 }
3986#endif /* ENABLE_MALICIOUS */ 4128#endif /* ENABLE_MALICIOUS */
@@ -3990,7 +4132,7 @@ shutdown_task (void *cls)
3990/** 4132/**
3991 * Handle client connecting to the service. 4133 * Handle client connecting to the service.
3992 * 4134 *
3993 * @param cls NULL 4135 * @param cls unused
3994 * @param client the new client 4136 * @param client the new client
3995 * @param mq the message queue of @a client 4137 * @param mq the message queue of @a client
3996 * @return @a client 4138 * @return @a client
@@ -4012,6 +4154,7 @@ client_connect_cb (void *cls,
4012 cli_ctx->view_updates_left = -1; 4154 cli_ctx->view_updates_left = -1;
4013 cli_ctx->stream_update = GNUNET_NO; 4155 cli_ctx->stream_update = GNUNET_NO;
4014 cli_ctx->client = client; 4156 cli_ctx->client = client;
4157 cli_ctx->ss = mss;
4015 GNUNET_CONTAINER_DLL_insert (cli_ctx_head, 4158 GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
4016 cli_ctx_tail, 4159 cli_ctx_tail,
4017 cli_ctx); 4160 cli_ctx);
@@ -4068,12 +4211,12 @@ run (void *cls,
4068 (void) service; 4211 (void) service;
4069 4212
4070 GNUNET_log_setup ("rps", 4213 GNUNET_log_setup ("rps",
4071 GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG), 4214 GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG),
4072 NULL); 4215 NULL);
4073 cfg = c; 4216 cfg = c;
4074 /* Get own ID */ 4217 /* Get own ID */
4075 GNUNET_CRYPTO_get_peer_identity (cfg, 4218 GNUNET_CRYPTO_get_peer_identity (cfg,
4076 &own_identity); // TODO check return value 4219 &own_identity); // TODO check return value
4077 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 4220 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4078 "STARTING SERVICE (rps) for peer [%s]\n", 4221 "STARTING SERVICE (rps) for peer [%s]\n",
4079 GNUNET_i2s (&own_identity)); 4222 GNUNET_i2s (&own_identity));
@@ -4085,9 +4228,9 @@ run (void *cls,
4085 /* Get time interval from the configuration */ 4228 /* Get time interval from the configuration */
4086 if (GNUNET_OK != 4229 if (GNUNET_OK !=
4087 GNUNET_CONFIGURATION_get_value_time (cfg, 4230 GNUNET_CONFIGURATION_get_value_time (cfg,
4088 "RPS", 4231 "RPS",
4089 "ROUNDINTERVAL", 4232 "ROUNDINTERVAL",
4090 &round_interval)) 4233 &round_interval))
4091 { 4234 {
4092 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, 4235 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4093 "RPS", "ROUNDINTERVAL"); 4236 "RPS", "ROUNDINTERVAL");
@@ -4115,14 +4258,10 @@ run (void *cls,
4115 &fn_valid_peers)) 4258 &fn_valid_peers))
4116 { 4259 {
4117 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, 4260 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4118 "rps", "FILENAME_VALID_PEERS"); 4261 "rps",
4262 "FILENAME_VALID_PEERS");
4119 } 4263 }
4120 4264
4121
4122 /* connect to NSE */
4123 nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4124
4125
4126 alpha = 0.45; 4265 alpha = 0.45;
4127 beta = 0.45; 4266 beta = 0.45;
4128 4267
@@ -4135,27 +4274,29 @@ run (void *cls,
4135 4274
4136 peerinfo_handle = GNUNET_PEERINFO_connect (cfg); 4275 peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4137 4276
4277 /* connect to NSE */
4278 nse = GNUNET_NSE_connect (cfg, nse_callback, mss);
4279
4138 //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n"); 4280 //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4139 //GNUNET_CADET_get_peers (mss.cadet_handle, &init_peer_cb, NULL); 4281 //GNUNET_CADET_get_peers (mss.cadet_handle, &init_peer_cb, mss);
4140 // TODO send push/pull to each of those peers? 4282 // TODO send push/pull to each of those peers?
4141 // TODO read stored valid peers from last run 4283 // TODO read stored valid peers from last run
4142 LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n"); 4284 LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4143 restore_valid_peers (); 4285 restore_valid_peers (mss);
4144 get_valid_peers (valid_peers_iterator, NULL); 4286 get_valid_peers (mss->valid_peers, valid_peers_iterator, mss);
4145 4287
4146 peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg, 4288 peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4147 GNUNET_NO, 4289 GNUNET_NO,
4148 process_peerinfo_peers, 4290 process_peerinfo_peers,
4149 NULL); 4291 mss);
4150 4292
4151 LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n"); 4293 LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4152 4294
4153 mss->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL); 4295 mss->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, mss);
4154 LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n"); 4296 LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
4155 4297
4156 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); 4298 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, mss);
4157 stats = GNUNET_STATISTICS_create ("rps", cfg); 4299 stats = GNUNET_STATISTICS_create ("rps", cfg);
4158
4159} 4300}
4160 4301
4161 4302