summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJulius Bünger <buenger@mytum.de>2019-04-08 00:55:35 +0200
committerJulius Bünger <buenger@mytum.de>2019-04-08 00:56:13 +0200
commit3afe1a3435697b01fee557420a701fba1821dbe5 (patch)
treed829a665412d2d59bccc77df309bd4d75a9c9ded /src
parentb1d840b2104b9cf4a1cd6997bce39bd4b6c1b8bf (diff)
RPS: Retrieve more info from sampler for profiling
Diffstat (limited to 'src')
-rw-r--r--src/rps/gnunet-rps-profiler.c91
-rw-r--r--src/rps/gnunet-service-rps_sampler.c2
-rw-r--r--src/rps/rps-sampler_client.c64
-rw-r--r--src/rps/rps-sampler_client.h8
-rw-r--r--src/rps/rps-sampler_common.c187
-rw-r--r--src/rps/rps-sampler_common.h56
-rw-r--r--src/rps/rps_api.c193
7 files changed, 576 insertions, 25 deletions
diff --git a/src/rps/gnunet-rps-profiler.c b/src/rps/gnunet-rps-profiler.c
index a852d94b1..ffc9d6f7e 100644
--- a/src/rps/gnunet-rps-profiler.c
+++ b/src/rps/gnunet-rps-profiler.c
@@ -429,7 +429,7 @@ struct PendingReply
/**
* Handle to the request we are waiting for
*/
- struct GNUNET_RPS_Request_Handle *req_handle;
+ struct GNUNET_RPS_Request_Handle_Single_Info *req_handle;
/**
* The peer that requested
@@ -1040,7 +1040,7 @@ cancel_request (struct PendingReply *pending_rep)
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Cancelling rps get reply\n");
GNUNET_assert (NULL != pending_rep->req_handle);
- GNUNET_RPS_request_cancel (pending_rep->req_handle);
+ GNUNET_RPS_request_single_info_cancel (pending_rep->req_handle);
pending_rep->req_handle = NULL;
GNUNET_free (pending_rep);
pending_rep = NULL;
@@ -1489,6 +1489,13 @@ default_reply_handle (void *cls,
}
}
+
+static void
+profiler_reply_handle_info (void *cls,
+ const struct GNUNET_PeerIdentity *recv_peer,
+ double probability,
+ uint32_t num_observed);
+
/**
* Request random peers.
*/
@@ -1510,9 +1517,12 @@ request_peers (void *cls)
"Requesting one peer\n");
pending_rep = GNUNET_new (struct PendingReply);
pending_rep->rps_peer = rps_peer;
- pending_rep->req_handle = GNUNET_RPS_request_peers (rps_peer->rps_handle,
- 1,
- cur_test_run.reply_handle,
+ //pending_rep->req_handle = GNUNET_RPS_request_peers (rps_peer->rps_handle,
+ // 1,
+ // cur_test_run.reply_handle,
+ // pending_rep);
+ pending_rep->req_handle = GNUNET_RPS_request_peer_info (rps_peer->rps_handle,
+ profiler_reply_handle_info,
pending_rep);
GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_rep_head,
rps_peer->pending_rep_tail,
@@ -1979,6 +1989,77 @@ profiler_reply_handle (void *cls,
}
+/**
+ * Callback to call on receipt of a reply
+ *
+ * @param cls closure
+ * @param n number of peers
+ * @param recv_peers the received peers
+ */
+static void
+profiler_reply_handle_info (void *cls,
+ const struct GNUNET_PeerIdentity *recv_peer,
+ double probability,
+ uint32_t num_observed)
+{
+ struct RPSPeer *rps_peer;
+ struct RPSPeer *rcv_rps_peer;
+ char file_name_buf[128];
+ char file_name_dh_buf[128];
+ char file_name_dhr_buf[128];
+ char file_name_dhru_buf[128];
+ char *file_name = file_name_buf;
+ char *file_name_dh = file_name_dh_buf;
+ char *file_name_dhr = file_name_dhr_buf;
+ char *file_name_dhru = file_name_dhru_buf;
+ unsigned int i;
+ struct PendingReply *pending_rep = (struct PendingReply *) cls;
+
+ pending_rep->req_handle = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "profiler_reply_handle()\n");
+ rps_peer = pending_rep->rps_peer;
+ (void) GNUNET_asprintf (&file_name,
+ "/tmp/rps/received_ids-%u",
+ rps_peer->index);
+
+ (void) GNUNET_asprintf (&file_name_dh,
+ "/tmp/rps/diehard_input-%u",
+ rps_peer->index);
+ (void) GNUNET_asprintf (&file_name_dhr,
+ "/tmp/rps/diehard_input_raw-%u",
+ rps_peer->index);
+ (void) GNUNET_asprintf (&file_name_dhru,
+ "/tmp/rps/diehard_input_raw_aligned-%u",
+ rps_peer->index);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "[%s] got peer with info:\n",
+ GNUNET_i2s (rps_peer->peer_id));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ " %s\n",
+ GNUNET_i2s (recv_peer));
+ tofile (file_name,
+ "%s %d %" PRIu32 " \n",
+ GNUNET_i2s_full (recv_peer),
+ probability,
+ num_observed);
+ rcv_rps_peer = GNUNET_CONTAINER_multipeermap_get (peer_map, recv_peer);
+ GNUNET_assert (NULL != rcv_rps_peer);
+ tofile (file_name_dh,
+ "%" PRIu32 "\n",
+ (uint32_t) rcv_rps_peer->index);
+#ifdef TO_FILE
+ to_file_raw (file_name_dhr,
+ (char *) &rcv_rps_peer->index,
+ sizeof (uint32_t));
+ to_file_raw_unaligned (file_name_dhru,
+ (char *) &rcv_rps_peer->index,
+ sizeof (uint32_t),
+ bits_needed);
+#endif /* TO_FILE */
+ default_reply_handle (cls, 1, recv_peer);
+}
+
+
static void
profiler_cb (struct RPSPeer *rps_peer)
{
diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c
index a95ac82d4..e17b154ca 100644
--- a/src/rps/gnunet-service-rps_sampler.c
+++ b/src/rps/gnunet-service-rps_sampler.c
@@ -257,7 +257,7 @@ sampler_get_rand_peer (void *cls)
gpc->req_handle->gpc_tail,
gpc);
*gpc->id = sampler->sampler_elements[r_index]->peer_id;
- gpc->cont (gpc->cont_cls, gpc->id);
+ gpc->cont (gpc->cont_cls, gpc->id, 0, sampler->sampler_elements[r_index]->num_peers);
GNUNET_free (gpc);
}
diff --git a/src/rps/rps-sampler_client.c b/src/rps/rps-sampler_client.c
index 0de25df07..61f9b6385 100644
--- a/src/rps/rps-sampler_client.c
+++ b/src/rps/rps-sampler_client.c
@@ -158,6 +158,46 @@ struct RPS_SamplerRequestHandle
void *cls;
};
+
+/**
+ * Closure to _get_rand_peer_info()
+ */
+struct RPS_SamplerRequestHandleSingleInfo
+{
+ /**
+ * DLL
+ */
+ struct RPS_SamplerRequestHandleSingleInfo *next;
+ struct RPS_SamplerRequestHandleSingleInfo *prev;
+
+ /**
+ * Pointer to the id
+ */
+ struct GNUNET_PeerIdentity *id;
+
+ /**
+ * Head and tail for the DLL to store the tasks for single requests
+ */
+ struct GetPeerCls *gpc_head;
+ struct GetPeerCls *gpc_tail;
+
+ /**
+ * Sampler.
+ */
+ struct RPS_Sampler *sampler;
+
+ /**
+ * Callback to be called when all ids are available.
+ */
+ RPS_sampler_sinlge_info_ready_cb callback;
+
+ /**
+ * Closure given to the callback
+ */
+ void *cls;
+};
+
+
///**
// * Global sampler variable.
// */
@@ -269,7 +309,12 @@ sampler_mod_get_rand_peer (void *cls)
gpc->get_peer_task = NULL;
gpc->notify_ctx = NULL;
- sampler = gpc->req_handle->sampler;
+ GNUNET_assert ( (NULL != gpc->req_handle) ||
+ (NULL != gpc->req_single_info_handle) );
+ if (NULL != gpc->req_handle)
+ sampler = gpc->req_handle->sampler;
+ else
+ sampler = gpc->req_single_info_handle->sampler;
LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n");
@@ -362,10 +407,19 @@ sampler_mod_get_rand_peer (void *cls)
RPS_sampler_elem_reinit (s_elem);
s_elem->last_client_request = GNUNET_TIME_absolute_get ();
- GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head,
- gpc->req_handle->gpc_tail,
- gpc);
- gpc->cont (gpc->cont_cls, gpc->id);
+ if (NULL != gpc->req_handle)
+ {
+ GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head,
+ gpc->req_handle->gpc_tail,
+ gpc);
+ }
+ else
+ {
+ GNUNET_CONTAINER_DLL_remove (gpc->req_single_info_handle->gpc_head,
+ gpc->req_single_info_handle->gpc_tail,
+ gpc);
+ }
+ gpc->cont (gpc->cont_cls, gpc->id, prob_observed_n, s_elem->num_peers);
GNUNET_free (gpc);
}
diff --git a/src/rps/rps-sampler_client.h b/src/rps/rps-sampler_client.h
index 1b425b754..680fabfda 100644
--- a/src/rps/rps-sampler_client.h
+++ b/src/rps/rps-sampler_client.h
@@ -40,6 +40,11 @@ struct RPS_Sampler;
*/
struct RPS_SamplerRequestHandle;
+/**
+ * Closure to _get_rand_peer_info()
+ */
+struct RPS_SamplerRequestHandleSingleInfo;
+
/**
* Get the size of the sampler.
@@ -108,8 +113,6 @@ RPS_sampler_reinitialise_by_value (struct RPS_Sampler *sampler,
* @param sampler the sampler to get peers from.
* @param cb callback that will be called once the ids are ready.
* @param cls closure given to @a cb
- * @param for_client #GNUNET_YES if result is used for client,
- * #GNUNET_NO if used internally
* @param num_peers the number of peers requested
*/
struct RPS_SamplerRequestHandle *
@@ -118,6 +121,7 @@ RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
RPS_sampler_n_rand_peers_ready_cb cb,
void *cls);
+
/**
* Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb.
*
diff --git a/src/rps/rps-sampler_common.c b/src/rps/rps-sampler_common.c
index f54de9014..adb69e1b5 100644
--- a/src/rps/rps-sampler_common.c
+++ b/src/rps/rps-sampler_common.c
@@ -116,6 +116,45 @@ struct RPS_SamplerRequestHandle
/**
+ * Closure to _get_rand_peer_info()
+ */
+struct RPS_SamplerRequestHandleSingleInfo
+{
+ /**
+ * DLL
+ */
+ struct RPS_SamplerRequestHandleSingleInfo *next;
+ struct RPS_SamplerRequestHandleSingleInfo *prev;
+
+ /**
+ * Pointer to the id
+ */
+ struct GNUNET_PeerIdentity *id;
+
+ /**
+ * Head and tail for the DLL to store the tasks for single requests
+ */
+ struct GetPeerCls *gpc_head;
+ struct GetPeerCls *gpc_tail;
+
+ /**
+ * Sampler.
+ */
+ struct RPS_Sampler *sampler;
+
+ /**
+ * Callback to be called when all ids are available.
+ */
+ RPS_sampler_sinlge_info_ready_cb callback;
+
+ /**
+ * Closure given to the callback
+ */
+ void *cls;
+};
+
+
+/**
* @brief Update the current estimate of the network size stored at the sampler
*
* Used for computing the condition when to return elements to the client
@@ -415,12 +454,20 @@ sampler_empty (struct RPS_Sampler *sampler)
/**
* Callback to _get_rand_peer() used by _get_n_rand_peers().
*
+ * Implements #RPS_sampler_rand_peer_ready_cont
+ *
* Checks whether all n peers are available. If they are,
* give those back.
+ * @param cls Closure
+ * @param id Peer ID
+ * @param probability The probability with which this sampler has seen all ids
+ * @param num_observed How many ids this sampler has observed
*/
static void
check_n_peers_ready (void *cls,
- const struct GNUNET_PeerIdentity *id)
+ const struct GNUNET_PeerIdentity *id,
+ double probability,
+ uint32_t num_observed)
{
struct RPS_SamplerRequestHandle *req_handle = cls;
(void) id;
@@ -428,6 +475,8 @@ check_n_peers_ready (void *cls,
struct GNUNET_PeerIdentity *peers;
uint32_t num_peers;
void *cb_cls;
+ (void) probability;
+ (void) num_observed;
req_handle->cur_num_peers++;
LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -460,6 +509,53 @@ check_n_peers_ready (void *cls,
/**
+ * Callback to _get_rand_peer() used by _get_rand_peer_info().
+ *
+ * Implements #RPS_sampler_rand_peer_ready_cont
+ *
+ * @param cls Closure
+ * @param id Peer ID
+ * @param probability The probability with which this sampler has seen all ids
+ * @param num_observed How many ids this sampler has observed
+ */
+static void
+check_peer_info_ready (void *cls,
+ const struct GNUNET_PeerIdentity *id,
+ double probability,
+ uint32_t num_observed)
+{
+ struct RPS_SamplerRequestHandleSingleInfo *req_handle = cls;
+ (void) id;
+ RPS_sampler_sinlge_info_ready_cb tmp_cb;
+ struct GNUNET_PeerIdentity *peer;
+ void *cb_cls;
+ (void) probability;
+ (void) num_observed;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got single peer with additional info\n");
+
+ GNUNET_assert (NULL != req_handle->callback);
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "returning single peer with info to the client\n");
+
+ /* Copy pointers and peers temporarily as they
+ * might be deleted from within the callback */
+ tmp_cb = req_handle->callback;
+ peer = GNUNET_new (struct GNUNET_PeerIdentity);
+ GNUNET_memcpy (peer,
+ req_handle->id,
+ sizeof (struct GNUNET_PeerIdentity));
+ cb_cls = req_handle->cls;
+ RPS_sampler_request_single_info_cancel (req_handle);
+ req_handle = NULL;
+ tmp_cb (peer, cb_cls, probability, num_observed);
+ GNUNET_free (peer);
+}
+
+
+/**
* Get n random peers out of the sampled peers.
*
* We might want to reinitialise this sampler after giving the
@@ -469,8 +565,6 @@ check_n_peers_ready (void *cls,
* @param sampler the sampler to get peers from.
* @param cb callback that will be called once the ids are ready.
* @param cls closure given to @a cb
- * @param for_client #GNUNET_YES if result is used for client,
- * #GNUNET_NO if used internally
* @param num_peers the number of peers requested
*/
struct RPS_SamplerRequestHandle *
@@ -506,6 +600,7 @@ RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
{
gpc = GNUNET_new (struct GetPeerCls);
gpc->req_handle = req_handle;
+ gpc->req_single_info_handle = NULL;
gpc->cont = check_n_peers_ready;
gpc->cont_cls = req_handle;
gpc->id = &req_handle->ids[i];
@@ -515,11 +610,56 @@ RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
gpc);
// maybe add a little delay
gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers,
- gpc);
+ gpc);
}
return req_handle;
}
+
+/**
+ * Get one random peer with additional information.
+ *
+ * @param sampler the sampler to get peers from.
+ * @param cb callback that will be called once the ids are ready.
+ * @param cls closure given to @a cb
+ */
+struct RPS_SamplerRequestHandleSingleInfo *
+RPS_sampler_get_rand_peer_info (struct RPS_Sampler *sampler,
+ RPS_sampler_sinlge_info_ready_cb cb,
+ void *cls)
+{
+ struct RPS_SamplerRequestHandleSingleInfo *req_handle;
+ struct GetPeerCls *gpc;
+
+ GNUNET_assert (0 != sampler->sampler_size);
+
+ // TODO check if we have too much (distinct) sampled peers
+ req_handle = GNUNET_new (struct RPS_SamplerRequestHandleSingleInfo);
+ req_handle->id = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
+ req_handle->sampler = sampler;
+ req_handle->callback = cb;
+ req_handle->cls = cls;
+ GNUNET_CONTAINER_DLL_insert (sampler->req_handle_single_head,
+ sampler->req_handle_single_tail,
+ req_handle);
+
+ gpc = GNUNET_new (struct GetPeerCls);
+ gpc->req_handle = NULL;
+ gpc->req_single_info_handle = req_handle;
+ gpc->cont = check_peer_info_ready;
+ gpc->cont_cls = req_handle;
+ gpc->id = req_handle->id;
+
+ GNUNET_CONTAINER_DLL_insert (req_handle->gpc_head,
+ req_handle->gpc_tail,
+ gpc);
+ // maybe add a little delay
+ gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers,
+ gpc);
+ return req_handle;
+}
+
+
/**
* Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb.
*
@@ -559,6 +699,45 @@ RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle)
/**
+ * Cancle a request issued through #RPS_sampler_sinlge_info_ready_cb.
+ *
+ * @param req_handle the handle to the request
+ */
+void
+RPS_sampler_request_single_info_cancel (
+ struct RPS_SamplerRequestHandleSingleInfo *req_single_info_handle)
+{
+ struct GetPeerCls *i;
+
+ while (NULL != (i = req_single_info_handle->gpc_head) )
+ {
+ GNUNET_CONTAINER_DLL_remove (req_single_info_handle->gpc_head,
+ req_single_info_handle->gpc_tail,
+ i);
+ if (NULL != i->get_peer_task)
+ {
+ GNUNET_SCHEDULER_cancel (i->get_peer_task);
+ }
+ if (NULL != i->notify_ctx)
+ {
+ GNUNET_CONTAINER_DLL_remove (req_single_info_handle->sampler->notify_ctx_head,
+ req_single_info_handle->sampler->notify_ctx_tail,
+ i->notify_ctx);
+ GNUNET_free (i->notify_ctx);
+ i->notify_ctx = NULL;
+ }
+ GNUNET_free (i);
+ }
+ GNUNET_free (req_single_info_handle->id);
+ req_single_info_handle->id = NULL;
+ GNUNET_CONTAINER_DLL_remove (req_single_info_handle->sampler->req_handle_single_head,
+ req_single_info_handle->sampler->req_handle_single_tail,
+ req_single_info_handle);
+ GNUNET_free (req_single_info_handle);
+}
+
+
+/**
* Cleans the sampler.
*/
void
diff --git a/src/rps/rps-sampler_common.h b/src/rps/rps-sampler_common.h
index 1abe43720..321efaf1e 100644
--- a/src/rps/rps-sampler_common.h
+++ b/src/rps/rps-sampler_common.h
@@ -44,10 +44,14 @@
*
* @param cls the closure given alongside this function.
* @param id the PeerID that was returned
+ * @param probability The probability with which this sampler has seen all ids
+ * @param num_observed How many ids this sampler has observed
*/
typedef void
(*RPS_sampler_rand_peer_ready_cont) (void *cls,
- const struct GNUNET_PeerIdentity *id);
+ const struct GNUNET_PeerIdentity *id,
+ double probability,
+ uint32_t num_observed);
/**
@@ -72,6 +76,22 @@ typedef void
/**
+ * Callback that is called from _get_n_rand_peers() when the PeerIDs are ready.
+ *
+ * @param cls the closure given alongside this function.
+ * @param probability Probability with which all IDs have been observed
+ * @param num_observed Number of observed IDs
+ * @param ids the PeerIDs that were returned
+ * to be freed
+ */
+ typedef void
+(*RPS_sampler_sinlge_info_ready_cb) (const struct GNUNET_PeerIdentity *ids,
+ void *cls,
+ double probability,
+ uint32_t num_observed);
+
+
+/**
* @brief Callback called each time a new peer was put into the sampler
*
* @param cls A possibly given closure
@@ -97,6 +117,11 @@ struct GetPeerCls
struct RPS_SamplerRequestHandle *req_handle;
/**
+ * The #RPS_SamplerRequestHandleSingleInfo this single request belongs to.
+ */
+ struct RPS_SamplerRequestHandleSingleInfo *req_single_info_handle;
+
+ /**
* The task for this function.
*/
struct GNUNET_SCHEDULER_Task *get_peer_task;
@@ -177,6 +202,12 @@ struct RPS_Sampler
struct RPS_SamplerRequestHandle *req_handle_head;
struct RPS_SamplerRequestHandle *req_handle_tail;
+ /**
+ * Head and tail for the DLL to store the #RPS_SamplerRequestHandleSingleInfo
+ */
+ struct RPS_SamplerRequestHandleSingleInfo *req_handle_single_head;
+ struct RPS_SamplerRequestHandleSingleInfo *req_handle_single_tail;
+
struct SamplerNotifyUpdateCTX *notify_ctx_head;
struct SamplerNotifyUpdateCTX *notify_ctx_tail;
};
@@ -306,6 +337,19 @@ RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
/**
+ * Get one random peer with additional information.
+ *
+ * @param sampler the sampler to get peers from.
+ * @param cb callback that will be called once the ids are ready.
+ * @param cls closure given to @a cb
+ */
+struct RPS_SamplerRequestHandleSingleInfo *
+RPS_sampler_get_rand_peer_info (struct RPS_Sampler *sampler,
+ RPS_sampler_sinlge_info_ready_cb cb,
+ void *cls);
+
+
+/**
* Counts how many Samplers currently hold a given PeerID.
*
* @param sampler the sampler to count ids in.
@@ -328,6 +372,16 @@ RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle);
/**
+ * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb.
+ *
+ * @param req_handle the handle to the request
+ */
+void
+RPS_sampler_request_single_info_cancel (
+ struct RPS_SamplerRequestHandleSingleInfo *req_single_info_handle);
+
+
+/**
* Cleans the sampler.
*/
void
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index 7a3adfa94..83dff27e8 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -128,6 +128,16 @@ struct GNUNET_RPS_Handle
struct GNUNET_RPS_Request_Handle *rh_tail;
/**
+ * @brief Pointer to the head element in DLL of single request handles
+ */
+ struct GNUNET_RPS_Request_Handle_Single_Info *rhs_head;
+
+ /**
+ * @brief Pointer to the tail element in DLL of single request handles
+ */
+ struct GNUNET_RPS_Request_Handle_Single_Info *rhs_tail;
+
+ /**
* @brief The desired probability with which we want to have observed all
* peers.
*/
@@ -197,6 +207,54 @@ struct GNUNET_RPS_Request_Handle
/**
+ * Handler for a single request from a client.
+ */
+struct GNUNET_RPS_Request_Handle_Single_Info
+{
+ /**
+ * The client issuing the request.
+ */
+ struct GNUNET_RPS_Handle *rps_handle;
+
+ /**
+ * @brief The Sampler for the client request
+ */
+ struct RPS_Sampler *sampler;
+
+ /**
+ * @brief Request handle of the request to the sampler - needed to cancel the request
+ */
+ struct RPS_SamplerRequestHandleSingleInfo *sampler_rh;
+
+ /**
+ * @brief Request handle of the request of the biased stream of peers -
+ * needed to cancel the request
+ */
+ struct GNUNET_RPS_StreamRequestHandle *srh;
+
+ /**
+ * The callback to be called when we receive an answer.
+ */
+ GNUNET_RPS_NotifyReadySingleInfoCB ready_cb;
+
+ /**
+ * The closure for the callback.
+ */
+ void *ready_cb_cls;
+
+ /**
+ * @brief Pointer to next element in DLL
+ */
+ struct GNUNET_RPS_Request_Handle_Single_Info *next;
+
+ /**
+ * @brief Pointer to previous element in DLL
+ */
+ struct GNUNET_RPS_Request_Handle_Single_Info *prev;
+};
+
+
+/**
* Struct used to pack the callback, its closure (provided by the caller)
* and the connection handler to the service to pass it to a callback function.
*/
@@ -309,6 +367,36 @@ peers_ready_cb (const struct GNUNET_PeerIdentity *peers,
/**
+ * @brief Called once the sampler has collected the requested peer.
+ *
+ * Calls the callback provided by the client with the corresponding cls.
+ *
+ * @param peers The array of @a num_peers that has been returned.
+ * @param num_peers The number of peers that have been returned
+ * @param cls The #GNUNET_RPS_Request_Handle
+ * @param probability Probability with which all IDs have been observed
+ * @param num_observed Number of observed IDs
+ */
+static void
+peer_info_ready_cb (const struct GNUNET_PeerIdentity *peers,
+ void *cls,
+ double probability,
+ uint32_t num_observed)
+{
+ struct GNUNET_RPS_Request_Handle *rh = cls;
+ (void) probability;
+ (void) num_observed;
+ uint32_t num_peers = 1;
+
+ rh->sampler_rh = NULL;
+ rh->ready_cb (rh->ready_cb_cls,
+ num_peers,
+ peers);
+ GNUNET_RPS_request_cancel (rh);
+}
+
+
+/**
* @brief Callback to collect the peers from the biased stream and put those
* into the sampler.
*
@@ -632,15 +720,15 @@ mq_error_handler (void *cls,
*/
static void
hash_from_share_val (const char *share_val,
- struct GNUNET_HashCode *hash)
+ struct GNUNET_HashCode *hash)
{
GNUNET_CRYPTO_kdf (hash,
- sizeof (struct GNUNET_HashCode),
- "rps",
- strlen ("rps"),
- share_val,
- strlen (share_val),
- NULL, 0);
+ sizeof (struct GNUNET_HashCode),
+ "rps",
+ strlen ("rps"),
+ share_val,
+ strlen (share_val),
+ NULL, 0);
}
@@ -672,6 +760,13 @@ nse_cb (void *cls,
RPS_sampler_update_with_nw_size (rh_iter->sampler,
GNUNET_NSE_log_estimate_to_n (logestimate));
}
+ for (struct GNUNET_RPS_Request_Handle_Single_Info *rhs_iter = h->rhs_head;
+ NULL != rhs_iter && NULL != rhs_iter->next;
+ rhs_iter = rhs_iter->next)
+ {
+ RPS_sampler_update_with_nw_size (rhs_iter->sampler,
+ GNUNET_NSE_log_estimate_to_n (logestimate));
+ }
}
@@ -857,6 +952,48 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle,
/**
+ * Request one random peer, getting additional information.
+ *
+ * @param rps_handle handle to the rps service
+ * @param ready_cb the callback called when the peers are available
+ * @param cls closure given to the callback
+ * @return a handle to cancel this request
+ */
+struct GNUNET_RPS_Request_Handle_Single_Info *
+GNUNET_RPS_request_peer_info (struct GNUNET_RPS_Handle *rps_handle,
+ GNUNET_RPS_NotifyReadySingleInfoCB ready_cb,
+ void *cls)
+{
+ struct GNUNET_RPS_Request_Handle_Single_Info *rhs;
+ uint32_t num_req_peers = 1;
+
+ LOG (GNUNET_ERROR_TYPE_INFO,
+ "Client requested peer with additional info\n");
+ rhs = GNUNET_new (struct GNUNET_RPS_Request_Handle_Single_Info);
+ rhs->rps_handle = rps_handle;
+ rhs->sampler = RPS_sampler_mod_init (num_req_peers,
+ GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff
+ RPS_sampler_set_desired_probability (rhs->sampler,
+ rps_handle->desired_probability);
+ RPS_sampler_set_deficiency_factor (rhs->sampler,
+ rps_handle->deficiency_factor);
+ rhs->sampler_rh = RPS_sampler_get_rand_peer_info (rhs->sampler,
+ peer_info_ready_cb,
+ rhs);
+ rhs->srh = GNUNET_RPS_stream_request (rps_handle,
+ collect_peers_cb,
+ rhs); /* cls */
+ rhs->ready_cb = ready_cb;
+ rhs->ready_cb_cls = cls;
+ GNUNET_CONTAINER_DLL_insert (rps_handle->rhs_head,
+ rps_handle->rhs_tail,
+ rhs);
+
+ return rhs;
+}
+
+
+/**
* Seed rps service with peerIDs.
*
* @param h handle to the rps service
@@ -1047,6 +1184,37 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh)
/**
+ * Cancle an issued single info request.
+ *
+ * @param rhs request handle of request to cancle
+ */
+void
+GNUNET_RPS_request_single_info_cancel (
+ struct GNUNET_RPS_Request_Handle_Single_Info *rhs)
+{
+ struct GNUNET_RPS_Handle *h;
+
+ h = rhs->rps_handle;
+ GNUNET_assert (NULL != rhs);
+ GNUNET_assert (NULL != rhs->srh);
+ GNUNET_assert (h == rhs->srh->rps_handle);
+ GNUNET_RPS_stream_cancel (rhs->srh);
+ rhs->srh = NULL;
+ if (NULL == h->stream_requests_head) cancel_stream(h);
+ if (NULL != rhs->sampler_rh)
+ {
+ RPS_sampler_request_single_info_cancel (rhs->sampler_rh);
+ }
+ RPS_sampler_destroy (rhs->sampler);
+ rhs->sampler = NULL;
+ GNUNET_CONTAINER_DLL_remove (h->rhs_head,
+ h->rhs_tail,
+ rhs);
+ GNUNET_free (rhs);
+}
+
+
+/**
* Disconnect from the rps service
*
* @param h the handle to the rps service
@@ -1079,6 +1247,17 @@ GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h)
GNUNET_RPS_request_cancel (rh_iter);
}
}
+ if (NULL != h->rhs_head)
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Not all requests were cancelled!\n");
+ for (struct GNUNET_RPS_Request_Handle_Single_Info *rhs_iter = h->rhs_head;
+ h->rhs_head != NULL;
+ rhs_iter = h->rhs_head)
+ {
+ GNUNET_RPS_request_single_info_cancel (rhs_iter);
+ }
+ }
if (NULL != srh_callback_peers)
{
GNUNET_free (srh_callback_peers);