From fbbf0db19d08fe54f5b51c82b6cb1c9c7a2d040d Mon Sep 17 00:00:00 2001 From: Julius Bünger Date: Thu, 11 Oct 2018 14:39:48 +0200 Subject: RPS API: Add creation, deletion of Subs --- src/rps/gnunet-service-rps.c | 974 ++++++++++++++++++++++++------------------- 1 file changed, 553 insertions(+), 421 deletions(-) (limited to 'src/rps/gnunet-service-rps.c') diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c index d1c169239..b41a77074 100644 --- a/src/rps/gnunet-service-rps.c +++ b/src/rps/gnunet-service-rps.c @@ -36,11 +36,10 @@ #include #include +#include #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__) -// TODO modify @brief in every file - // TODO check for overflows // TODO align message structs @@ -149,9 +148,9 @@ struct ChannelCtx; struct PeerContext { /** - * The SubSampler this context belongs to. + * The Sub this context belongs to. */ - struct SubSampler *ss; + struct Sub *sub; /** * Message queue open to client @@ -280,24 +279,17 @@ struct AttackedPeer #endif /* ENABLE_MALICIOUS */ /** - * @brief One SubSampler. + * @brief One Sub. * * Essentially one instance of brahms that only connects to other instances * with the same (secret) value. */ -struct SubSampler +struct Sub { /** - * @brief Port used for cadet. - * - * Don't compute multiple times through making it global - */ - struct GNUNET_HashCode port; - - /** - * Handler to CADET. + * @brief Hash of the shared value that defines Subs. */ - struct GNUNET_CADET_Handle *cadet_handle; + struct GNUNET_HashCode hash; /** * @brief Port to communicate to other peers. @@ -416,6 +408,11 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg; */ struct GNUNET_STATISTICS_Handle *stats; +/** + * Handler to CADET. + */ +struct GNUNET_CADET_Handle *cadet_handle; + /** * Our own identity. */ @@ -516,12 +513,12 @@ static uint32_t push_limit = 10000; #endif /* ENABLE_MALICIOUS */ /** - * @brief Main SubSampler. + * @brief Main Sub. * * This is run in any case by all peers and connects to all peers without * specifying a shared value. */ -static struct SubSampler *mss; +static struct Sub *msub; /** * @brief Maximum number of valid peers to keep. @@ -529,12 +526,18 @@ static struct SubSampler *mss; */ static const uint32_t num_valid_peers_max = UINT32_MAX; - /*********************************************************************** * /Globals ***********************************************************************/ +static void +do_round (void *cls); + +static void +do_mal_round (void *cls); + + /** * @brief Get the #PeerContext associated with a peer * @@ -586,29 +589,29 @@ check_peer_known (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map, /** * @brief Create a new #PeerContext and insert it into the peer map * - * @param ss The SubSampler this context belongs to. + * @param sub The Sub this context belongs to. * @param peer the peer to create the #PeerContext for * * @return the #PeerContext */ static struct PeerContext * -create_peer_ctx (struct SubSampler *ss, +create_peer_ctx (struct Sub *sub, const struct GNUNET_PeerIdentity *peer) { struct PeerContext *ctx; int ret; - GNUNET_assert (GNUNET_NO == check_peer_known (ss->peer_map, peer)); + GNUNET_assert (GNUNET_NO == check_peer_known (sub->peer_map, peer)); ctx = GNUNET_new (struct PeerContext); ctx->peer_id = *peer; - ctx->ss = ss; - ret = GNUNET_CONTAINER_multipeermap_put (ss->peer_map, peer, ctx, + ctx->sub = sub; + ret = GNUNET_CONTAINER_multipeermap_put (sub->peer_map, peer, ctx, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); GNUNET_assert (GNUNET_OK == ret); GNUNET_STATISTICS_set (stats, "# known peers", - GNUNET_CONTAINER_multipeermap_size (ss->peer_map), + GNUNET_CONTAINER_multipeermap_size (sub->peer_map), GNUNET_NO); return ctx; } @@ -617,20 +620,20 @@ create_peer_ctx (struct SubSampler *ss, /** * @brief Create or get a #PeerContext * - * @param ss The SubSampler to which the created context belongs to + * @param sub The Sub to which the created context belongs to * @param peer the peer to get the associated context to * * @return the context */ static struct PeerContext * -create_or_get_peer_ctx (struct SubSampler *ss, +create_or_get_peer_ctx (struct Sub *sub, const struct GNUNET_PeerIdentity *peer) { - if (GNUNET_NO == check_peer_known (ss->peer_map, peer)) + if (GNUNET_NO == check_peer_known (sub->peer_map, peer)) { - return create_peer_ctx (ss, peer); + return create_peer_ctx (sub, peer); } - return get_peer_ctx (ss->peer_map, peer); + return get_peer_ctx (sub->peer_map, peer); } @@ -648,13 +651,13 @@ static int check_connected (struct PeerContext *peer_ctx) { /* If we don't know about this peer we don't know whether it's online */ - if (GNUNET_NO == check_peer_known (peer_ctx->ss->peer_map, + if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map, &peer_ctx->peer_id)) { return GNUNET_NO; } /* Get the context */ - peer_ctx = get_peer_ctx (peer_ctx->ss->peer_map, &peer_ctx->peer_id); + peer_ctx = get_peer_ctx (peer_ctx->sub->peer_map, &peer_ctx->peer_id); /* If we have no channel to this peer we don't know whether it's online */ if ( (NULL == peer_ctx->send_channel_ctx) && (NULL == peer_ctx->recv_channel_ctx) ) @@ -943,10 +946,10 @@ get_channel (struct PeerContext *peer_ctx) *ctx_peer = peer_ctx->peer_id; peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx); peer_ctx->send_channel_ctx->channel = - GNUNET_CADET_channel_create (peer_ctx->ss->cadet_handle, + GNUNET_CADET_channel_create (cadet_handle, peer_ctx->send_channel_ctx, /* context */ &peer_ctx->peer_id, - &peer_ctx->ss->port, + &peer_ctx->sub->hash, GNUNET_CADET_OPTION_RELIABLE, NULL, /* WindowSize handler */ &cleanup_destroyed_channel, /* Disconnect handler */ @@ -1048,7 +1051,7 @@ mq_online_check_successful (void *cls) remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES); peer_ctx->online_check_pending = NULL; set_peer_online (peer_ctx); - (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->ss->valid_peers); + (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers); } } @@ -1187,9 +1190,9 @@ static int destroy_peer (struct PeerContext *peer_ctx) { GNUNET_assert (NULL != peer_ctx); - GNUNET_assert (NULL != peer_ctx->ss->peer_map); + GNUNET_assert (NULL != peer_ctx->sub->peer_map); if (GNUNET_NO == - GNUNET_CONTAINER_multipeermap_contains (peer_ctx->ss->peer_map, + GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map, &peer_ctx->peer_id)) { return GNUNET_NO; @@ -1259,15 +1262,15 @@ destroy_peer (struct PeerContext *peer_ctx) } if (GNUNET_YES != - GNUNET_CONTAINER_multipeermap_remove_all (peer_ctx->ss->peer_map, + GNUNET_CONTAINER_multipeermap_remove_all (peer_ctx->sub->peer_map, &peer_ctx->peer_id)) { LOG (GNUNET_ERROR_TYPE_WARNING, - "removing peer from peer_ctx->ss->peer_map failed\n"); + "removing peer from peer_ctx->sub->peer_map failed\n"); } GNUNET_STATISTICS_set (stats, "# known peers", - GNUNET_CONTAINER_multipeermap_size (peer_ctx->ss->peer_map), + GNUNET_CONTAINER_multipeermap_size (peer_ctx->sub->peer_map), GNUNET_NO); GNUNET_free (peer_ctx); return GNUNET_YES; @@ -1288,10 +1291,10 @@ peermap_clear_iterator (void *cls, const struct GNUNET_PeerIdentity *key, void *value) { - struct SubSampler *ss = cls; + struct Sub *sub = cls; (void) value; - destroy_peer (get_peer_ctx (ss->peer_map, key)); + destroy_peer (get_peer_ctx (sub->peer_map, key)); return GNUNET_YES; } @@ -1366,36 +1369,36 @@ store_peer_presistently_iterator (void *cls, /** * @brief Store the peers currently in #valid_peers to disk. * - * @param ss SubSampler for which to store the valid peers + * @param sub Sub for which to store the valid peers */ static void -store_valid_peers (const struct SubSampler *ss) +store_valid_peers (const struct Sub *sub) { struct GNUNET_DISK_FileHandle *fh; uint32_t number_written_peers; int ret; - if (0 == strncmp ("DISABLE", ss->filename_valid_peers, 7)) + if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7)) { return; } - ret = GNUNET_DISK_directory_create_for_file (ss->filename_valid_peers); + ret = GNUNET_DISK_directory_create_for_file (sub->filename_valid_peers); if (GNUNET_SYSERR == ret) { LOG (GNUNET_ERROR_TYPE_WARNING, "Not able to create directory for file `%s'\n", - ss->filename_valid_peers); + sub->filename_valid_peers); GNUNET_break (0); } else if (GNUNET_NO == ret) { LOG (GNUNET_ERROR_TYPE_WARNING, "Directory for file `%s' exists but is not writable for us\n", - ss->filename_valid_peers); + sub->filename_valid_peers); GNUNET_break (0); } - fh = GNUNET_DISK_file_open (ss->filename_valid_peers, + fh = GNUNET_DISK_file_open (sub->filename_valid_peers, GNUNET_DISK_OPEN_WRITE | GNUNET_DISK_OPEN_CREATE, GNUNET_DISK_PERM_USER_READ | @@ -1404,19 +1407,19 @@ store_valid_peers (const struct SubSampler *ss) { LOG (GNUNET_ERROR_TYPE_WARNING, "Not able to write valid peers to file `%s'\n", - ss->filename_valid_peers); + sub->filename_valid_peers); return; } LOG (GNUNET_ERROR_TYPE_DEBUG, "Writing %u valid peers to disk\n", - GNUNET_CONTAINER_multipeermap_size (ss->valid_peers)); + GNUNET_CONTAINER_multipeermap_size (sub->valid_peers)); number_written_peers = - GNUNET_CONTAINER_multipeermap_iterate (ss->valid_peers, + GNUNET_CONTAINER_multipeermap_iterate (sub->valid_peers, store_peer_presistently_iterator, fh); GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh)); GNUNET_assert (number_written_peers == - GNUNET_CONTAINER_multipeermap_size (ss->valid_peers)); + GNUNET_CONTAINER_multipeermap_size (sub->valid_peers)); } @@ -1469,10 +1472,10 @@ s2i_full (const char *string_repr) /** * @brief Restore the peers on disk to #valid_peers. * - * @param ss SubSampler for which to restore the valid peers + * @param sub Sub for which to restore the valid peers */ static void -restore_valid_peers (const struct SubSampler *ss) +restore_valid_peers (const struct Sub *sub) { off_t file_size; uint32_t num_peers; @@ -1483,16 +1486,16 @@ restore_valid_peers (const struct SubSampler *ss) char *str_repr; const struct GNUNET_PeerIdentity *peer; - if (0 == strncmp ("DISABLE", ss->filename_valid_peers, 7)) + if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7)) { return; } - if (GNUNET_OK != GNUNET_DISK_file_test (ss->filename_valid_peers)) + if (GNUNET_OK != GNUNET_DISK_file_test (sub->filename_valid_peers)) { return; } - fh = GNUNET_DISK_file_open (ss->filename_valid_peers, + fh = GNUNET_DISK_file_open (sub->filename_valid_peers, GNUNET_DISK_OPEN_READ, GNUNET_DISK_PERM_NONE); GNUNET_assert (NULL != fh); @@ -1504,13 +1507,13 @@ restore_valid_peers (const struct SubSampler *ss) LOG (GNUNET_ERROR_TYPE_DEBUG, "Restoring %" PRIu32 " peers from file `%s'\n", num_peers, - ss->filename_valid_peers); + sub->filename_valid_peers); for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53) { str_repr = GNUNET_strndup (iter_buf, 53); peer = s2i_full (str_repr); GNUNET_free (str_repr); - add_valid_peer (peer, ss->valid_peers); + add_valid_peer (peer, sub->valid_peers); LOG (GNUNET_ERROR_TYPE_DEBUG, "Restored valid peer %s from disk\n", GNUNET_i2s_full (peer)); @@ -1518,10 +1521,10 @@ restore_valid_peers (const struct SubSampler *ss) iter_buf = NULL; GNUNET_free (buf); LOG (GNUNET_ERROR_TYPE_DEBUG, - "num_peers: %" PRIu32 ", _size (ss->valid_peers): %u\n", + "num_peers: %" PRIu32 ", _size (sub->valid_peers): %u\n", num_peers, - GNUNET_CONTAINER_multipeermap_size (ss->valid_peers)); - if (num_peers != GNUNET_CONTAINER_multipeermap_size (ss->valid_peers)) + GNUNET_CONTAINER_multipeermap_size (sub->valid_peers)); + if (num_peers != GNUNET_CONTAINER_multipeermap_size (sub->valid_peers)) { LOG (GNUNET_ERROR_TYPE_WARNING, "Number of restored peers does not match file size. Have probably duplicates.\n"); @@ -1529,33 +1532,33 @@ restore_valid_peers (const struct SubSampler *ss) GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh)); LOG (GNUNET_ERROR_TYPE_DEBUG, "Restored %u valid peers from disk\n", - GNUNET_CONTAINER_multipeermap_size (ss->valid_peers)); + GNUNET_CONTAINER_multipeermap_size (sub->valid_peers)); } /** * @brief Delete storage of peers that was created with #initialise_peers () * - * @param ss SubSampler for which the storage is deleted + * @param sub Sub for which the storage is deleted */ static void -peers_terminate (struct SubSampler *ss) +peers_terminate (struct Sub *sub) { if (GNUNET_SYSERR == - GNUNET_CONTAINER_multipeermap_iterate (ss->peer_map, + GNUNET_CONTAINER_multipeermap_iterate (sub->peer_map, &peermap_clear_iterator, - ss)) + sub)) { LOG (GNUNET_ERROR_TYPE_WARNING, "Iteration destroying peers was aborted.\n"); } - GNUNET_CONTAINER_multipeermap_destroy (ss->peer_map); - ss->peer_map = NULL; - store_valid_peers (ss); - GNUNET_free (ss->filename_valid_peers); - ss->filename_valid_peers = NULL; - GNUNET_CONTAINER_multipeermap_destroy (ss->valid_peers); - ss->valid_peers = NULL; + GNUNET_CONTAINER_multipeermap_destroy (sub->peer_map); + sub->peer_map = NULL; + store_valid_peers (sub); + GNUNET_free (sub->filename_valid_peers); + sub->filename_valid_peers = NULL; + GNUNET_CONTAINER_multipeermap_destroy (sub->valid_peers); + sub->valid_peers = NULL; } @@ -1615,21 +1618,21 @@ get_valid_peers (const struct GNUNET_CONTAINER_MultiPeerMap *valid_peers, * This function is called on new peer_ids from 'external' sources * (client seed, cadet get_peers(), ...) * - * @param ss SubSampler with the peer map that the @a peer will be added to + * @param sub Sub with the peer map that the @a peer will be added to * @param peer the new #GNUNET_PeerIdentity * * @return #GNUNET_YES if peer was inserted * #GNUNET_NO otherwise */ static int -insert_peer (struct SubSampler *ss, +insert_peer (struct Sub *sub, const struct GNUNET_PeerIdentity *peer) { - if (GNUNET_YES == check_peer_known (ss->peer_map, peer)) + if (GNUNET_YES == check_peer_known (sub->peer_map, peer)) { return GNUNET_NO; /* We already know this peer - nothing to do */ } - (void) create_peer_ctx (ss, peer); + (void) create_peer_ctx (sub, peer); return GNUNET_YES; } @@ -1665,20 +1668,20 @@ check_peer_flag (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map, * * If not known yet, insert into known peers * - * @param ss SubSampler which would contain the @a peer + * @param sub Sub which would contain the @a peer * @param peer the peer whose online is to be checked * @return #GNUNET_YES if the check was issued * #GNUNET_NO otherwise */ static int -issue_peer_online_check (struct SubSampler *ss, +issue_peer_online_check (struct Sub *sub, const struct GNUNET_PeerIdentity *peer) { struct PeerContext *peer_ctx; - (void) insert_peer (ss, peer); // TODO even needed? - peer_ctx = get_peer_ctx (ss->peer_map, peer); - if ( (GNUNET_NO == check_peer_flag (ss->peer_map, peer, Peers_ONLINE)) && + (void) insert_peer (sub, peer); // TODO even needed? + peer_ctx = get_peer_ctx (sub->peer_map, peer); + if ( (GNUNET_NO == check_peer_flag (sub->peer_map, peer, Peers_ONLINE)) && (NULL == peer_ctx->online_check_pending) ) { check_peer_online (peer_ctx); @@ -1704,7 +1707,7 @@ issue_peer_online_check (struct SubSampler *ss, static int check_removable (const struct PeerContext *peer_ctx) { - if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_ctx->ss->peer_map, + if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map, &peer_ctx->peer_id)) { return GNUNET_SYSERR; @@ -1749,7 +1752,7 @@ check_peer_valid (const struct GNUNET_CONTAINER_MultiPeerMap *valid_peers, static void indicate_sending_intention (struct PeerContext *peer_ctx) { - GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->ss->peer_map, + GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map, &peer_ctx->peer_id)); (void) get_channel (peer_ctx); } @@ -1778,7 +1781,7 @@ check_peer_send_intention (const struct PeerContext *peer_ctx) /** * Handle the channel a peer opens to us. * - * @param cls The closure - SubSampler + * @param cls The closure - Sub * @param channel The channel the peer wants to establish * @param initiator The peer's peer ID * @@ -1793,22 +1796,22 @@ handle_inbound_channel (void *cls, struct PeerContext *peer_ctx; struct GNUNET_PeerIdentity *ctx_peer; struct ChannelCtx *channel_ctx; - struct SubSampler *ss = cls; + struct Sub *sub = cls; LOG (GNUNET_ERROR_TYPE_DEBUG, "New channel was established to us (Peer %s).\n", GNUNET_i2s (initiator)); GNUNET_assert (NULL != channel); /* according to cadet API */ /* Make sure we 'know' about this peer */ - peer_ctx = create_or_get_peer_ctx (ss, initiator); + peer_ctx = create_or_get_peer_ctx (sub, initiator); set_peer_online (peer_ctx); - (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->ss->valid_peers); + (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers); ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity); *ctx_peer = *initiator; channel_ctx = add_channel_ctx (peer_ctx); channel_ctx->channel = channel; /* We only accept one incoming channel per peer */ - if (GNUNET_YES == check_peer_send_intention (get_peer_ctx (ss->peer_map, + if (GNUNET_YES == check_peer_send_intention (get_peer_ctx (sub->peer_map, initiator))) { LOG (GNUNET_ERROR_TYPE_WARNING, @@ -1835,7 +1838,7 @@ handle_inbound_channel (void *cls, static int check_sending_channel_exists (const struct PeerContext *peer_ctx) { - if (GNUNET_NO == check_peer_known (peer_ctx->ss->peer_map, + if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map, &peer_ctx->peer_id)) { /* If no such peer exists, there is no channel */ return GNUNET_NO; @@ -1859,7 +1862,7 @@ check_sending_channel_exists (const struct PeerContext *peer_ctx) static int destroy_sending_channel (struct PeerContext *peer_ctx) { - if (GNUNET_NO == check_peer_known (peer_ctx->ss->peer_map, + if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map, &peer_ctx->peer_id)) { return GNUNET_NO; @@ -1922,7 +1925,7 @@ schedule_operation (struct PeerContext *peer_ctx, { struct PeerPendingOp pending_op; - GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->ss->peer_map, + GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map, &peer_ctx->peer_id)); //TODO if ONLINE execute immediately @@ -2010,9 +2013,9 @@ struct ClientContext struct GNUNET_SERVICE_Client *client; /** - * The #SubSampler this context belongs to + * The #Sub this context belongs to */ - struct SubSampler *ss; + struct Sub *sub; }; /** @@ -2109,35 +2112,35 @@ insert_in_view_op (void *cls, * * Called once we know a peer is online. * - * @param ss SubSampler in with the view to insert in + * @param sub Sub in with the view to insert in * @param peer the peer to insert * * @return GNUNET_OK if peer was actually inserted * GNUNET_NO if peer was not inserted */ static int -insert_in_view (struct SubSampler *ss, +insert_in_view (struct Sub *sub, const struct GNUNET_PeerIdentity *peer) { struct PeerContext *peer_ctx; int online; int ret; - online = check_peer_flag (ss->peer_map, peer, Peers_ONLINE); - peer_ctx = get_peer_ctx (ss->peer_map, peer); // TODO indirection needed? + online = check_peer_flag (sub->peer_map, peer, Peers_ONLINE); + peer_ctx = get_peer_ctx (sub->peer_map, peer); // TODO indirection needed? if ( (GNUNET_NO == online) || (GNUNET_SYSERR == online) ) /* peer is not even known */ { - (void) issue_peer_online_check (ss, peer); + (void) issue_peer_online_check (sub, peer); (void) schedule_operation (peer_ctx, insert_in_view_op, NULL); return GNUNET_NO; } /* Open channel towards peer to keep connection open */ indicate_sending_intention (peer_ctx); - ret = View_put (ss->view, peer); + ret = View_put (sub->view, peer); GNUNET_STATISTICS_set (stats, "view size", - View_size (peer_ctx->ss->view), + View_size (peer_ctx->sub->view), GNUNET_NO); return ret; } @@ -2160,8 +2163,8 @@ send_view (const struct ClientContext *cli_ctx, if (NULL == view_array) { - view_size = View_size (cli_ctx->ss->view); - view_array = View_get_as_array (cli_ctx->ss->view); + view_size = View_size (cli_ctx->sub->view); + view_array = View_get_as_array (cli_ctx->sub->view); } ev = GNUNET_MQ_msg_extra (out_msg, @@ -2210,17 +2213,17 @@ send_stream_peers (const struct ClientContext *cli_ctx, /** * @brief sends updates to clients that are interested * - * @param ss Subsampler for which to notify clients + * @param sub Sub for which to notify clients */ static void -clients_notify_view_update (const struct SubSampler *ss) +clients_notify_view_update (const struct Sub *sub) { struct ClientContext *cli_ctx_iter; uint64_t num_peers; const struct GNUNET_PeerIdentity *view_array; - num_peers = View_size (ss->view); - view_array = View_get_as_array(ss->view); + num_peers = View_size (sub->view); + view_array = View_get_as_array(sub->view); /* check size of view is small enough */ if (GNUNET_MAX_MESSAGE_SIZE < num_peers) { @@ -2260,7 +2263,8 @@ clients_notify_view_update (const struct SubSampler *ss) * @param peers the array of peers to send */ static void -clients_notify_stream_peer (uint64_t num_peers, +clients_notify_stream_peer (const struct Sub *sub, + uint64_t num_peers, const struct GNUNET_PeerIdentity *peers) // TODO enum StreamPeerSource) { @@ -2274,7 +2278,8 @@ clients_notify_stream_peer (uint64_t num_peers, NULL != cli_ctx_iter; cli_ctx_iter = cli_ctx_iter->next) { - if (GNUNET_YES == cli_ctx_iter->stream_update) + if (GNUNET_YES == cli_ctx_iter->stream_update && + (sub == cli_ctx_iter->sub || sub == msub)) { send_stream_peers (cli_ctx_iter, num_peers, peers); } @@ -2287,7 +2292,7 @@ clients_notify_stream_peer (uint64_t num_peers, * * @param ids Array of Peers to insert into view * @param num_peers Number of peers to insert - * @param cls Closure - The SubSampler for which this is to be done + * @param cls Closure - The Sub for which this is to be done */ static void hist_update (const struct GNUNET_PeerIdentity *ids, @@ -2295,21 +2300,21 @@ hist_update (const struct GNUNET_PeerIdentity *ids, void *cls) { unsigned int i; - struct SubSampler *ss = cls; + struct Sub *sub = cls; for (i = 0; i < num_peers; i++) { int inserted; - inserted = insert_in_view (ss, &ids[i]); + inserted = insert_in_view (sub, &ids[i]); if (GNUNET_OK == inserted) { - clients_notify_stream_peer (1, &ids[i]); + clients_notify_stream_peer (sub, 1, &ids[i]); } - to_file (ss->file_name_view_log, + to_file (sub->file_name_view_log, "+%s\t(hist)", GNUNET_i2s_full (ids)); } - clients_notify_view_update (ss); + clients_notify_view_update (sub); } @@ -2433,16 +2438,16 @@ send_pull_reply (struct PeerContext *peer_ctx, * * Called once we know a peer is online. * - * @param cls Closure - SubSampler with the pull map to insert into + * @param cls Closure - Sub with the pull map to insert into * @param peer Peer to insert */ static void insert_in_pull_map (void *cls, const struct GNUNET_PeerIdentity *peer) { - struct SubSampler *ss = cls; + struct Sub *sub = cls; - CustomPeerMap_put (ss->pull_map, peer); + CustomPeerMap_put (sub->pull_map, peer); } @@ -2452,20 +2457,20 @@ insert_in_pull_map (void *cls, * Called once we know a peer is online. * Implements #PeerOp * - * @param cls Closure - SubSampler with view to insert peer into + * @param cls Closure - Sub with view to insert peer into * @param peer the peer to insert */ static void insert_in_view_op (void *cls, const struct GNUNET_PeerIdentity *peer) { - struct SubSampler *ss = cls; + struct Sub *sub = cls; int inserted; - inserted = insert_in_view (ss, peer); + inserted = insert_in_view (sub, peer); if (GNUNET_OK == inserted) { - clients_notify_stream_peer (1, peer); + clients_notify_stream_peer (sub, 1, peer); } } @@ -2474,41 +2479,41 @@ insert_in_view_op (void *cls, * Update sampler with given PeerID. * Implements #PeerOp * - * @param cls Closure - SubSampler containing the sampler to insert into + * @param cls Closure - Sub containing the sampler to insert into * @param peer Peer to insert */ static void insert_in_sampler (void *cls, const struct GNUNET_PeerIdentity *peer) { - struct SubSampler *ss = cls; + struct Sub *sub = cls; LOG (GNUNET_ERROR_TYPE_DEBUG, "Updating samplers with peer %s from insert_in_sampler()\n", GNUNET_i2s (peer)); - RPS_sampler_update (ss->sampler, peer); - if (0 < RPS_sampler_count_id (ss->sampler, peer)) + RPS_sampler_update (sub->sampler, peer); + if (0 < RPS_sampler_count_id (sub->sampler, peer)) { /* Make sure we 'know' about this peer */ - (void) issue_peer_online_check (ss, peer); + (void) issue_peer_online_check (sub, peer); /* Establish a channel towards that peer to indicate we are going to send * messages to it */ //indicate_sending_intention (peer); } #ifdef TO_FILE - ss->num_observed_peers++; + sub->num_observed_peers++; GNUNET_CONTAINER_multipeermap_put - (ss->observed_unique_peers, + (sub->observed_unique_peers, peer, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); uint32_t num_observed_unique_peers = - GNUNET_CONTAINER_multipeermap_size (ss->observed_unique_peers); - to_file (ss->file_name_observed_log, + GNUNET_CONTAINER_multipeermap_size (sub->observed_unique_peers); + to_file (sub->file_name_observed_log, "%" PRIu32 " %" PRIu32 " %f\n", - ss->num_observed_peers, + sub->num_observed_peers, num_observed_unique_peers, - 1.0*num_observed_unique_peers/ss->num_observed_peers) + 1.0*num_observed_unique_peers/sub->num_observed_peers) #endif /* TO_FILE */ } @@ -2520,20 +2525,20 @@ insert_in_sampler (void *cls, * * "External sources" refer to every source except the gossip. * - * @param ss SubSampler for which @a peer was received + * @param sub Sub for which @a peer was received * @param peer peer to insert/peer received */ static void -got_peer (struct SubSampler *ss, +got_peer (struct Sub *sub, const struct GNUNET_PeerIdentity *peer) { /* If we did not know this peer already, insert it into sampler and view */ - if (GNUNET_YES == issue_peer_online_check (ss, peer)) + if (GNUNET_YES == issue_peer_online_check (sub, peer)) { - schedule_operation (get_peer_ctx (ss->peer_map, peer), - &insert_in_sampler, ss); - schedule_operation (get_peer_ctx (ss->peer_map, peer), - &insert_in_view_op, ss); + schedule_operation (get_peer_ctx (sub->peer_map, peer), + &insert_in_sampler, sub); + schedule_operation (get_peer_ctx (sub->peer_map, peer), + &insert_in_view_op, sub); } GNUNET_STATISTICS_update (stats, "# learnd peers", @@ -2553,22 +2558,22 @@ static int check_sending_channel_needed (const struct PeerContext *peer_ctx) { /* struct GNUNET_CADET_Channel *channel; */ - if (GNUNET_NO == check_peer_known (peer_ctx->ss->peer_map, + if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map, &peer_ctx->peer_id)) { return GNUNET_NO; } if (GNUNET_YES == check_sending_channel_exists (peer_ctx)) { - if ( (0 < RPS_sampler_count_id (peer_ctx->ss->sampler, + if ( (0 < RPS_sampler_count_id (peer_ctx->sub->sampler, &peer_ctx->peer_id)) || - (GNUNET_YES == View_contains_peer (peer_ctx->ss->view, + (GNUNET_YES == View_contains_peer (peer_ctx->sub->view, &peer_ctx->peer_id)) || - (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->ss->push_map, + (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->push_map, &peer_ctx->peer_id)) || - (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->ss->pull_map, + (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->pull_map, &peer_ctx->peer_id)) || - (GNUNET_YES == check_peer_flag (peer_ctx->ss->peer_map, + (GNUNET_YES == check_peer_flag (peer_ctx->sub->peer_map, &peer_ctx->peer_id, Peers_PULL_REPLY_PENDING))) { /* If we want to keep the connection to peer open */ @@ -2584,18 +2589,18 @@ check_sending_channel_needed (const struct PeerContext *peer_ctx) * @brief remove peer from our knowledge, the view, push and pull maps and * samplers. * - * @param ss SubSampler with the data structures the peer is to be removed from + * @param sub Sub with the data structures the peer is to be removed from * @param peer the peer to remove */ static void -remove_peer (struct SubSampler *ss, +remove_peer (struct Sub *sub, const struct GNUNET_PeerIdentity *peer) { - (void) View_remove_peer (ss->view, peer); - CustomPeerMap_remove_peer (ss->pull_map, peer); - CustomPeerMap_remove_peer (ss->push_map, peer); - RPS_sampler_reinitialise_by_value (ss->sampler, peer); - destroy_peer (get_peer_ctx (ss->peer_map, peer)); + (void) View_remove_peer (sub->view, peer); + CustomPeerMap_remove_peer (sub->pull_map, peer); + CustomPeerMap_remove_peer (sub->push_map, peer); + RPS_sampler_reinitialise_by_value (sub->sampler, peer); + destroy_peer (get_peer_ctx (sub->peer_map, peer)); } @@ -2604,14 +2609,14 @@ remove_peer (struct SubSampler *ss, * * If the sending channel is no longer needed it is destroyed. * - * @param ss SubSamper in which the current peer is to be cleaned + * @param sub Sub in which the current peer is to be cleaned * @param peer the peer whose data is about to be cleaned */ static void -clean_peer (struct SubSampler *ss, +clean_peer (struct Sub *sub, const struct GNUNET_PeerIdentity *peer) { - if (GNUNET_NO == check_sending_channel_needed (get_peer_ctx (ss->peer_map, + if (GNUNET_NO == check_sending_channel_needed (get_peer_ctx (sub->peer_map, peer))) { LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -2619,24 +2624,24 @@ clean_peer (struct SubSampler *ss, GNUNET_i2s (peer)); #ifdef ENABLE_MALICIOUS if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer)) - (void) destroy_sending_channel (get_peer_ctx (ss->peer_map, peer)); + (void) destroy_sending_channel (get_peer_ctx (sub->peer_map, peer)); #else /* ENABLE_MALICIOUS */ - (void) destroy_sending_channel (get_peer_ctx (ss->peer_map, peer)); + (void) destroy_sending_channel (get_peer_ctx (sub->peer_map, peer)); #endif /* ENABLE_MALICIOUS */ } - if ( (GNUNET_NO == check_peer_send_intention (get_peer_ctx (ss->peer_map, + if ( (GNUNET_NO == check_peer_send_intention (get_peer_ctx (sub->peer_map, peer))) && - (GNUNET_NO == View_contains_peer (ss->view, peer)) && - (GNUNET_NO == CustomPeerMap_contains_peer (ss->push_map, peer)) && - (GNUNET_NO == CustomPeerMap_contains_peer (ss->push_map, peer)) && - (0 == RPS_sampler_count_id (ss->sampler, peer)) && - (GNUNET_NO != check_removable (get_peer_ctx (ss->peer_map, peer))) ) + (GNUNET_NO == View_contains_peer (sub->view, peer)) && + (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) && + (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) && + (0 == RPS_sampler_count_id (sub->sampler, peer)) && + (GNUNET_NO != check_removable (get_peer_ctx (sub->peer_map, peer))) ) { /* We can safely remove this peer */ LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to remove peer %s\n", GNUNET_i2s (peer)); - remove_peer (ss, peer); + remove_peer (sub, peer); return; } } @@ -2666,7 +2671,7 @@ cleanup_destroyed_channel (void *cls, if (NULL != peer_ctx && peer_ctx->send_channel_ctx == channel_ctx) { - remove_peer (peer_ctx->ss, &peer_ctx->peer_id); + remove_peer (peer_ctx->sub, &peer_ctx->peer_id); } } @@ -2677,35 +2682,30 @@ cleanup_destroyed_channel (void *cls, /*********************************************************************** - * SubSampler + * Sub ***********************************************************************/ /** - * @brief Create a new SUbSampler + * @brief Create a new Sub * - * @param shared_value Value shared among rps instances on other hosts that - * defines a subgroup to sample from. + * @param hash Hash of value shared among rps instances on other hosts that + * defines a subgroup to sample from. * @param sampler_size Size of the sampler * @param round_interval Interval (in average) between two rounds * - * @return SubSampler + * @return Sub */ -struct SubSampler * -new_subsampler (const char *shared_value, - uint32_t sampler_size, - struct GNUNET_TIME_Relative round_interval) +struct Sub * +new_sub (const struct GNUNET_HashCode *hash, + uint32_t sampler_size, + struct GNUNET_TIME_Relative round_interval) { - struct SubSampler *ss; - char hash_port_string[512] = GNUNET_APPLICATION_PORT_RPS; + struct Sub *sub; - ss = GNUNET_new (struct SubSampler); + sub = GNUNET_new (struct Sub); /* With the hash generated from the secret value this service only connects * to rps instances that share the value */ - strcat (hash_port_string, shared_value); - GNUNET_CRYPTO_hash (hash_port_string, - strlen (hash_port_string), - &ss->port); struct GNUNET_MQ_MessageHandler cadet_handlers[] = { GNUNET_MQ_hd_fixed_size (peer_check, GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE, @@ -2725,17 +2725,16 @@ new_subsampler (const char *shared_value, NULL), GNUNET_MQ_handler_end () }; - ss->cadet_handle = GNUNET_CADET_connect (cfg); - GNUNET_assert (NULL != ss->cadet_handle); - ss->cadet_port = - GNUNET_CADET_open_port (ss->cadet_handle, - &ss->port, + sub->hash = *hash; + sub->cadet_port = + GNUNET_CADET_open_port (cadet_handle, + &sub->hash, &handle_inbound_channel, /* Connect handler */ - ss, /* cls */ + sub, /* cls */ NULL, /* WindowSize handler */ &cleanup_destroyed_channel, /* Disconnect handler */ cadet_handlers); - if (NULL == ss->cadet_port) + if (NULL == sub->cadet_port) { LOG (GNUNET_ERROR_TYPE_ERROR, "Cadet port `%s' is already in use.\n", @@ -2744,53 +2743,98 @@ new_subsampler (const char *shared_value, } /* Set up general data structure to keep track about peers */ - ss->valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO); + sub->valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO); if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_filename (cfg, "rps", "FILENAME_VALID_PEERS", - &ss->filename_valid_peers)) + &sub->filename_valid_peers)) { GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, "rps", "FILENAME_VALID_PEERS"); } - ss->peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO); + sub->peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO); /* Set up the sampler */ - ss->sampler_size_est_min = sampler_size; - ss->sampler_size_est_need = sampler_size;; - LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", ss->sampler_size_est_min); - ss->round_interval = round_interval; - ss->sampler = RPS_sampler_init (sampler_size, + sub->sampler_size_est_min = sampler_size; + sub->sampler_size_est_need = sampler_size;; + LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sub->sampler_size_est_min); + GNUNET_assert (0 != round_interval.rel_value_us); + sub->round_interval = round_interval; + sub->sampler = RPS_sampler_init (sampler_size, round_interval); /* Logging of internals */ - ss->file_name_view_log = store_prefix_file_name (&own_identity, "view"); + sub->file_name_view_log = store_prefix_file_name (&own_identity, "view"); #ifdef TO_FILE - ss->file_name_observed_log = store_prefix_file_name (&own_identity, + sub->file_name_observed_log = store_prefix_file_name (&own_identity, "observed"); - ss->num_observed_peers = 0; - ss->observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1, + sub->num_observed_peers = 0; + sub->observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO); #endif /* TO_FILE */ /* Set up data structures for gossip */ - ss->push_map = CustomPeerMap_create (4); - ss->pull_map = CustomPeerMap_create (4); - ss->view_size_est_min = sampler_size;; - ss->view = View_create (ss->view_size_est_min); + sub->push_map = CustomPeerMap_create (4); + sub->pull_map = CustomPeerMap_create (4); + sub->view_size_est_min = sampler_size;; + sub->view = View_create (sub->view_size_est_min); GNUNET_STATISTICS_set (stats, "view size aim", - ss->view_size_est_min, + sub->view_size_est_min, GNUNET_NO); + /* Start executing rounds */ + sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub); - return ss; + return sub; } + +/** + * @brief Destroy Sub. + * + * @param sub Sub to destroy + */ +static void +destroy_sub (struct Sub *sub) +{ + GNUNET_assert (NULL != sub); + GNUNET_assert (NULL != sub->do_round_task); + GNUNET_SCHEDULER_cancel (sub->do_round_task); + sub->do_round_task = NULL; + + /* Disconnect from cadet */ + GNUNET_CADET_close_port (sub->cadet_port); + + /* Clean up data structures for peers */ + RPS_sampler_destroy (sub->sampler); + sub->sampler = NULL; + View_destroy (sub->view); + sub->view = NULL; + CustomPeerMap_destroy (sub->push_map); + sub->push_map = NULL; + CustomPeerMap_destroy (sub->pull_map); + sub->pull_map = NULL; + peers_terminate (sub); + + /* Free leftover data structures */ + GNUNET_free (sub->file_name_view_log); + sub->file_name_view_log = NULL; +#ifdef TO_FILE + GNUNET_free (sub->file_name_observed_log); + sub->file_name_observed_log = NULL; + GNUNET_CONTAINER_multipeermap_destroy (sub->observed_unique_peers); + sub->observed_unique_peers = NULL; +#endif /* TO_FILE */ + + GNUNET_free (sub); +} + + /*********************************************************************** - * /SubSampler + * /Sub ***********************************************************************/ @@ -2806,58 +2850,88 @@ destroy_cli_ctx (struct ClientContext *cli_ctx) GNUNET_CONTAINER_DLL_remove (cli_ctx_head, cli_ctx_tail, cli_ctx); + if (NULL != cli_ctx->sub) + { + destroy_sub (cli_ctx->sub); + cli_ctx->sub = NULL; + } GNUNET_free (cli_ctx); } /** - * Function called by NSE. + * @brief Update sizes in sampler and view on estimate update from nse service * - * Updates sizes of sampler list and view and adapt those lists - * accordingly. - * - * implements #GNUNET_NSE_Callback - * - * @param cls Closure - SubSampler - * @param timestamp time when the estimate was received from the server (or created by the server) + * @param sub Sub * @param logestimate the log(Base 2) value of the current network size estimate * @param std_dev standard deviation for the estimate */ static void -nse_callback (void *cls, - struct GNUNET_TIME_Absolute timestamp, - double logestimate, double std_dev) +adapt_sizes (struct Sub *sub, double logestimate, double std_dev) { double estimate; //double scale; // TODO this might go gloabal/config - struct SubSampler *ss = cls; - (void) timestamp; LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n", - logestimate, std_dev, RPS_sampler_get_size (ss->sampler)); + logestimate, std_dev, RPS_sampler_get_size (sub->sampler)); //scale = .01; estimate = GNUNET_NSE_log_estimate_to_n (logestimate); // GNUNET_NSE_log_estimate_to_n (logestimate); estimate = pow (estimate, 1.0 / 3); // TODO add if std_dev is a number // estimate += (std_dev * scale); - if (ss->view_size_est_min < ceil (estimate)) + if (sub->view_size_est_min < ceil (estimate)) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate); - ss->sampler_size_est_need = estimate; - ss->view_size_est_need = estimate; + sub->sampler_size_est_need = estimate; + sub->view_size_est_need = estimate; } else { LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate); - //ss->sampler_size_est_need = ss->view_size_est_min; - ss->view_size_est_need = ss->view_size_est_min; + //sub->sampler_size_est_need = sub->view_size_est_min; + sub->view_size_est_need = sub->view_size_est_min; } - GNUNET_STATISTICS_set (stats, "view size aim", ss->view_size_est_need, GNUNET_NO); + GNUNET_STATISTICS_set (stats, "view size aim", sub->view_size_est_need, GNUNET_NO); /* If the NSE has changed adapt the lists accordingly */ - resize_wrapper (ss->sampler, ss->sampler_size_est_need); - View_change_len (ss->view, ss->view_size_est_need); + resize_wrapper (sub->sampler, sub->sampler_size_est_need); + View_change_len (sub->view, sub->view_size_est_need); +} + + +/** + * Function called by NSE. + * + * Updates sizes of sampler list and view and adapt those lists + * accordingly. + * + * implements #GNUNET_NSE_Callback + * + * @param cls Closure - unused + * @param timestamp time when the estimate was received from the server (or created by the server) + * @param logestimate the log(Base 2) value of the current network size estimate + * @param std_dev standard deviation for the estimate + */ +static void +nse_callback (void *cls, + struct GNUNET_TIME_Absolute timestamp, + double logestimate, double std_dev) +{ + (void) cls; + (void) timestamp; + struct ClientContext *cli_ctx_iter; + + adapt_sizes (msub, logestimate, std_dev); + for (cli_ctx_iter = cli_ctx_head; + NULL != cli_ctx_iter; + cli_ctx_iter = cli_ctx_iter->next) + { + if (NULL != cli_ctx_iter->sub) + { + adapt_sizes (cli_ctx_iter->sub, logestimate, std_dev); + } + } } @@ -2881,6 +2955,10 @@ check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg) if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) || (msize % sizeof (struct GNUNET_PeerIdentity) != 0) ) { + LOG (GNUNET_ERROR_TYPE_ERROR, + "message says it sends %" PRIu32 " peers, have space for %lu peers\n", + ntohl (msg->num_peers), + (msize / sizeof (struct GNUNET_PeerIdentity))); GNUNET_break (0); GNUNET_SERVICE_client_drop (cli_ctx->client); return GNUNET_SYSERR; @@ -2918,7 +2996,8 @@ handle_client_seed (void *cls, i, GNUNET_i2s (&peers[i])); - got_peer (cli_ctx->ss, &peers[i]); + if (NULL != msub) got_peer (msub, &peers[i]); + if (NULL != cli_ctx->sub) got_peer (cli_ctx->sub, &peers[i]); } GNUNET_SERVICE_client_continue (cli_ctx->client); } @@ -3013,15 +3092,63 @@ handle_client_stream_cancel (void *cls, (void) msg; LOG (GNUNET_ERROR_TYPE_DEBUG, - "Client requested peers from biased stream.\n"); + "Client canceled receiving peers from biased stream.\n"); cli_ctx->stream_update = GNUNET_NO; GNUNET_assert (NULL != cli_ctx); GNUNET_SERVICE_client_continue (cli_ctx->client); - if (0 == cli_ctx->view_updates_left) +} + + +/** + * @brief Create and start a Sub. + * + * @param cls Closure - unused + * @param msg Message containing the necessary information + */ +static void +handle_client_start_sub (void *cls, + const struct GNUNET_RPS_CS_SubStartMessage *msg) +{ + struct ClientContext *cli_ctx = cls; + + LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested start of a new sub.\n"); + if (NULL != cli_ctx->sub && + 0 != memcmp (&cli_ctx->sub->hash, + &msg->hash, + sizeof (struct GNUNET_HashCode))) { - destroy_cli_ctx (cli_ctx); + LOG (GNUNET_ERROR_TYPE_WARNING, "Already have a Sub with different share for this client. Remove old one, add new.\n"); + destroy_sub (cli_ctx->sub); + cli_ctx->sub = NULL; } + cli_ctx->sub = new_sub (&msg->hash, + msub->sampler_size_est_min, // TODO make api input? + GNUNET_TIME_relative_ntoh (msg->round_interval)); + GNUNET_SERVICE_client_continue (cli_ctx->client); +} + + +/** + * @brief Destroy the Sub + * + * @param cls Closure - unused + * @param msg Message containing the hash that identifies the Sub + */ +static void +handle_client_stop_sub (void *cls, + const struct GNUNET_RPS_CS_SubStopMessage *msg) +{ + struct ClientContext *cli_ctx = cls; + + GNUNET_assert (NULL != cli_ctx->sub); + if (0 != memcmp (&cli_ctx->sub->hash, &msg->hash, sizeof (struct GNUNET_HashCode))) + { + LOG (GNUNET_ERROR_TYPE_WARNING, "Share of current sub and request differ!\n"); + } + destroy_sub (cli_ctx->sub); + cli_ctx->sub = NULL; + GNUNET_SERVICE_client_continue (cli_ctx->client); } @@ -3109,9 +3236,9 @@ handle_peer_push (void *cls, #endif /* ENABLE_MALICIOUS */ /* Add the sending peer to the push_map */ - CustomPeerMap_put (channel_ctx->peer_ctx->ss->push_map, peer); + CustomPeerMap_put (channel_ctx->peer_ctx->sub->push_map, peer); - GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->ss->peer_map, + GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map, &channel_ctx->peer_ctx->peer_id)); GNUNET_CADET_receive_done (channel_ctx->channel); } @@ -3154,13 +3281,13 @@ handle_peer_pull_request (void *cls, } #endif /* ENABLE_MALICIOUS */ - GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->ss->peer_map, + GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map, &channel_ctx->peer_ctx->peer_id)); GNUNET_CADET_receive_done (channel_ctx->channel); - view_array = View_get_as_array (channel_ctx->peer_ctx->ss->view); + view_array = View_get_as_array (channel_ctx->peer_ctx->sub->view); send_pull_reply (peer_ctx, view_array, - View_size (channel_ctx->peer_ctx->ss->view)); + View_size (channel_ctx->peer_ctx->sub->view)); } @@ -3196,7 +3323,7 @@ check_peer_pull_reply (void *cls, return GNUNET_SYSERR; } - if (GNUNET_YES != check_peer_flag (sender_ctx->ss->peer_map, + if (GNUNET_YES != check_peer_flag (sender_ctx->sub->peer_map, &sender_ctx->peer_id, Peers_PULL_REPLY_PENDING)) { @@ -3277,27 +3404,27 @@ handle_peer_pull_reply (void *cls, } #endif /* ENABLE_MALICIOUS */ /* Make sure we 'know' about this peer */ - (void) insert_peer (channel_ctx->peer_ctx->ss, &peers[i]); + (void) insert_peer (channel_ctx->peer_ctx->sub, &peers[i]); - if (GNUNET_YES == check_peer_valid (channel_ctx->peer_ctx->ss->valid_peers, + if (GNUNET_YES == check_peer_valid (channel_ctx->peer_ctx->sub->valid_peers, &peers[i])) { - CustomPeerMap_put (channel_ctx->peer_ctx->ss->pull_map, &peers[i]); + CustomPeerMap_put (channel_ctx->peer_ctx->sub->pull_map, &peers[i]); } else { schedule_operation (channel_ctx->peer_ctx, insert_in_pull_map, - channel_ctx->peer_ctx->ss); /* cls */ - (void) issue_peer_online_check (channel_ctx->peer_ctx->ss, &peers[i]); + channel_ctx->peer_ctx->sub); /* cls */ + (void) issue_peer_online_check (channel_ctx->peer_ctx->sub, &peers[i]); } } - UNSET_PEER_FLAG (get_peer_ctx (channel_ctx->peer_ctx->ss->peer_map, sender), + UNSET_PEER_FLAG (get_peer_ctx (channel_ctx->peer_ctx->sub->peer_map, sender), Peers_PULL_REPLY_PENDING); - clean_peer (channel_ctx->peer_ctx->ss, sender); + clean_peer (channel_ctx->peer_ctx->sub, sender); - GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->ss->peer_map, + GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map, sender)); GNUNET_CADET_receive_done (channel_ctx->channel); } @@ -3362,7 +3489,7 @@ send_pull_request (struct PeerContext *peer_ctx) { struct GNUNET_MQ_Envelope *ev; - GNUNET_assert (GNUNET_NO == check_peer_flag (peer_ctx->ss->peer_map, + GNUNET_assert (GNUNET_NO == check_peer_flag (peer_ctx->sub->peer_map, &peer_ctx->peer_id, Peers_PULL_REPLY_PENDING)); SET_PEER_FLAG (peer_ctx, Peers_PULL_REPLY_PENDING); @@ -3397,12 +3524,6 @@ send_push (struct PeerContext *peer_ctx) } -static void -do_round (void *cls); - -static void -do_mal_round (void *cls); - #ifdef ENABLE_MALICIOUS @@ -3452,7 +3573,7 @@ handle_client_act_malicious (void *cls, struct GNUNET_PeerIdentity *peers; uint32_t num_mal_peers_sent; uint32_t num_mal_peers_old; - struct SubSampler *ss = cli_ctx->ss; + struct Sub *sub = cli_ctx->sub; /* Do actual logic */ peers = (struct GNUNET_PeerIdentity *) &msg[1]; @@ -3484,8 +3605,15 @@ handle_client_act_malicious (void *cls, mal_peer_set); /* Substitute do_round () with do_mal_round () */ - GNUNET_SCHEDULER_cancel (ss->do_round_task); - ss->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, ss); + if (NULL != sub) + { + GNUNET_SCHEDULER_cancel (sub->do_round_task); + sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub); + } + else + { + LOG (GNUNET_ERROR_TYPE_WARNING, "do_round_task is NULL, probably in shutdown\n"); + } } else if ( (2 == mal_type) || @@ -3517,9 +3645,10 @@ handle_client_act_malicious (void *cls, &msg->attacked_peer, sizeof (struct GNUNET_PeerIdentity)); /* Set the flag of the attacked peer to valid to avoid problems */ - if (GNUNET_NO == check_peer_known (ss->peer_map, &attacked_peer)) + if (NULL != sub && + GNUNET_NO == check_peer_known (sub->peer_map, &attacked_peer)) { - (void) issue_peer_online_check (ss, &attacked_peer); + (void) issue_peer_online_check (sub, &attacked_peer); } LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -3527,16 +3656,20 @@ handle_client_act_malicious (void *cls, GNUNET_i2s (&attacked_peer)); /* Substitute do_round () with do_mal_round () */ - GNUNET_SCHEDULER_cancel (ss->do_round_task); - ss->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, ss); + if (NULL != sub && NULL != sub->do_round_task) + { + /* Probably in shutdown */ + GNUNET_SCHEDULER_cancel (sub->do_round_task); + sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub); + } } else if (0 == mal_type) { /* Stop acting malicious */ GNUNET_array_grow (mal_peers, num_mal_peers, 0); /* Substitute do_mal_round () with do_round () */ - GNUNET_SCHEDULER_cancel (ss->do_round_task); - ss->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, ss); + GNUNET_SCHEDULER_cancel (sub->do_round_task); + sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub); } else { @@ -3552,7 +3685,7 @@ handle_client_act_malicious (void *cls, * * This is executed regylary. * - * @param cls Closure - SubSamper + * @param cls Closure - Sub */ static void do_mal_round (void *cls) @@ -3561,12 +3694,12 @@ do_mal_round (void *cls) uint32_t i; struct GNUNET_TIME_Relative time_next_round; struct AttackedPeer *tmp_att_peer; - struct SubSampler *ss = cls; + struct Sub *sub = cls; LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to execute next round maliciously type %" PRIu32 ".\n", mal_type); - ss->do_round_task = NULL; + sub->do_round_task = NULL; GNUNET_assert (mal_type <= 3); /* Do malicious actions */ if (1 == mal_type) @@ -3589,7 +3722,7 @@ do_mal_round (void *cls) else att_peer_index = att_peer_index->next; - send_push (get_peer_ctx (ss->peer_map, &att_peer_index->peer_id)); + send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id)); } /* Send PULLs to some peers to learn about additional peers to attack */ @@ -3601,7 +3734,7 @@ do_mal_round (void *cls) else att_peer_index = tmp_att_peer->next; - send_pull_request (get_peer_ctx (ss->peer_map, &tmp_att_peer->peer_id)); + send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id)); } } @@ -3612,11 +3745,11 @@ do_mal_round (void *cls) * Send as many pushes to the attacked peer as possible * That is one push per round as it will ignore more. */ - (void) issue_peer_online_check (ss, &attacked_peer); - if (GNUNET_YES == check_peer_flag (ss->peer_map, + (void) issue_peer_online_check (sub, &attacked_peer); + if (GNUNET_YES == check_peer_flag (sub->peer_map, &attacked_peer, Peers_ONLINE)) - send_push (get_peer_ctx (ss->peer_map, &attacked_peer)); + send_push (get_peer_ctx (sub->peer_map, &attacked_peer)); } @@ -3624,20 +3757,20 @@ do_mal_round (void *cls) { /* Combined attack */ /* Send PUSH to attacked peers */ - if (GNUNET_YES == check_peer_known (ss->peer_map, &attacked_peer)) + if (GNUNET_YES == check_peer_known (sub->peer_map, &attacked_peer)) { - (void) issue_peer_online_check (ss, &attacked_peer); - if (GNUNET_YES == check_peer_flag (ss->peer_map, + (void) issue_peer_online_check (sub, &attacked_peer); + if (GNUNET_YES == check_peer_flag (sub->peer_map, &attacked_peer, Peers_ONLINE)) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Goding to send push to attacked peer (%s)\n", GNUNET_i2s (&attacked_peer)); - send_push (get_peer_ctx (ss->peer_map, &attacked_peer)); + send_push (get_peer_ctx (sub->peer_map, &attacked_peer)); } } - (void) issue_peer_online_check (ss, &attacked_peer); + (void) issue_peer_online_check (sub, &attacked_peer); /* The maximum of pushes we're going to send this round */ num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1, @@ -3655,7 +3788,7 @@ do_mal_round (void *cls) else att_peer_index = att_peer_index->next; - send_push (get_peer_ctx (ss->peer_map, &att_peer_index->peer_id)); + send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id)); } /* Send PULLs to some peers to learn about additional peers to attack */ @@ -3667,18 +3800,16 @@ do_mal_round (void *cls) else att_peer_index = tmp_att_peer->next; - send_pull_request (get_peer_ctx (ss->peer_map, &tmp_att_peer->peer_id)); + send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id)); } } /* Schedule next round */ - time_next_round = compute_rand_delay (ss->round_interval, 2); + time_next_round = compute_rand_delay (sub->round_interval, 2); - //ss->do_round_task = GNUNET_SCHEDULER_add_delayed (ss->round_interval, &do_mal_round, - //NULL); - GNUNET_assert (NULL == ss->do_round_task); - ss->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round, - &do_mal_round, ss); + GNUNET_assert (NULL == sub->do_round_task); + sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round, + &do_mal_round, sub); LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n"); } #endif /* ENABLE_MALICIOUS */ @@ -3688,7 +3819,7 @@ do_mal_round (void *cls) * * This is executed regylary. * - * @param cls Closure - SubSampler + * @param cls Closure - Sub */ static void do_round (void *cls) @@ -3702,66 +3833,66 @@ do_round (void *cls) uint32_t second_border; struct GNUNET_PeerIdentity peer; struct GNUNET_PeerIdentity *update_peer; - struct SubSampler *ss = cls; + struct Sub *sub = cls; LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to execute next round.\n"); GNUNET_STATISTICS_update(stats, "# rounds", 1, GNUNET_NO); - ss->do_round_task = NULL; + sub->do_round_task = NULL; LOG (GNUNET_ERROR_TYPE_DEBUG, "Printing view:\n"); - to_file (ss->file_name_view_log, + to_file (sub->file_name_view_log, "___ new round ___"); - view_array = View_get_as_array (ss->view); - for (i = 0; i < View_size (ss->view); i++) + view_array = View_get_as_array (sub->view); + for (i = 0; i < View_size (sub->view); i++) { LOG (GNUNET_ERROR_TYPE_DEBUG, "\t%s\n", GNUNET_i2s (&view_array[i])); - to_file (ss->file_name_view_log, + to_file (sub->file_name_view_log, "=%s\t(do round)", GNUNET_i2s_full (&view_array[i])); } /* Send pushes and pull requests */ - if (0 < View_size (ss->view)) + if (0 < View_size (sub->view)) { permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, - View_size (ss->view)); + View_size (sub->view)); /* Send PUSHes */ - a_peers = ceil (alpha * View_size (ss->view)); + a_peers = ceil (alpha * View_size (sub->view)); LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send pushes to %u (ceil (%f * %u)) peers.\n", - a_peers, alpha, View_size (ss->view)); + a_peers, alpha, View_size (sub->view)); for (i = 0; i < a_peers; i++) { peer = view_array[permut[i]]; // FIXME if this fails schedule/loop this for later - send_push (get_peer_ctx (ss->peer_map, &peer)); + send_push (get_peer_ctx (sub->peer_map, &peer)); } /* Send PULL requests */ - b_peers = ceil (beta * View_size (ss->view)); + b_peers = ceil (beta * View_size (sub->view)); first_border = a_peers; second_border = a_peers + b_peers; - if (second_border > View_size (ss->view)) + if (second_border > View_size (sub->view)) { - first_border = View_size (ss->view) - b_peers; - second_border = View_size (ss->view); + first_border = View_size (sub->view) - b_peers; + second_border = View_size (sub->view); } LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send pulls to %u (ceil (%f * %u)) peers.\n", - b_peers, beta, View_size (ss->view)); + b_peers, beta, View_size (sub->view)); for (i = first_border; i < second_border; i++) { peer = view_array[permut[i]]; - if ( GNUNET_NO == check_peer_flag (ss->peer_map, + if ( GNUNET_NO == check_peer_flag (sub->peer_map, &peer, Peers_PULL_REPLY_PENDING)) { // FIXME if this fails schedule/loop this for later - send_pull_request (get_peer_ctx (ss->peer_map, &peer)); + send_pull_request (get_peer_ctx (sub->peer_map, &peer)); } } @@ -3773,10 +3904,9 @@ do_round (void *cls) /* Update view */ /* TODO see how many peers are in push-/pull- list! */ - if ((CustomPeerMap_size (ss->push_map) <= alpha * ss->view_size_est_need) && - (0 < CustomPeerMap_size (ss->push_map)) && - (0 < CustomPeerMap_size (ss->pull_map))) - //if (GNUNET_YES) // disable blocking temporarily + if ((CustomPeerMap_size (sub->push_map) <= alpha * sub->view_size_est_need) && + (0 < CustomPeerMap_size (sub->push_map)) && + (0 < CustomPeerMap_size (sub->pull_map))) { /* If conditions for update are fulfilled, update */ LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n"); @@ -3788,23 +3918,23 @@ do_round (void *cls) peers_to_clean_size = 0; GNUNET_array_grow (peers_to_clean, peers_to_clean_size, - View_size (ss->view)); + View_size (sub->view)); GNUNET_memcpy (peers_to_clean, view_array, - View_size (ss->view) * sizeof (struct GNUNET_PeerIdentity)); + View_size (sub->view) * sizeof (struct GNUNET_PeerIdentity)); /* Seems like recreating is the easiest way of emptying the peermap */ - View_clear (ss->view); - to_file (ss->file_name_view_log, + View_clear (sub->view); + to_file (sub->file_name_view_log, "--- emptied ---"); - first_border = GNUNET_MIN (ceil (alpha * ss->view_size_est_need), - CustomPeerMap_size (ss->push_map)); + first_border = GNUNET_MIN (ceil (alpha * sub->view_size_est_need), + CustomPeerMap_size (sub->push_map)); second_border = first_border + - GNUNET_MIN (floor (beta * ss->view_size_est_need), - CustomPeerMap_size (ss->pull_map)); + GNUNET_MIN (floor (beta * sub->view_size_est_need), + CustomPeerMap_size (sub->pull_map)); final_size = second_border + - ceil ((1 - (alpha + beta)) * ss->view_size_est_need); + ceil ((1 - (alpha + beta)) * sub->view_size_est_need); LOG (GNUNET_ERROR_TYPE_DEBUG, "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n", first_border, @@ -3813,19 +3943,20 @@ do_round (void *cls) /* Update view with peers received through PUSHes */ permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, - CustomPeerMap_size (ss->push_map)); + CustomPeerMap_size (sub->push_map)); for (i = 0; i < first_border; i++) { int inserted; - inserted = insert_in_view (ss, - CustomPeerMap_get_peer_by_index (ss->push_map, + inserted = insert_in_view (sub, + CustomPeerMap_get_peer_by_index (sub->push_map, permut[i])); if (GNUNET_OK == inserted) { - clients_notify_stream_peer (1, - CustomPeerMap_get_peer_by_index (ss->push_map, permut[i])); + clients_notify_stream_peer (sub, + 1, + CustomPeerMap_get_peer_by_index (sub->push_map, permut[i])); } - to_file (ss->file_name_view_log, + to_file (sub->file_name_view_log, "+%s\t(push list)", GNUNET_i2s_full (&view_array[i])); // TODO change the peer_flags accordingly @@ -3835,20 +3966,21 @@ do_round (void *cls) /* Update view with peers received through PULLs */ permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, - CustomPeerMap_size (ss->pull_map)); + CustomPeerMap_size (sub->pull_map)); for (i = first_border; i < second_border; i++) { int inserted; - inserted = insert_in_view (ss, - CustomPeerMap_get_peer_by_index (ss->pull_map, + inserted = insert_in_view (sub, + CustomPeerMap_get_peer_by_index (sub->pull_map, permut[i - first_border])); if (GNUNET_OK == inserted) { - clients_notify_stream_peer (1, - CustomPeerMap_get_peer_by_index (ss->pull_map, + clients_notify_stream_peer (sub, + 1, + CustomPeerMap_get_peer_by_index (sub->pull_map, permut[i - first_border])); } - to_file (ss->file_name_view_log, + to_file (sub->file_name_view_log, "+%s\t(pull list)", GNUNET_i2s_full (&view_array[i])); // TODO change the peer_flags accordingly @@ -3857,106 +3989,106 @@ do_round (void *cls) permut = NULL; /* Update view with peers from history */ - RPS_sampler_get_n_rand_peers (ss->sampler, + RPS_sampler_get_n_rand_peers (sub->sampler, final_size - second_border, hist_update, - ss); + sub); // TODO change the peer_flags accordingly - for (i = 0; i < View_size (ss->view); i++) + for (i = 0; i < View_size (sub->view); i++) rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]); /* Clean peers that were removed from the view */ for (i = 0; i < peers_to_clean_size; i++) { - to_file (ss->file_name_view_log, + to_file (sub->file_name_view_log, "-%s", GNUNET_i2s_full (&peers_to_clean[i])); - clean_peer (ss, &peers_to_clean[i]); + clean_peer (sub, &peers_to_clean[i]); } GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0); - clients_notify_view_update (ss); + clients_notify_view_update (sub); } else { LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n"); GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO); - if (CustomPeerMap_size (ss->push_map) > alpha * View_size (ss->view) && - !(0 >= CustomPeerMap_size (ss->pull_map))) + if (CustomPeerMap_size (sub->push_map) > alpha * View_size (sub->view) && + !(0 >= CustomPeerMap_size (sub->pull_map))) GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO); - if (CustomPeerMap_size (ss->push_map) > alpha * View_size (ss->view) && - (0 >= CustomPeerMap_size (ss->pull_map))) + if (CustomPeerMap_size (sub->push_map) > alpha * View_size (sub->view) && + (0 >= CustomPeerMap_size (sub->pull_map))) GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO); - if (0 >= CustomPeerMap_size (ss->push_map) && - !(0 >= CustomPeerMap_size (ss->pull_map))) + if (0 >= CustomPeerMap_size (sub->push_map) && + !(0 >= CustomPeerMap_size (sub->pull_map))) GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO); - if (0 >= CustomPeerMap_size (ss->push_map) && - (0 >= CustomPeerMap_size (ss->pull_map))) + if (0 >= CustomPeerMap_size (sub->push_map) && + (0 >= CustomPeerMap_size (sub->pull_map))) GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO); - if (0 >= CustomPeerMap_size (ss->pull_map) && - CustomPeerMap_size (ss->push_map) > alpha * View_size (ss->view) && - 0 >= CustomPeerMap_size (ss->push_map)) + if (0 >= CustomPeerMap_size (sub->pull_map) && + CustomPeerMap_size (sub->push_map) > alpha * View_size (sub->view) && + 0 >= CustomPeerMap_size (sub->push_map)) GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO); } // TODO independent of that also get some peers from CADET_get_peers()? GNUNET_STATISTICS_set (stats, "# peers in push map at end of round", - CustomPeerMap_size (ss->push_map), + CustomPeerMap_size (sub->push_map), GNUNET_NO); GNUNET_STATISTICS_set (stats, "# peers in pull map at end of round", - CustomPeerMap_size (ss->pull_map), + CustomPeerMap_size (sub->pull_map), GNUNET_NO); GNUNET_STATISTICS_set (stats, "# peers in view at end of round", - View_size (ss->view), + View_size (sub->view), GNUNET_NO); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (ss->view%u) = %.2f)\n", - CustomPeerMap_size (ss->push_map), - CustomPeerMap_size (ss->pull_map), + "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (sub->view%u) = %.2f)\n", + CustomPeerMap_size (sub->push_map), + CustomPeerMap_size (sub->pull_map), alpha, - View_size (ss->view), - alpha * View_size (ss->view)); + View_size (sub->view), + alpha * View_size (sub->view)); /* Update samplers */ - for (i = 0; i < CustomPeerMap_size (ss->push_map); i++) + for (i = 0; i < CustomPeerMap_size (sub->push_map); i++) { - update_peer = CustomPeerMap_get_peer_by_index (ss->push_map, i); + update_peer = CustomPeerMap_get_peer_by_index (sub->push_map, i); LOG (GNUNET_ERROR_TYPE_DEBUG, "Updating with peer %s from push list\n", GNUNET_i2s (update_peer)); - insert_in_sampler (ss, update_peer); - clean_peer (ss, update_peer); /* This cleans only if it is not in the view */ + insert_in_sampler (sub, update_peer); + clean_peer (sub, update_peer); /* This cleans only if it is not in the view */ } - for (i = 0; i < CustomPeerMap_size (ss->pull_map); i++) + for (i = 0; i < CustomPeerMap_size (sub->pull_map); i++) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Updating with peer %s from pull list\n", - GNUNET_i2s (CustomPeerMap_get_peer_by_index (ss->pull_map, i))); - insert_in_sampler (ss, CustomPeerMap_get_peer_by_index (ss->pull_map, i)); + GNUNET_i2s (CustomPeerMap_get_peer_by_index (sub->pull_map, i))); + insert_in_sampler (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i)); /* This cleans only if it is not in the view */ - clean_peer (ss, CustomPeerMap_get_peer_by_index (ss->pull_map, i)); + clean_peer (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i)); } /* Empty push/pull lists */ - CustomPeerMap_clear (ss->push_map); - CustomPeerMap_clear (ss->pull_map); + CustomPeerMap_clear (sub->push_map); + CustomPeerMap_clear (sub->pull_map); GNUNET_STATISTICS_set (stats, "view size", - View_size(ss->view), + View_size(sub->view), GNUNET_NO); struct GNUNET_TIME_Relative time_next_round; - time_next_round = compute_rand_delay (ss->round_interval, 2); + time_next_round = compute_rand_delay (sub->round_interval, 2); /* Schedule next round */ - ss->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round, - &do_round, ss); + sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round, + &do_round, sub); LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n"); } @@ -3969,7 +4101,7 @@ do_round (void *cls) * * implements #GNUNET_CADET_PeersCB * - * @param cls Closure - SubSampler + * @param cls Closure - Sub * @param peer Peer, or NULL on "EOF". * @param tunnel Do we have a tunnel towards this peer? * @param n_paths Number of known paths towards this peer. @@ -3979,12 +4111,12 @@ do_round (void *cls) void init_peer_cb (void *cls, const struct GNUNET_PeerIdentity *peer, - int tunnel, // "Do we have a tunnel towards this peer?" - unsigned int n_paths, // "Number of known paths towards this peer" - unsigned int best_path) // "How long is the best path? - // (0 = unknown, 1 = ourselves, 2 = neighbor)" + int tunnel, /* "Do we have a tunnel towards this peer?" */ + unsigned int n_paths, /* "Number of known paths towards this peer" */ + unsigned int best_path) /* "How long is the best path? + * (0 = unknown, 1 = ourselves, 2 = neighbor)" */ { - struct SubSampler *ss = cls; + struct Sub *sub = cls; (void) tunnel; (void) n_paths; (void) best_path; @@ -3994,7 +4126,7 @@ init_peer_cb (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "Got peer_id %s from cadet\n", GNUNET_i2s (peer)); - got_peer (ss, peer); + got_peer (sub, peer); } } @@ -4004,7 +4136,7 @@ init_peer_cb (void *cls, * * We initialise the sampler with those. * - * @param cls Closure - SubSampler + * @param cls Closure - Sub * @param peer the peer id * @return #GNUNET_YES if we should continue to * iterate, @@ -4014,14 +4146,14 @@ static int valid_peers_iterator (void *cls, const struct GNUNET_PeerIdentity *peer) { - struct SubSampler *ss = cls; + struct Sub *sub = cls; if (NULL != peer) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Got stored, valid peer %s\n", GNUNET_i2s (peer)); - got_peer (ss, peer); + got_peer (sub, peer); } return GNUNET_YES; } @@ -4030,7 +4162,7 @@ valid_peers_iterator (void *cls, /** * Iterator over peers from peerinfo. * - * @param cls Closure - SubSampler + * @param cls Closure - Sub * @param peer id of the peer, NULL for last call * @param hello hello message for the peer (can be NULL) * @param error message @@ -4041,7 +4173,7 @@ process_peerinfo_peers (void *cls, const struct GNUNET_HELLO_Message *hello, const char *err_msg) { - struct SubSampler *ss = cls; + struct Sub *sub = cls; (void) hello; (void) err_msg; @@ -4050,7 +4182,7 @@ process_peerinfo_peers (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "Got peer_id %s from peerinfo\n", GNUNET_i2s (peer)); - got_peer (ss, peer); + got_peer (sub, peer); } } @@ -4058,14 +4190,13 @@ process_peerinfo_peers (void *cls, /** * Task run during shutdown. * - * @param cls Closure - SubSampler containing all datastructures to clean + * @param cls Closure - unused */ static void shutdown_task (void *cls) { - struct ClientContext *client_ctx; (void) cls; - struct SubSampler *ss = cls; + struct ClientContext *client_ctx; LOG (GNUNET_ERROR_TYPE_DEBUG, "RPS service is going down\n"); @@ -4077,39 +4208,28 @@ shutdown_task (void *cls) { destroy_cli_ctx (client_ctx); } - GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle); - GNUNET_PEERINFO_disconnect (peerinfo_handle); - peerinfo_handle = NULL; - if (NULL != ss->do_round_task) + if (NULL != msub) { - GNUNET_SCHEDULER_cancel (ss->do_round_task); - ss->do_round_task = NULL; + destroy_sub (msub); + msub = NULL; } - peers_terminate (ss); - + /* Disconnect from other services */ + GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle); + GNUNET_PEERINFO_disconnect (peerinfo_handle); + peerinfo_handle = NULL; GNUNET_NSE_disconnect (nse); - RPS_sampler_destroy (ss->sampler); - GNUNET_CADET_close_port (ss->cadet_port); - GNUNET_CADET_disconnect (ss->cadet_handle); - ss->cadet_handle = NULL; - View_destroy (ss->view); - CustomPeerMap_destroy (ss->push_map); - CustomPeerMap_destroy (ss->pull_map); + if (NULL != stats) { GNUNET_STATISTICS_destroy (stats, GNUNET_NO); stats = NULL; } + GNUNET_CADET_disconnect (cadet_handle); + cadet_handle = NULL; #ifdef ENABLE_MALICIOUS struct AttackedPeer *tmp_att_peer; - /* it is ok to free this const during shutdown: */ - GNUNET_free ((char *) ss->file_name_view_log); -#ifdef TO_FILE - GNUNET_free ((char *) ss->file_name_observed_log); - GNUNET_CONTAINER_multipeermap_destroy (ss->observed_unique_peers); -#endif /* TO_FILE */ GNUNET_array_grow (mal_peers, num_mal_peers, 0); @@ -4154,7 +4274,6 @@ client_connect_cb (void *cls, cli_ctx->view_updates_left = -1; cli_ctx->stream_update = GNUNET_NO; cli_ctx->client = client; - cli_ctx->ss = mss; GNUNET_CONTAINER_DLL_insert (cli_ctx_head, cli_ctx_tail, cli_ctx); @@ -4206,6 +4325,8 @@ run (void *cls, char *fn_valid_peers; struct GNUNET_TIME_Relative round_interval; long long unsigned int sampler_size; + char hash_port_string[] = GNUNET_APPLICATION_PORT_RPS; + struct GNUNET_HashCode hash; (void) cls; (void) service; @@ -4262,40 +4383,43 @@ run (void *cls, "FILENAME_VALID_PEERS"); } + cadet_handle = GNUNET_CADET_connect (cfg); + GNUNET_assert (NULL != cadet_handle); + + alpha = 0.45; beta = 0.45; - /* Set up main SubSampler */ - mss = new_subsampler ("", /* this is the main sampler - no shared value */ - sampler_size, /* Will be overwritten by config */ - round_interval); + /* Set up main Sub */ + GNUNET_CRYPTO_hash (hash_port_string, + strlen (hash_port_string), + &hash); + msub = new_sub (&hash, + sampler_size, /* Will be overwritten by config */ + round_interval); peerinfo_handle = GNUNET_PEERINFO_connect (cfg); /* connect to NSE */ - nse = GNUNET_NSE_connect (cfg, nse_callback, mss); + nse = GNUNET_NSE_connect (cfg, nse_callback, NULL); //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n"); - //GNUNET_CADET_get_peers (mss.cadet_handle, &init_peer_cb, mss); + //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, msub); // TODO send push/pull to each of those peers? - // TODO read stored valid peers from last run LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n"); - restore_valid_peers (mss); - get_valid_peers (mss->valid_peers, valid_peers_iterator, mss); + restore_valid_peers (msub); + get_valid_peers (msub->valid_peers, valid_peers_iterator, msub); peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg, GNUNET_NO, process_peerinfo_peers, - mss); + msub); LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n"); - mss->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, mss); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n"); - - GNUNET_SCHEDULER_add_shutdown (&shutdown_task, mss); + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); stats = GNUNET_STATISTICS_create ("rps", cfg); } @@ -4336,6 +4460,14 @@ GNUNET_SERVICE_MAIN GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL, struct GNUNET_MessageHeader, NULL), + GNUNET_MQ_hd_fixed_size (client_start_sub, + GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START, + struct GNUNET_RPS_CS_SubStartMessage, + NULL), + GNUNET_MQ_hd_fixed_size (client_stop_sub, + GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP, + struct GNUNET_RPS_CS_SubStopMessage, + NULL), GNUNET_MQ_handler_end()); /* end of gnunet-service-rps.c */ -- cgit v1.2.3