From b3aad5bef2e78487251ef7fc766a510f9fc731c9 Mon Sep 17 00:00:00 2001 From: Julius Bünger Date: Mon, 1 Oct 2018 20:03:54 +0200 Subject: Change architecture of service (towards subsampling) --- src/rps/gnunet-service-rps.c | 1114 ++++++++++++++++++++++-------------------- 1 file changed, 579 insertions(+), 535 deletions(-) (limited to 'src') diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c index 389c6139b..3d9f9234c 100644 --- a/src/rps/gnunet-service-rps.c +++ b/src/rps/gnunet-service-rps.c @@ -51,30 +51,6 @@ // hist_size_init, hist_size_max -/** - * Our configuration. - */ -static const struct GNUNET_CONFIGURATION_Handle *cfg; - -/** - * Handle to the statistics service. - */ -struct GNUNET_STATISTICS_Handle *stats; - -/** - * Our own identity. - */ -static struct GNUNET_PeerIdentity own_identity; - -static int in_shutdown = GNUNET_NO; - -/** - * @brief Port used for cadet. - * - * Don't compute multiple times through making it global - */ -static struct GNUNET_HashCode port; - /*********************************************************************** * Old gnunet-service-rps_peers.c ***********************************************************************/ @@ -168,7 +144,7 @@ struct ChannelCtx; * This is stored in a multipeermap. * It contains information such as cadet channels, a message queue for sending, * status about the channels, the pending operations on this peer and some flags - * about the status of the peer itself. (online, valid, ...) + * about the status of the peer itself. (live, valid, ...) */ struct PeerContext { @@ -197,7 +173,7 @@ struct PeerContext * * To be canceled on shutdown. */ - struct PendingMessage *online_check_pending; + struct PendingMessage *liveliness_check_pending; /** * Number of pending operations. @@ -276,31 +252,277 @@ struct ChannelCtx struct GNUNET_SCHEDULER_Task *destruction_task; }; + +#ifdef ENABLE_MALICIOUS + /** - * @brief Hashmap of valid peers. + * If type is 2 This struct is used to store the attacked peers in a DLL */ -static struct GNUNET_CONTAINER_MultiPeerMap *valid_peers; +struct AttackedPeer +{ + /** + * DLL + */ + struct AttackedPeer *next; + struct AttackedPeer *prev; + + /** + * PeerID + */ + struct GNUNET_PeerIdentity peer_id; +}; + +#endif /* ENABLE_MALICIOUS */ /** - * @brief Maximum number of valid peers to keep. - * TODO read from config + * @brief One SubSampler. + * + * Essentially one instance of brahms that only connects to other instances + * with the same (secret) value. */ -static uint32_t num_valid_peers_max = UINT32_MAX; +struct SubSampler +{ + /** + * @brief Port used for cadet. + * + * Don't compute multiple times through making it global + */ + struct GNUNET_HashCode port; + + /** + * Handler to CADET. + */ + struct GNUNET_CADET_Handle *cadet_handle; + + /** + * @brief Port to communicate to other peers. + */ + struct GNUNET_CADET_Port *cadet_port; + + /** + * @brief Hashmap of valid peers. + */ + struct GNUNET_CONTAINER_MultiPeerMap *valid_peers; + + /** + * @brief Filename of the file that stores the valid peers persistently. + */ + char *filename_valid_peers; + + /** + * Set of all peers to keep track of them. + */ + struct GNUNET_CONTAINER_MultiPeerMap *peer_map; + + /** + * @brief This is the minimum estimate used as sampler size. + * + * It is configured by the user. + */ + unsigned int sampler_size_est_min; + + /** + * The size of sampler we need to be able to satisfy the Brahms protocol's + * need of random peers. + * + * This is one minimum size the sampler grows to. + */ + unsigned int sampler_size_est_need; + + /** + * Time inverval the do_round task runs in. + */ + struct GNUNET_TIME_Relative round_interval; + + /** + * Sampler used for the Brahms protocol itself. + */ + struct RPS_Sampler *sampler; + + /** + * Name to log view to + */ + char *file_name_view_log; + +#ifdef TO_FILE + /** + * Name to log number of observed peers to + */ + char *file_name_observed_log; + + /** + * @brief Count the observed peers + */ + uint32_t num_observed_peers; + + /** + * @brief Multipeermap (ab-) used to count unique peer_ids + */ + struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers; +#endif /* TO_FILE */ + + /** + * List to store peers received through pushes temporary. + */ + struct CustomPeerMap *push_map; + + /** + * List to store peers received through pulls temporary. + */ + struct CustomPeerMap *pull_map; + + /** + * @brief This is the estimate used as view size. + * + * It is initialised with the minimum + */ + unsigned int view_size_est_need; + + /** + * @brief This is the minimum estimate used as view size. + * + * It is configured by the user. + */ + unsigned int view_size_est_min; + + /** + * Identifier for the main task that runs periodically. + */ + struct GNUNET_SCHEDULER_Task *do_round_task; +}; + + +/*********************************************************************** + * Globals +***********************************************************************/ /** - * @brief Filename of the file that stores the valid peers persistently. + * Our configuration. */ -static char *filename_valid_peers; +static const struct GNUNET_CONFIGURATION_Handle *cfg; /** - * Set of all peers to keep track of them. + * Handle to the statistics service. */ -static struct GNUNET_CONTAINER_MultiPeerMap *peer_map; +struct GNUNET_STATISTICS_Handle *stats; /** - * Cadet handle. + * Our own identity. */ -static struct GNUNET_CADET_Handle *cadet_handle; +static struct GNUNET_PeerIdentity own_identity; + +/** + * Percentage of total peer number in the view + * to send random PUSHes to + */ +static float alpha; + +/** + * Percentage of total peer number in the view + * to send random PULLs to + */ +static float beta; + +/** + * Handler to NSE. + */ +static struct GNUNET_NSE_Handle *nse; + +/** + * Handler to PEERINFO. + */ +static struct GNUNET_PEERINFO_Handle *peerinfo_handle; + +/** + * Handle for cancellation of iteration over peers. + */ +static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle; + + +#ifdef ENABLE_MALICIOUS +/** + * Type of malicious peer + * + * 0 Don't act malicious at all - Default + * 1 Try to maximise representation + * 2 Try to partition the network + * 3 Combined attack + */ +static uint32_t mal_type; + +/** + * Other malicious peers + */ +static struct GNUNET_PeerIdentity *mal_peers; + +/** + * Hashmap of malicious peers used as set. + * Used to more efficiently check whether we know that peer. + */ +static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set; + +/** + * Number of other malicious peers + */ +static uint32_t num_mal_peers; + + +/** + * If type is 2 this is the DLL of attacked peers + */ +static struct AttackedPeer *att_peers_head; +static struct AttackedPeer *att_peers_tail; + +/** + * This index is used to point to an attacked peer to + * implement the round-robin-ish way to select attacked peers. + */ +static struct AttackedPeer *att_peer_index; + +/** + * Hashmap of attacked peers used as set. + * Used to more efficiently check whether we know that peer. + */ +static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set; + +/** + * Number of attacked peers + */ +static uint32_t num_attacked_peers; + +/** + * If type is 1 this is the attacked peer + */ +static struct GNUNET_PeerIdentity attacked_peer; + +/** + * The limit of PUSHes we can send in one round. + * This is an assumption of the Brahms protocol and either implemented + * via proof of work + * or + * assumend to be the bandwidth limitation. + */ +static uint32_t push_limit = 10000; +#endif /* ENABLE_MALICIOUS */ + +/** + * @brief Main SubSampler. + * + * This is run in any case by all peers and connects to all peers without + * specifying a shared value. + */ +static struct SubSampler *mss; + +/** + * @brief Maximum number of valid peers to keep. + * TODO read from config + */ +static const uint32_t num_valid_peers_max = UINT32_MAX; + + +/*********************************************************************** + * /Globals +***********************************************************************/ /** @@ -316,9 +538,9 @@ get_peer_ctx (const struct GNUNET_PeerIdentity *peer) struct PeerContext *ctx; int ret; - ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer); + ret = GNUNET_CONTAINER_multipeermap_contains (mss->peer_map, peer); GNUNET_assert (GNUNET_YES == ret); - ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer); + ctx = GNUNET_CONTAINER_multipeermap_get (mss->peer_map, peer); GNUNET_assert (NULL != ctx); return ctx; } @@ -336,9 +558,9 @@ get_peer_ctx (const struct GNUNET_PeerIdentity *peer) static int check_peer_known (const struct GNUNET_PeerIdentity *peer) { - if (NULL != peer_map) + if (NULL != mss->peer_map) { - return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer); + return GNUNET_CONTAINER_multipeermap_contains (mss->peer_map, peer); } else { return GNUNET_NO; @@ -363,12 +585,12 @@ create_peer_ctx (const struct GNUNET_PeerIdentity *peer) ctx = GNUNET_new (struct PeerContext); ctx->peer_id = *peer; - ret = GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx, + ret = GNUNET_CONTAINER_multipeermap_put (mss->peer_map, peer, ctx, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); GNUNET_assert (GNUNET_OK == ret); GNUNET_STATISTICS_set (stats, "# known peers", - GNUNET_CONTAINER_multipeermap_size (peer_map), + GNUNET_CONTAINER_multipeermap_size (mss->peer_map), GNUNET_NO); return ctx; } @@ -496,7 +718,7 @@ get_random_peer_from_peermap (const struct iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls); iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, GNUNET_CONTAINER_multipeermap_size (peer_map)); - (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers, + (void) GNUNET_CONTAINER_multipeermap_iterate (mss->valid_peers, get_rand_peer_iterator, iterator_cls); ret = iterator_cls->peer; @@ -522,17 +744,18 @@ add_valid_peer (const struct GNUNET_PeerIdentity *peer) int ret; ret = GNUNET_YES; - while (GNUNET_CONTAINER_multipeermap_size (valid_peers) >= num_valid_peers_max) + while (GNUNET_CONTAINER_multipeermap_size ( + mss->valid_peers) >= num_valid_peers_max) { - rand_peer = get_random_peer_from_peermap (valid_peers); - GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer); + rand_peer = get_random_peer_from_peermap (mss->valid_peers); + GNUNET_CONTAINER_multipeermap_remove_all (mss->valid_peers, rand_peer); ret = GNUNET_NO; } - (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL, + (void) GNUNET_CONTAINER_multipeermap_put (mss->valid_peers, peer, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); GNUNET_STATISTICS_set (stats, "# valid peers", - GNUNET_CONTAINER_multipeermap_size (valid_peers), + GNUNET_CONTAINER_multipeermap_size (mss->valid_peers), GNUNET_NO); return ret; } @@ -700,10 +923,10 @@ get_channel (const struct GNUNET_PeerIdentity *peer) *ctx_peer = *peer; peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx); peer_ctx->send_channel_ctx->channel = - GNUNET_CADET_channel_create (cadet_handle, + GNUNET_CADET_channel_create (mss->cadet_handle, peer_ctx->send_channel_ctx, /* context */ peer, - &port, + &mss->port, GNUNET_CADET_OPTION_RELIABLE, NULL, /* WindowSize handler */ &cleanup_destroyed_channel, /* Disconnect handler */ @@ -794,51 +1017,51 @@ remove_pending_message (struct PendingMessage *pending_msg, int cancel) /** * @brief This is called in response to the first message we sent as a - * online check. + * liveliness check. * - * @param cls #PeerContext of peer with pending online check + * @param cls #PeerContext of peer with pending liveliness check */ static void -mq_online_check_successful (void *cls) +mq_liveliness_check_successful (void *cls) { struct PeerContext *peer_ctx = cls; - if (NULL != peer_ctx->online_check_pending) + if (NULL != peer_ctx->liveliness_check_pending) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "Online check for peer %s was successfull\n", + "Liveliness check for peer %s was successfull\n", GNUNET_i2s (&peer_ctx->peer_id)); - remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES); - peer_ctx->online_check_pending = NULL; - set_peer_online (peer_ctx); + remove_pending_message (peer_ctx->liveliness_check_pending, GNUNET_YES); + peer_ctx->liveliness_check_pending = NULL; + set_peer_live (peer_ctx); } } /** - * Issue a check whether peer is online + * Issue a check whether peer is live * * @param peer_ctx the context of the peer */ static void -check_peer_online (struct PeerContext *peer_ctx) +check_peer_live (struct PeerContext *peer_ctx) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "Get informed about peer %s getting online\n", + "Get informed about peer %s getting live\n", GNUNET_i2s (&peer_ctx->peer_id)); struct GNUNET_MQ_Handle *mq; struct GNUNET_MQ_Envelope *ev; ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE); - peer_ctx->online_check_pending = - insert_pending_message (&peer_ctx->peer_id, ev, "Check online"); + peer_ctx->liveliness_check_pending = + insert_pending_message (&peer_ctx->peer_id, ev, "Check liveliness"); mq = get_mq (&peer_ctx->peer_id); GNUNET_MQ_notify_sent (ev, - mq_online_check_successful, + mq_liveliness_check_successful, peer_ctx); GNUNET_MQ_send (mq, ev); GNUNET_STATISTICS_update (stats, - "# pending online checks", + "# pending liveliness checks", 1, GNUNET_NO); } @@ -945,9 +1168,9 @@ static int destroy_peer (struct PeerContext *peer_ctx) { GNUNET_assert (NULL != peer_ctx); - GNUNET_assert (NULL != peer_map); + GNUNET_assert (NULL != mss->peer_map); if (GNUNET_NO == - GNUNET_CONTAINER_multipeermap_contains (peer_map, + GNUNET_CONTAINER_multipeermap_contains (mss->peer_map, &peer_ctx->peer_id)) { return GNUNET_NO; @@ -971,14 +1194,14 @@ destroy_peer (struct PeerContext *peer_ctx) "Removing unsent %s\n", peer_ctx->pending_messages_head->type); /* Cancle pending message, too */ - if ( (NULL != peer_ctx->online_check_pending) && + if ( (NULL != peer_ctx->liveliness_check_pending) && (0 == memcmp (peer_ctx->pending_messages_head, - peer_ctx->online_check_pending, + peer_ctx->liveliness_check_pending, sizeof (struct PendingMessage))) ) { - peer_ctx->online_check_pending = NULL; + peer_ctx->liveliness_check_pending = NULL; GNUNET_STATISTICS_update (stats, - "# pending online checks", + "# pending liveliness checks", -1, GNUNET_NO); } @@ -986,18 +1209,18 @@ destroy_peer (struct PeerContext *peer_ctx) GNUNET_YES); } - /* If we are still waiting for notification whether this peer is online + /* If we are still waiting for notification whether this peer is live * cancel the according task */ - if (NULL != peer_ctx->online_check_pending) + if (NULL != peer_ctx->liveliness_check_pending) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Removing pending online check for peer %s\n", + "Removing pending liveliness check for peer %s\n", GNUNET_i2s (&peer_ctx->peer_id)); // TODO wait until cadet sets mq->cancel_impl - //GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev); - remove_pending_message (peer_ctx->online_check_pending, + //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev); + remove_pending_message (peer_ctx->liveliness_check_pending, GNUNET_YES); - peer_ctx->online_check_pending = NULL; + peer_ctx->liveliness_check_pending = NULL; } if (NULL != peer_ctx->send_channel_ctx) @@ -1017,15 +1240,15 @@ destroy_peer (struct PeerContext *peer_ctx) } if (GNUNET_YES != - GNUNET_CONTAINER_multipeermap_remove_all (peer_map, + GNUNET_CONTAINER_multipeermap_remove_all (mss->peer_map, &peer_ctx->peer_id)) { LOG (GNUNET_ERROR_TYPE_WARNING, - "removing peer from peer_map failed\n"); + "removing peer from mss->peer_map failed\n"); } GNUNET_STATISTICS_set (stats, "# known peers", - GNUNET_CONTAINER_multipeermap_size (peer_map), + GNUNET_CONTAINER_multipeermap_size (mss->peer_map), GNUNET_NO); GNUNET_free (peer_ctx); return GNUNET_YES; @@ -1130,27 +1353,27 @@ store_valid_peers () uint32_t number_written_peers; int ret; - if (0 == strncmp ("DISABLE", filename_valid_peers, 7)) + if (0 == strncmp ("DISABLE", mss->filename_valid_peers, 7)) { return; } - ret = GNUNET_DISK_directory_create_for_file (filename_valid_peers); + ret = GNUNET_DISK_directory_create_for_file (mss->filename_valid_peers); if (GNUNET_SYSERR == ret) { LOG (GNUNET_ERROR_TYPE_WARNING, "Not able to create directory for file `%s'\n", - filename_valid_peers); + mss->filename_valid_peers); GNUNET_break (0); } else if (GNUNET_NO == ret) { LOG (GNUNET_ERROR_TYPE_WARNING, "Directory for file `%s' exists but is not writable for us\n", - filename_valid_peers); + mss->filename_valid_peers); GNUNET_break (0); } - fh = GNUNET_DISK_file_open (filename_valid_peers, + fh = GNUNET_DISK_file_open (mss->filename_valid_peers, GNUNET_DISK_OPEN_WRITE | GNUNET_DISK_OPEN_CREATE, GNUNET_DISK_PERM_USER_READ | @@ -1159,19 +1382,19 @@ store_valid_peers () { LOG (GNUNET_ERROR_TYPE_WARNING, "Not able to write valid peers to file `%s'\n", - filename_valid_peers); + mss->filename_valid_peers); return; } LOG (GNUNET_ERROR_TYPE_DEBUG, "Writing %u valid peers to disk\n", - GNUNET_CONTAINER_multipeermap_size (valid_peers)); + GNUNET_CONTAINER_multipeermap_size (mss->valid_peers)); number_written_peers = - GNUNET_CONTAINER_multipeermap_iterate (valid_peers, + GNUNET_CONTAINER_multipeermap_iterate (mss->valid_peers, store_peer_presistently_iterator, fh); GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh)); GNUNET_assert (number_written_peers == - GNUNET_CONTAINER_multipeermap_size (valid_peers)); + GNUNET_CONTAINER_multipeermap_size (mss->valid_peers)); } @@ -1236,16 +1459,16 @@ restore_valid_peers () char *str_repr; const struct GNUNET_PeerIdentity *peer; - if (0 == strncmp ("DISABLE", filename_valid_peers, 7)) + if (0 == strncmp ("DISABLE", mss->filename_valid_peers, 7)) { return; } - if (GNUNET_OK != GNUNET_DISK_file_test (filename_valid_peers)) + if (GNUNET_OK != GNUNET_DISK_file_test (mss->filename_valid_peers)) { return; } - fh = GNUNET_DISK_file_open (filename_valid_peers, + fh = GNUNET_DISK_file_open (mss->filename_valid_peers, GNUNET_DISK_OPEN_READ, GNUNET_DISK_PERM_NONE); GNUNET_assert (NULL != fh); @@ -1257,7 +1480,7 @@ restore_valid_peers () LOG (GNUNET_ERROR_TYPE_DEBUG, "Restoring %" PRIu32 " peers from file `%s'\n", num_peers, - filename_valid_peers); + mss->filename_valid_peers); for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53) { str_repr = GNUNET_strndup (iter_buf, 53); @@ -1271,10 +1494,10 @@ restore_valid_peers () iter_buf = NULL; GNUNET_free (buf); LOG (GNUNET_ERROR_TYPE_DEBUG, - "num_peers: %" PRIu32 ", _size (valid_peers): %u\n", + "num_peers: %" PRIu32 ", _size (mss->valid_peers): %u\n", num_peers, - GNUNET_CONTAINER_multipeermap_size (valid_peers)); - if (num_peers != GNUNET_CONTAINER_multipeermap_size (valid_peers)) + GNUNET_CONTAINER_multipeermap_size (mss->valid_peers)); + if (num_peers != GNUNET_CONTAINER_multipeermap_size (mss->valid_peers)) { LOG (GNUNET_ERROR_TYPE_WARNING, "Number of restored peers does not match file size. Have probably duplicates.\n"); @@ -1282,26 +1505,7 @@ restore_valid_peers () GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh)); LOG (GNUNET_ERROR_TYPE_DEBUG, "Restored %u valid peers from disk\n", - GNUNET_CONTAINER_multipeermap_size (valid_peers)); -} - - -/** - * @brief Initialise storage of peers - * - * @param fn_valid_peers filename of the file used to store valid peer ids - * @param cadet_h cadet handle - * @param own_id own peer identity - */ -static void -initialise_peers (char* fn_valid_peers, - struct GNUNET_CADET_Handle *cadet_h) -{ - filename_valid_peers = GNUNET_strdup (fn_valid_peers); - cadet_handle = cadet_h; - peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO); - valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO); - restore_valid_peers (); + GNUNET_CONTAINER_multipeermap_size (mss->valid_peers)); } @@ -1312,20 +1516,20 @@ static void peers_terminate () { if (GNUNET_SYSERR == - GNUNET_CONTAINER_multipeermap_iterate (peer_map, + GNUNET_CONTAINER_multipeermap_iterate (mss->peer_map, &peermap_clear_iterator, NULL)) { LOG (GNUNET_ERROR_TYPE_WARNING, "Iteration destroying peers was aborted.\n"); } - GNUNET_CONTAINER_multipeermap_destroy (peer_map); - peer_map = NULL; + GNUNET_CONTAINER_multipeermap_destroy (mss->peer_map); + mss->peer_map = NULL; store_valid_peers (); - GNUNET_free (filename_valid_peers); - filename_valid_peers = NULL; - GNUNET_CONTAINER_multipeermap_destroy (valid_peers); - valid_peers = NULL; + GNUNET_free (mss->filename_valid_peers); + mss->filename_valid_peers = NULL; + GNUNET_CONTAINER_multipeermap_destroy (mss->valid_peers); + mss->valid_peers = NULL; } @@ -1370,7 +1574,7 @@ get_valid_peers (PeersIterator iterator, cls = GNUNET_new (struct PeersIteratorCls); cls->iterator = iterator; cls->cls = it_cls; - ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers, + ret = GNUNET_CONTAINER_multipeermap_iterate (mss->valid_peers, valid_peer_iterator, cls); GNUNET_free (cls); @@ -1430,21 +1634,21 @@ check_peer_flag (const struct GNUNET_PeerIdentity *peer, * * If not known yet, insert into known peers * - * @param peer the peer whose online is to be checked + * @param peer the peer whose liveliness is to be checked * @return #GNUNET_YES if the check was issued * #GNUNET_NO otherwise */ static int -issue_peer_online_check (const struct GNUNET_PeerIdentity *peer) +issue_peer_liveliness_check (const struct GNUNET_PeerIdentity *peer) { struct PeerContext *peer_ctx; (void) insert_peer (peer); peer_ctx = get_peer_ctx (peer); if ( (GNUNET_NO == check_peer_flag (peer, Peers_ONLINE)) && - (NULL == peer_ctx->online_check_pending) ) + (NULL == peer_ctx->liveliness_check_pending) ) { - check_peer_online (peer_ctx); + check_peer_live (peer_ctx); return GNUNET_YES; } return GNUNET_NO; @@ -1469,7 +1673,7 @@ check_removable (const struct GNUNET_PeerIdentity *peer) { struct PeerContext *peer_ctx; - if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer)) + if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mss->peer_map, peer)) { return GNUNET_SYSERR; } @@ -1498,7 +1702,7 @@ check_removable (const struct GNUNET_PeerIdentity *peer) static int check_peer_valid (const struct GNUNET_PeerIdentity *peer) { - return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer); + return GNUNET_CONTAINER_multipeermap_contains (mss->valid_peers, peer); } @@ -1566,7 +1770,7 @@ handle_inbound_channel (void *cls, GNUNET_assert (NULL != channel); /* according to cadet API */ /* Make sure we 'know' about this peer */ peer_ctx = create_or_get_peer_ctx (initiator); - set_peer_online (peer_ctx); + set_peer_live (peer_ctx); ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity); *ctx_peer = *initiator; channel_ctx = add_channel_ctx (peer_ctx); @@ -1675,8 +1879,7 @@ send_message (const struct GNUNET_PeerIdentity *peer, * * Avoids scheduling an operation twice. * - * @param peer the peer we want to schedule the operation for once it gets - * online + * @param peer the peer we want to schedule the operation for once it gets live * * @return #GNUNET_YES if the operation was scheduled * #GNUNET_NO otherwise @@ -1690,7 +1893,7 @@ schedule_operation (const struct GNUNET_PeerIdentity *peer, GNUNET_assert (GNUNET_YES == check_peer_known (peer)); - //TODO if ONLINE execute immediately + //TODO if LIVE/ONLINE execute immediately if (GNUNET_NO == check_operation_scheduled (peer, peer_op)) { @@ -1790,213 +1993,6 @@ struct ClientContext *cli_ctx_tail; -/*********************************************************************** - * Globals -***********************************************************************/ - -/** - * Sampler used for the Brahms protocol itself. - */ -static struct RPS_Sampler *prot_sampler; - -/** - * Name to log view to - */ -static const char *file_name_view_log; - -#ifdef TO_FILE -/** - * Name to log number of observed peers to - */ -static const char *file_name_observed_log; - -/** - * @brief Count the observed peers - */ -static uint32_t num_observed_peers; - -/** - * @brief Multipeermap (ab-) used to count unique peer_ids - */ -static struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers; -#endif /* TO_FILE */ - -/** - * The size of sampler we need to be able to satisfy the Brahms protocol's - * need of random peers. - * - * This is one minimum size the sampler grows to. - */ -static unsigned int sampler_size_est_need; - -/** - * @brief This is the minimum estimate used as sampler size. - * - * It is configured by the user. - */ -static unsigned int sampler_size_est_min; - -/** - * @brief This is the estimate used as view size. - * - * It is initialised with the minimum - */ -static unsigned int view_size_est_need; - -/** - * @brief This is the minimum estimate used as view size. - * - * It is configured by the user. - */ -static unsigned int view_size_est_min; - -/** - * Percentage of total peer number in the view - * to send random PUSHes to - */ -static float alpha; - -/** - * Percentage of total peer number in the view - * to send random PULLs to - */ -static float beta; - -/** - * Identifier for the main task that runs periodically. - */ -static struct GNUNET_SCHEDULER_Task *do_round_task; - -/** - * Time inverval the do_round task runs in. - */ -static struct GNUNET_TIME_Relative round_interval; - -/** - * List to store peers received through pushes temporary. - */ -static struct CustomPeerMap *push_map; - -/** - * List to store peers received through pulls temporary. - */ -static struct CustomPeerMap *pull_map; - -/** - * Handler to NSE. - */ -static struct GNUNET_NSE_Handle *nse; - -/** - * Handler to CADET. - */ -static struct GNUNET_CADET_Handle *cadet_handle; - -/** - * @brief Port to communicate to other peers. - */ -static struct GNUNET_CADET_Port *cadet_port; - -/** - * Handler to PEERINFO. - */ -static struct GNUNET_PEERINFO_Handle *peerinfo_handle; - -/** - * Handle for cancellation of iteration over peers. - */ -static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle; - - -#ifdef ENABLE_MALICIOUS -/** - * Type of malicious peer - * - * 0 Don't act malicious at all - Default - * 1 Try to maximise representation - * 2 Try to partition the network - * 3 Combined attack - */ -static uint32_t mal_type; - -/** - * Other malicious peers - */ -static struct GNUNET_PeerIdentity *mal_peers; - -/** - * Hashmap of malicious peers used as set. - * Used to more efficiently check whether we know that peer. - */ -static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set; - -/** - * Number of other malicious peers - */ -static uint32_t num_mal_peers; - - -/** - * If type is 2 This struct is used to store the attacked peers in a DLL - */ -struct AttackedPeer -{ - /** - * DLL - */ - struct AttackedPeer *next; - struct AttackedPeer *prev; - - /** - * PeerID - */ - struct GNUNET_PeerIdentity peer_id; -}; - -/** - * If type is 2 this is the DLL of attacked peers - */ -static struct AttackedPeer *att_peers_head; -static struct AttackedPeer *att_peers_tail; - -/** - * This index is used to point to an attacked peer to - * implement the round-robin-ish way to select attacked peers. - */ -static struct AttackedPeer *att_peer_index; - -/** - * Hashmap of attacked peers used as set. - * Used to more efficiently check whether we know that peer. - */ -static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set; - -/** - * Number of attacked peers - */ -static uint32_t num_attacked_peers; - -/** - * If type is 1 this is the attacked peer - */ -static struct GNUNET_PeerIdentity attacked_peer; - -/** - * The limit of PUSHes we can send in one round. - * This is an assumption of the Brahms protocol and either implemented - * via proof of work - * or - * assumend to be the bandwidth limitation. - */ -static uint32_t push_limit = 10000; -#endif /* ENABLE_MALICIOUS */ - - -/*********************************************************************** - * /Globals -***********************************************************************/ - - /*********************************************************************** * Util functions ***********************************************************************/ @@ -2062,7 +2058,7 @@ rem_from_list (struct GNUNET_PeerIdentity **peer_list, /** * Insert PeerID in #view * - * Called once we know a peer is online. + * Called once we know a peer is live. * Implements #PeerOp * * @return GNUNET_OK if peer was actually inserted @@ -2075,7 +2071,7 @@ insert_in_view_op (void *cls, /** * Insert PeerID in #view * - * Called once we know a peer is online. + * Called once we know a peer is live. * * @return GNUNET_OK if peer was actually inserted * GNUNET_NO if peer was not inserted @@ -2090,7 +2086,7 @@ insert_in_view (const struct GNUNET_PeerIdentity *peer) if ( (GNUNET_NO == online) || (GNUNET_SYSERR == online) ) /* peer is not even known */ { - (void) issue_peer_online_check (peer); + (void) issue_peer_liveliness_check (peer); (void) schedule_operation (peer, insert_in_view_op); return GNUNET_NO; } @@ -2254,7 +2250,7 @@ hist_update (const struct GNUNET_PeerIdentity *ids, { clients_notify_stream_peer (1, &ids[i]); } - to_file (file_name_view_log, + to_file (mss->file_name_view_log, "+%s\t(hist)", GNUNET_i2s_full (ids)); } @@ -2377,7 +2373,7 @@ send_pull_reply (const struct GNUNET_PeerIdentity *peer_id, /** * Insert PeerID in #pull_map * - * Called once we know a peer is online. + * Called once we know a peer is live. */ static void insert_in_pull_map (void *cls, @@ -2391,7 +2387,7 @@ insert_in_pull_map (void *cls, /** * Insert PeerID in #view * - * Called once we know a peer is online. + * Called once we know a peer is live. * Implements #PeerOp */ static void @@ -2421,35 +2417,35 @@ insert_in_sampler (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "Updating samplers with peer %s from insert_in_sampler()\n", GNUNET_i2s (peer)); - RPS_sampler_update (prot_sampler, peer); - if (0 < RPS_sampler_count_id (prot_sampler, peer)) + RPS_sampler_update (mss->sampler, peer); + if (0 < RPS_sampler_count_id (mss->sampler, peer)) { /* Make sure we 'know' about this peer */ - (void) issue_peer_online_check (peer); + (void) issue_peer_liveliness_check (peer); /* Establish a channel towards that peer to indicate we are going to send * messages to it */ //indicate_sending_intention (peer); } #ifdef TO_FILE - num_observed_peers++; + mss->num_observed_peers++; GNUNET_CONTAINER_multipeermap_put - (observed_unique_peers, + (mss->observed_unique_peers, peer, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); - uint32_t num_observed_unique_peers = GNUNET_CONTAINER_multipeermap_size ( - observed_unique_peers); - to_file (file_name_observed_log, + uint32_t num_observed_unique_peers = + GNUNET_CONTAINER_multipeermap_size (mss->observed_unique_peers); + to_file (mss->file_name_observed_log, "%" PRIu32 " %" PRIu32 " %f\n", - num_observed_peers, + mss->num_observed_peers, num_observed_unique_peers, - 1.0*num_observed_unique_peers/num_observed_peers) + 1.0*num_observed_unique_peers/mss->num_observed_peers) #endif /* TO_FILE */ } /** * @brief This is called on peers from external sources (cadet, peerinfo, ...) - * If the peer is not known, online check is issued and it is + * If the peer is not known, liveliness check is issued and it is * scheduled to be inserted in sampler and view. * * "External sources" refer to every source except the gossip. @@ -2460,7 +2456,7 @@ static void got_peer (const struct GNUNET_PeerIdentity *peer) { /* If we did not know this peer already, insert it into sampler and view */ - if (GNUNET_YES == issue_peer_online_check (peer)) + if (GNUNET_YES == issue_peer_liveliness_check (peer)) { schedule_operation (peer, insert_in_sampler); schedule_operation (peer, insert_in_view_op); @@ -2488,10 +2484,10 @@ check_sending_channel_needed (const struct GNUNET_PeerIdentity *peer) } if (GNUNET_YES == check_sending_channel_exists (peer)) { - if ( (0 < RPS_sampler_count_id (prot_sampler, peer)) || + if ( (0 < RPS_sampler_count_id (mss->sampler, peer)) || (GNUNET_YES == View_contains_peer (peer)) || - (GNUNET_YES == CustomPeerMap_contains_peer (push_map, peer)) || - (GNUNET_YES == CustomPeerMap_contains_peer (pull_map, peer)) || + (GNUNET_YES == CustomPeerMap_contains_peer (mss->push_map, peer)) || + (GNUNET_YES == CustomPeerMap_contains_peer (mss->pull_map, peer)) || (GNUNET_YES == check_peer_flag (peer, Peers_PULL_REPLY_PENDING))) { /* If we want to keep the connection to peer open */ return GNUNET_YES; @@ -2511,9 +2507,9 @@ static void remove_peer (const struct GNUNET_PeerIdentity *peer) { (void) View_remove_peer (peer); - CustomPeerMap_remove_peer (pull_map, peer); - CustomPeerMap_remove_peer (push_map, peer); - RPS_sampler_reinitialise_by_value (prot_sampler, peer); + CustomPeerMap_remove_peer (mss->pull_map, peer); + CustomPeerMap_remove_peer (mss->push_map, peer); + RPS_sampler_reinitialise_by_value (mss->sampler, peer); destroy_peer (get_peer_ctx (peer)); } @@ -2543,9 +2539,9 @@ clean_peer (const struct GNUNET_PeerIdentity *peer) if ( (GNUNET_NO == check_peer_send_intention (peer)) && (GNUNET_NO == View_contains_peer (peer)) && - (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) && - (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) && - (0 == RPS_sampler_count_id (prot_sampler, peer)) && + (GNUNET_NO == CustomPeerMap_contains_peer (mss->push_map, peer)) && + (GNUNET_NO == CustomPeerMap_contains_peer (mss->push_map, peer)) && + (0 == RPS_sampler_count_id (mss->sampler, peer)) && (GNUNET_NO != check_removable (peer)) ) { /* We can safely remove this peer */ LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -2591,6 +2587,116 @@ cleanup_destroyed_channel (void *cls, * /Util functions ***********************************************************************/ + + +/*********************************************************************** + * SubSampler +***********************************************************************/ + +struct SubSampler * +new_subsampler (const char *shared_value, + uint32_t sampler_size, + struct GNUNET_TIME_Relative round_interval) +{ + struct SubSampler *ss; + char hash_port_string[512] = GNUNET_APPLICATION_PORT_RPS; + + ss = GNUNET_new (struct SubSampler); + + /* With the hash generated from the secret value this service only connects + * to rps instances that share the value */ + strcat (hash_port_string, shared_value); + GNUNET_CRYPTO_hash (hash_port_string, + strlen (hash_port_string), + &ss->port); + struct GNUNET_MQ_MessageHandler cadet_handlers[] = { + GNUNET_MQ_hd_fixed_size (peer_check, + GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_fixed_size (peer_push, + GNUNET_MESSAGE_TYPE_RPS_PP_PUSH, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_fixed_size (peer_pull_request, + GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_var_size (peer_pull_reply, + GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY, + struct GNUNET_RPS_P2P_PullReplyMessage, + NULL), + GNUNET_MQ_handler_end () + }; + ss->cadet_handle = GNUNET_CADET_connect (cfg); + GNUNET_assert (NULL != ss->cadet_handle); + ss->cadet_port = + GNUNET_CADET_open_port (ss->cadet_handle, + &ss->port, + &handle_inbound_channel, /* Connect handler */ + NULL, /* cls */ + NULL, /* WindowSize handler */ + &cleanup_destroyed_channel, /* Disconnect handler */ + cadet_handlers); + if (NULL == ss->cadet_port) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Cadet port `%s' is already in use.\n", + GNUNET_APPLICATION_PORT_RPS); + GNUNET_assert (0); + } + + /* Set up general data structure to keep track about peers */ + ss->valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO); + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_filename (cfg, + "rps", + "FILENAME_VALID_PEERS", + &ss->filename_valid_peers)) + { + GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, + "rps", + "FILENAME_VALID_PEERS"); + } + ss->peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO); + + /* Set up the sampler */ + ss->sampler_size_est_min = sampler_size; + ss->sampler_size_est_need = sampler_size;; + LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", ss->sampler_size_est_min); + ss->round_interval = round_interval; + ss->sampler = RPS_sampler_init (sampler_size, + round_interval); + + /* Logging of internals */ + ss->file_name_view_log = store_prefix_file_name (&own_identity, "view"); + #ifdef TO_FILE + ss->file_name_observed_log = store_prefix_file_name (&own_identity, + "observed"); + ss->num_observed_peers = 0; + ss->observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1, + GNUNET_NO); + #endif /* TO_FILE */ + + /* Set up data structures for gossip */ + ss->push_map = CustomPeerMap_create (4); + ss->pull_map = CustomPeerMap_create (4); + ss->view_size_est_min = sampler_size;; + View_create (ss->view_size_est_min); + GNUNET_STATISTICS_set (stats, + "view size aim", + ss->view_size_est_min, + GNUNET_NO); + + + return ss; +} + +/*********************************************************************** + * /SubSampler +***********************************************************************/ + + static void destroy_cli_ctx (struct ClientContext *cli_ctx) { @@ -2620,29 +2726,29 @@ nse_callback (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n", - logestimate, std_dev, RPS_sampler_get_size (prot_sampler)); + logestimate, std_dev, RPS_sampler_get_size (mss->sampler)); //scale = .01; estimate = GNUNET_NSE_log_estimate_to_n (logestimate); // GNUNET_NSE_log_estimate_to_n (logestimate); estimate = pow (estimate, 1.0 / 3); // TODO add if std_dev is a number // estimate += (std_dev * scale); - if (view_size_est_min < ceil (estimate)) + if (mss->view_size_est_min < ceil (estimate)) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate); - sampler_size_est_need = estimate; - view_size_est_need = estimate; + mss->sampler_size_est_need = estimate; + mss->view_size_est_need = estimate; } else { LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate); - //sampler_size_est_need = view_size_est_min; - view_size_est_need = view_size_est_min; + //mss->sampler_size_est_need = mss->view_size_est_min; + mss->view_size_est_need = mss->view_size_est_min; } - GNUNET_STATISTICS_set (stats, "view size aim", view_size_est_need, GNUNET_NO); + GNUNET_STATISTICS_set (stats, "view size aim", mss->view_size_est_need, GNUNET_NO); /* If the NSE has changed adapt the lists accordingly */ - resize_wrapper (prot_sampler, sampler_size_est_need); - View_change_len (view_size_est_need); + resize_wrapper (mss->sampler, mss->sampler_size_est_need); + View_change_len (mss->view_size_est_need); } @@ -2828,7 +2934,7 @@ handle_peer_check (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer)); GNUNET_STATISTICS_update (stats, - "# pending online checks", + "# pending liveliness checks", -1, GNUNET_NO); @@ -2892,7 +2998,7 @@ handle_peer_push (void *cls, #endif /* ENABLE_MALICIOUS */ /* Add the sending peer to the push_map */ - CustomPeerMap_put (push_map, peer); + CustomPeerMap_put (mss->push_map, peer); GNUNET_break_op (check_peer_known (peer)); GNUNET_CADET_receive_done (channel_ctx->channel); @@ -3057,12 +3163,12 @@ handle_peer_pull_reply (void *cls, if (GNUNET_YES == check_peer_valid (&peers[i])) { - CustomPeerMap_put (pull_map, &peers[i]); + CustomPeerMap_put (mss->pull_map, &peers[i]); } else { schedule_operation (&peers[i], insert_in_pull_map); - (void) issue_peer_online_check (&peers[i]); + (void) issue_peer_liveliness_check (&peers[i]); } } @@ -3253,8 +3359,8 @@ handle_client_act_malicious (void *cls, mal_peer_set); /* Substitute do_round () with do_mal_round () */ - GNUNET_SCHEDULER_cancel (do_round_task); - do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL); + GNUNET_SCHEDULER_cancel (mss->do_round_task); + mss->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL); } else if ( (2 == mal_type) || @@ -3288,7 +3394,7 @@ handle_client_act_malicious (void *cls, /* Set the flag of the attacked peer to valid to avoid problems */ if (GNUNET_NO == check_peer_known (&attacked_peer)) { - (void) issue_peer_online_check (&attacked_peer); + (void) issue_peer_liveliness_check (&attacked_peer); } LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -3296,16 +3402,16 @@ handle_client_act_malicious (void *cls, GNUNET_i2s (&attacked_peer)); /* Substitute do_round () with do_mal_round () */ - GNUNET_SCHEDULER_cancel (do_round_task); - do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL); + GNUNET_SCHEDULER_cancel (mss->do_round_task); + mss->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL); } else if (0 == mal_type) { /* Stop acting malicious */ GNUNET_array_grow (mal_peers, num_mal_peers, 0); /* Substitute do_mal_round () with do_round () */ - GNUNET_SCHEDULER_cancel (do_round_task); - do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL); + GNUNET_SCHEDULER_cancel (mss->do_round_task); + mss->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL); } else { @@ -3333,7 +3439,7 @@ do_mal_round (void *cls) LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to execute next round maliciously type %" PRIu32 ".\n", mal_type); - do_round_task = NULL; + mss->do_round_task = NULL; GNUNET_assert (mal_type <= 3); /* Do malicious actions */ if (1 == mal_type) @@ -3379,7 +3485,7 @@ do_mal_round (void *cls) * Send as many pushes to the attacked peer as possible * That is one push per round as it will ignore more. */ - (void) issue_peer_online_check (&attacked_peer); + (void) issue_peer_liveliness_check (&attacked_peer); if (GNUNET_YES == check_peer_flag (&attacked_peer, Peers_ONLINE)) send_push (&attacked_peer); } @@ -3391,7 +3497,7 @@ do_mal_round (void *cls) /* Send PUSH to attacked peers */ if (GNUNET_YES == check_peer_known (&attacked_peer)) { - (void) issue_peer_online_check (&attacked_peer); + (void) issue_peer_liveliness_check (&attacked_peer); if (GNUNET_YES == check_peer_flag (&attacked_peer, Peers_ONLINE)) { LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -3400,7 +3506,7 @@ do_mal_round (void *cls) send_push (&attacked_peer); } } - (void) issue_peer_online_check (&attacked_peer); + (void) issue_peer_liveliness_check (&attacked_peer); /* The maximum of pushes we're going to send this round */ num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1, @@ -3435,13 +3541,13 @@ do_mal_round (void *cls) } /* Schedule next round */ - time_next_round = compute_rand_delay (round_interval, 2); + time_next_round = compute_rand_delay (mss->round_interval, 2); - //do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_mal_round, + //mss->do_round_task = GNUNET_SCHEDULER_add_delayed (mss->round_interval, &do_mal_round, //NULL); - GNUNET_assert (NULL == do_round_task); - do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round, - &do_mal_round, NULL); + GNUNET_assert (NULL == mss->do_round_task); + mss->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round, + &do_mal_round, NULL); LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n"); } #endif /* ENABLE_MALICIOUS */ @@ -3468,17 +3574,17 @@ do_round (void *cls) LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to execute next round.\n"); GNUNET_STATISTICS_update(stats, "# rounds", 1, GNUNET_NO); - do_round_task = NULL; + mss->do_round_task = NULL; LOG (GNUNET_ERROR_TYPE_DEBUG, "Printing view:\n"); - to_file (file_name_view_log, + to_file (mss->file_name_view_log, "___ new round ___"); view_array = View_get_as_array (); for (i = 0; i < View_size (); i++) { LOG (GNUNET_ERROR_TYPE_DEBUG, "\t%s\n", GNUNET_i2s (&view_array[i])); - to_file (file_name_view_log, + to_file (mss->file_name_view_log, "=%s\t(do round)", GNUNET_i2s_full (&view_array[i])); } @@ -3532,9 +3638,9 @@ do_round (void *cls) /* Update view */ /* TODO see how many peers are in push-/pull- list! */ - if ((CustomPeerMap_size (push_map) <= alpha * view_size_est_need) && - (0 < CustomPeerMap_size (push_map)) && - (0 < CustomPeerMap_size (pull_map))) + if ((CustomPeerMap_size (mss->push_map) <= alpha * mss->view_size_est_need) && + (0 < CustomPeerMap_size (mss->push_map)) && + (0 < CustomPeerMap_size (mss->pull_map))) //if (GNUNET_YES) // disable blocking temporarily { /* If conditions for update are fulfilled, update */ LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n"); @@ -3552,16 +3658,16 @@ do_round (void *cls) /* Seems like recreating is the easiest way of emptying the peermap */ View_clear (); - to_file (file_name_view_log, + to_file (mss->file_name_view_log, "--- emptied ---"); - first_border = GNUNET_MIN (ceil (alpha * view_size_est_need), - CustomPeerMap_size (push_map)); + first_border = GNUNET_MIN (ceil (alpha * mss->view_size_est_need), + CustomPeerMap_size (mss->push_map)); second_border = first_border + - GNUNET_MIN (floor (beta * view_size_est_need), - CustomPeerMap_size (pull_map)); + GNUNET_MIN (floor (beta * mss->view_size_est_need), + CustomPeerMap_size (mss->pull_map)); final_size = second_border + - ceil ((1 - (alpha + beta)) * view_size_est_need); + ceil ((1 - (alpha + beta)) * mss->view_size_est_need); LOG (GNUNET_ERROR_TYPE_DEBUG, "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n", first_border, @@ -3570,18 +3676,18 @@ do_round (void *cls) /* Update view with peers received through PUSHes */ permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, - CustomPeerMap_size (push_map)); + CustomPeerMap_size (mss->push_map)); for (i = 0; i < first_border; i++) { int inserted; - inserted = insert_in_view (CustomPeerMap_get_peer_by_index (push_map, + inserted = insert_in_view (CustomPeerMap_get_peer_by_index (mss->push_map, permut[i])); if (GNUNET_OK == inserted) { clients_notify_stream_peer (1, - CustomPeerMap_get_peer_by_index (push_map, permut[i])); + CustomPeerMap_get_peer_by_index (mss->push_map, permut[i])); } - to_file (file_name_view_log, + to_file (mss->file_name_view_log, "+%s\t(push list)", GNUNET_i2s_full (&view_array[i])); // TODO change the peer_flags accordingly @@ -3591,19 +3697,19 @@ do_round (void *cls) /* Update view with peers received through PULLs */ permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, - CustomPeerMap_size (pull_map)); + CustomPeerMap_size (mss->pull_map)); for (i = first_border; i < second_border; i++) { int inserted; - inserted = insert_in_view (CustomPeerMap_get_peer_by_index (pull_map, + inserted = insert_in_view (CustomPeerMap_get_peer_by_index (mss->pull_map, permut[i - first_border])); if (GNUNET_OK == inserted) { clients_notify_stream_peer (1, - CustomPeerMap_get_peer_by_index (pull_map, + CustomPeerMap_get_peer_by_index (mss->pull_map, permut[i - first_border])); } - to_file (file_name_view_log, + to_file (mss->file_name_view_log, "+%s\t(pull list)", GNUNET_i2s_full (&view_array[i])); // TODO change the peer_flags accordingly @@ -3612,7 +3718,7 @@ do_round (void *cls) permut = NULL; /* Update view with peers from history */ - RPS_sampler_get_n_rand_peers (prot_sampler, + RPS_sampler_get_n_rand_peers (mss->sampler, final_size - second_border, hist_update, NULL); @@ -3624,7 +3730,7 @@ do_round (void *cls) /* Clean peers that were removed from the view */ for (i = 0; i < peers_to_clean_size; i++) { - to_file (file_name_view_log, + to_file (mss->file_name_view_log, "-%s", GNUNET_i2s_full (&peers_to_clean[i])); clean_peer (&peers_to_clean[i]); @@ -3635,31 +3741,31 @@ do_round (void *cls) } else { LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n"); GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO); - if (CustomPeerMap_size (push_map) > alpha * View_size () && - !(0 >= CustomPeerMap_size (pull_map))) + if (CustomPeerMap_size (mss->push_map) > alpha * View_size () && + !(0 >= CustomPeerMap_size (mss->pull_map))) GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO); - if (CustomPeerMap_size (push_map) > alpha * View_size () && - (0 >= CustomPeerMap_size (pull_map))) + if (CustomPeerMap_size (mss->push_map) > alpha * View_size () && + (0 >= CustomPeerMap_size (mss->pull_map))) GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO); - if (0 >= CustomPeerMap_size (push_map) && - !(0 >= CustomPeerMap_size (pull_map))) + if (0 >= CustomPeerMap_size (mss->push_map) && + !(0 >= CustomPeerMap_size (mss->pull_map))) GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO); - if (0 >= CustomPeerMap_size (push_map) && - (0 >= CustomPeerMap_size (pull_map))) + if (0 >= CustomPeerMap_size (mss->push_map) && + (0 >= CustomPeerMap_size (mss->pull_map))) GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO); - if (0 >= CustomPeerMap_size (pull_map) && - CustomPeerMap_size (push_map) > alpha * View_size () && - 0 >= CustomPeerMap_size (push_map)) + if (0 >= CustomPeerMap_size (mss->pull_map) && + CustomPeerMap_size (mss->push_map) > alpha * View_size () && + 0 >= CustomPeerMap_size (mss->push_map)) GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO); } // TODO independent of that also get some peers from CADET_get_peers()? GNUNET_STATISTICS_set (stats, "# peers in push map at end of round", - CustomPeerMap_size (push_map), + CustomPeerMap_size (mss->push_map), GNUNET_NO); GNUNET_STATISTICS_set (stats, "# peers in pull map at end of round", - CustomPeerMap_size (pull_map), + CustomPeerMap_size (mss->pull_map), GNUNET_NO); GNUNET_STATISTICS_set (stats, "# peers in view at end of round", @@ -3668,16 +3774,16 @@ do_round (void *cls) LOG (GNUNET_ERROR_TYPE_DEBUG, "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (%u) = %.2f)\n", - CustomPeerMap_size (push_map), - CustomPeerMap_size (pull_map), + CustomPeerMap_size (mss->push_map), + CustomPeerMap_size (mss->pull_map), alpha, View_size (), alpha * View_size ()); /* Update samplers */ - for (i = 0; i < CustomPeerMap_size (push_map); i++) + for (i = 0; i < CustomPeerMap_size (mss->push_map); i++) { - update_peer = CustomPeerMap_get_peer_by_index (push_map, i); + update_peer = CustomPeerMap_get_peer_by_index (mss->push_map, i); LOG (GNUNET_ERROR_TYPE_DEBUG, "Updating with peer %s from push list\n", GNUNET_i2s (update_peer)); @@ -3685,20 +3791,20 @@ do_round (void *cls) clean_peer (update_peer); /* This cleans only if it is not in the view */ } - for (i = 0; i < CustomPeerMap_size (pull_map); i++) + for (i = 0; i < CustomPeerMap_size (mss->pull_map); i++) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Updating with peer %s from pull list\n", - GNUNET_i2s (CustomPeerMap_get_peer_by_index (pull_map, i))); - insert_in_sampler (NULL, CustomPeerMap_get_peer_by_index (pull_map, i)); + GNUNET_i2s (CustomPeerMap_get_peer_by_index (mss->pull_map, i))); + insert_in_sampler (NULL, CustomPeerMap_get_peer_by_index (mss->pull_map, i)); /* This cleans only if it is not in the view */ - clean_peer (CustomPeerMap_get_peer_by_index (pull_map, i)); + clean_peer (CustomPeerMap_get_peer_by_index (mss->pull_map, i)); } /* Empty push/pull lists */ - CustomPeerMap_clear (push_map); - CustomPeerMap_clear (pull_map); + CustomPeerMap_clear (mss->push_map); + CustomPeerMap_clear (mss->pull_map); GNUNET_STATISTICS_set (stats, "view size", @@ -3707,11 +3813,11 @@ do_round (void *cls) struct GNUNET_TIME_Relative time_next_round; - time_next_round = compute_rand_delay (round_interval, 2); + time_next_round = compute_rand_delay (mss->round_interval, 2); /* Schedule next round */ - do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round, - &do_round, NULL); + mss->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round, + &do_round, NULL); LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n"); } @@ -3811,10 +3917,8 @@ shutdown_task (void *cls) struct ClientContext *client_ctx; (void) cls; - in_shutdown = GNUNET_YES; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "RPS is going down\n"); + "RPS service is going down\n"); /* Clean all clients */ for (client_ctx = cli_ctx_head; @@ -3826,22 +3930,22 @@ shutdown_task (void *cls) GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle); GNUNET_PEERINFO_disconnect (peerinfo_handle); peerinfo_handle = NULL; - if (NULL != do_round_task) + if (NULL != mss->do_round_task) { - GNUNET_SCHEDULER_cancel (do_round_task); - do_round_task = NULL; + GNUNET_SCHEDULER_cancel (mss->do_round_task); + mss->do_round_task = NULL; } peers_terminate (); GNUNET_NSE_disconnect (nse); - RPS_sampler_destroy (prot_sampler); - GNUNET_CADET_close_port (cadet_port); - GNUNET_CADET_disconnect (cadet_handle); - cadet_handle = NULL; + RPS_sampler_destroy (mss->sampler); + GNUNET_CADET_close_port (mss->cadet_port); + GNUNET_CADET_disconnect (mss->cadet_handle); + mss->cadet_handle = NULL; View_destroy (); - CustomPeerMap_destroy (push_map); - CustomPeerMap_destroy (pull_map); + CustomPeerMap_destroy (mss->push_map); + CustomPeerMap_destroy (mss->pull_map); if (NULL != stats) { GNUNET_STATISTICS_destroy (stats, @@ -3851,10 +3955,10 @@ shutdown_task (void *cls) #ifdef ENABLE_MALICIOUS struct AttackedPeer *tmp_att_peer; /* it is ok to free this const during shutdown: */ - GNUNET_free ((char *) file_name_view_log); + GNUNET_free ((char *) mss->file_name_view_log); #ifdef TO_FILE - GNUNET_free ((char *) file_name_observed_log); - GNUNET_CONTAINER_multipeermap_destroy (observed_unique_peers); + GNUNET_free ((char *) mss->file_name_observed_log); + GNUNET_CONTAINER_multipeermap_destroy (mss->observed_unique_peers); #endif /* TO_FILE */ GNUNET_array_grow (mal_peers, num_mal_peers, @@ -3949,9 +4053,12 @@ run (void *cls, struct GNUNET_SERVICE_Handle *service) { char *fn_valid_peers; + struct GNUNET_TIME_Relative round_interval; + long long unsigned int sampler_size; (void) cls; (void) service; + GNUNET_log_setup ("rps", GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG), NULL); @@ -3982,17 +4089,16 @@ run (void *cls, /* Get initial size of sampler/view from the configuration */ if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_number (cfg, "RPS", "MINSIZE", - (long long unsigned int *) &sampler_size_est_min)) + GNUNET_CONFIGURATION_get_value_number (cfg, + "RPS", + "MINSIZE", + &sampler_size)) { GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, "RPS", "MINSIZE"); GNUNET_SCHEDULER_shutdown (); return; } - sampler_size_est_need = sampler_size_est_min; - view_size_est_min = sampler_size_est_min; - LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sampler_size_est_min); if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_filename (cfg, @@ -4005,16 +4111,6 @@ run (void *cls, } - View_create (view_size_est_min); - GNUNET_STATISTICS_set (stats, "view size aim", view_size_est_min, GNUNET_NO); - - /* file_name_view_log */ - file_name_view_log = store_prefix_file_name (&own_identity, "view"); - #ifdef TO_FILE - file_name_observed_log = store_prefix_file_name (&own_identity, "observed"); - observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO); - #endif /* TO_FILE */ - /* connect to NSE */ nse = GNUNET_NSE_connect (cfg, nse_callback, NULL); @@ -4023,72 +4119,20 @@ run (void *cls, beta = 0.45; - /* Initialise cadet */ - /* There exists a copy-paste-clone in get_channel() */ - struct GNUNET_MQ_MessageHandler cadet_handlers[] = { - GNUNET_MQ_hd_fixed_size (peer_check, - GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE, - struct GNUNET_MessageHeader, - NULL), - GNUNET_MQ_hd_fixed_size (peer_push, - GNUNET_MESSAGE_TYPE_RPS_PP_PUSH, - struct GNUNET_MessageHeader, - NULL), - GNUNET_MQ_hd_fixed_size (peer_pull_request, - GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST, - struct GNUNET_MessageHeader, - NULL), - GNUNET_MQ_hd_var_size (peer_pull_reply, - GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY, - struct GNUNET_RPS_P2P_PullReplyMessage, - NULL), - GNUNET_MQ_handler_end () - }; - - cadet_handle = GNUNET_CADET_connect (cfg); - GNUNET_assert (NULL != cadet_handle); - GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_RPS, - strlen (GNUNET_APPLICATION_PORT_RPS), - &port); - cadet_port = GNUNET_CADET_open_port (cadet_handle, - &port, - &handle_inbound_channel, /* Connect handler */ - NULL, /* cls */ - NULL, /* WindowSize handler */ - &cleanup_destroyed_channel, /* Disconnect handler */ - cadet_handlers); - if (NULL == cadet_port) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Cadet port `%s' is already in use.\n", - GNUNET_APPLICATION_PORT_RPS); - GNUNET_assert (0); - } + /* Set up main SubSampler */ + mss = new_subsampler ("", /* this is the main sampler - no shared value */ + sampler_size, /* Will be overwritten by config */ + round_interval); peerinfo_handle = GNUNET_PEERINFO_connect (cfg); - initialise_peers (fn_valid_peers, cadet_handle); - GNUNET_free (fn_valid_peers); - - /* Initialise sampler */ - struct GNUNET_TIME_Relative half_round_interval; - struct GNUNET_TIME_Relative max_round_interval; - - half_round_interval = GNUNET_TIME_relative_divide (round_interval, 2); - max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval); - - prot_sampler = RPS_sampler_init (sampler_size_est_need, max_round_interval); - - /* Initialise push and pull maps */ - push_map = CustomPeerMap_create (4); - pull_map = CustomPeerMap_create (4); - //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n"); - //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, NULL); + //GNUNET_CADET_get_peers (mss.cadet_handle, &init_peer_cb, NULL); // TODO send push/pull to each of those peers? // TODO read stored valid peers from last run LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n"); + restore_valid_peers (); get_valid_peers (valid_peers_iterator, NULL); peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg, @@ -4098,7 +4142,7 @@ run (void *cls, LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n"); - do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL); + mss->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL); LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n"); GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); -- cgit v1.2.3