summaryrefslogtreecommitdiff
path: root/src/rps/gnunet-service-rps_sampler.c
diff options
context:
space:
mode:
authorJulius Bünger <buenger@mytum.de>2018-09-20 22:34:18 +0200
committerJulius Bünger <buenger@mytum.de>2018-09-20 22:35:30 +0200
commit0ce2284452e86e3fb6991e573bed2d37b0be8331 (patch)
treea6245f1d30a37db149e23c26bb3caa7bd773ea8d /src/rps/gnunet-service-rps_sampler.c
parent33d8b5a803bf931822e3591a4d7387da3aedcc63 (diff)
Move from timer-based to callback-based updates in sampler
Diffstat (limited to 'src/rps/gnunet-service-rps_sampler.c')
-rw-r--r--src/rps/gnunet-service-rps_sampler.c164
1 files changed, 137 insertions, 27 deletions
diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c
index 0de15bbc0..ff4bc9e42 100644
--- a/src/rps/gnunet-service-rps_sampler.c
+++ b/src/rps/gnunet-service-rps_sampler.c
@@ -64,6 +64,43 @@ typedef void
/**
+ * @brief Callback called each time a new peer was put into the sampler
+ *
+ * @param cls A possibly given closure
+ */
+typedef void
+(*SamplerNotifyUpdateCB) (void *cls);
+
+/**
+ * @brief Context for a callback. Contains callback and closure.
+ *
+ * Meant to be an entry in an DLL.
+ */
+struct SamplerNotifyUpdateCTX
+{
+ /**
+ * @brief The Callback to call on updates
+ */
+ SamplerNotifyUpdateCB notify_cb;
+
+ /**
+ * @brief The according closure.
+ */
+ void *cls;
+
+ /**
+ * @brief Next element in DLL.
+ */
+ struct SamplerNotifyUpdateCTX *next;
+
+ /**
+ * @brief Previous element in DLL.
+ */
+ struct SamplerNotifyUpdateCTX *prev;
+};
+
+
+/**
* Closure for #sampler_mod_get_rand_peer() and #sampler_get_rand_peer
*/
struct GetPeerCls
@@ -85,6 +122,11 @@ struct GetPeerCls
struct GNUNET_SCHEDULER_Task *get_peer_task;
/**
+ * @brief Context to the given callback.
+ */
+ struct SamplerNotifyUpdateCTX *notify_ctx;
+
+ /**
* The callback
*/
RPS_sampler_rand_peer_ready_cont cont;
@@ -164,6 +206,8 @@ struct RPS_Sampler
struct RPS_SamplerRequestHandle *req_handle_head;
struct RPS_SamplerRequestHandle *req_handle_tail;
+ struct SamplerNotifyUpdateCTX *notify_ctx_head;
+ struct SamplerNotifyUpdateCTX *notify_ctx_tail;
#ifdef TO_FILE
/**
* File name to log to
@@ -248,6 +292,52 @@ static uint32_t client_get_index;
/**
+ * @brief Add a callback that will be called when the next peer is inserted
+ * into the sampler
+ *
+ * @param sampler The sampler on which update it will be called
+ * @param notify_cb The callback
+ * @param cls Closure given to the callback
+ *
+ * @return The context containing callback and closure
+ */
+struct SamplerNotifyUpdateCTX *
+sampler_notify_on_update (struct RPS_Sampler *sampler,
+ SamplerNotifyUpdateCB notify_cb,
+ void *cls)
+{
+ struct SamplerNotifyUpdateCTX *notify_ctx;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Inserting new context for notification\n");
+ notify_ctx = GNUNET_new (struct SamplerNotifyUpdateCTX);
+ notify_ctx->notify_cb = notify_cb;
+ notify_ctx->cls = cls;
+ if (NULL != sampler->notify_ctx_head)
+ {
+ for (struct SamplerNotifyUpdateCTX *notify_iter = sampler->notify_ctx_head;
+ NULL != notify_iter->next;
+ notify_iter = notify_iter->next)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Pre: Context\n");
+ }
+ }
+ GNUNET_CONTAINER_DLL_insert (sampler->notify_ctx_head,
+ sampler->notify_ctx_tail,
+ notify_ctx);
+ for (struct SamplerNotifyUpdateCTX *notify_iter = sampler->notify_ctx_head;
+ NULL != notify_iter;
+ notify_iter = notify_iter->next)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Post: Context\n");
+ }
+ return notify_ctx;
+}
+
+
+/**
* Callback to _get_rand_peer() used by _get_n_rand_peers().
*
* Checks whether all n peers are available. If they are,
@@ -469,7 +559,7 @@ RPS_sampler_mod_init (size_t init_size,
/**
- * A fuction to update every sampler in the given list
+ * Update every sampler element of this sampler with given peer
*
* @param sampler the sampler to update.
* @param id the PeerID that is put in the sampler
@@ -478,17 +568,33 @@ RPS_sampler_mod_init (size_t init_size,
RPS_sampler_update (struct RPS_Sampler *sampler,
const struct GNUNET_PeerIdentity *id)
{
- uint32_t i;
+ struct SamplerNotifyUpdateCTX *tmp_notify_head;
+ struct SamplerNotifyUpdateCTX *tmp_notify_tail;
to_file (sampler->file_name,
"Got %s",
GNUNET_i2s_full (id));
- for (i = 0; i < sampler->sampler_size; i++)
+ for (uint32_t i = 0; i < sampler->sampler_size; i++)
{
RPS_sampler_elem_next (sampler->sampler_elements[i],
id);
}
+ tmp_notify_head = sampler->notify_ctx_head;
+ tmp_notify_tail = sampler->notify_ctx_tail;
+ sampler->notify_ctx_head = NULL;
+ sampler->notify_ctx_tail = NULL;
+ for (struct SamplerNotifyUpdateCTX *notify_iter = tmp_notify_head;
+ NULL != tmp_notify_head;
+ notify_iter = tmp_notify_head)
+ {
+ GNUNET_assert (NULL != notify_iter->notify_cb);
+ GNUNET_CONTAINER_DLL_remove (tmp_notify_head,
+ tmp_notify_tail,
+ notify_iter);
+ notify_iter->notify_cb (notify_iter->cls);
+ GNUNET_free (notify_iter);
+ }
}
@@ -535,6 +641,7 @@ sampler_get_rand_peer (void *cls)
struct RPS_Sampler *sampler;
gpc->get_peer_task = NULL;
+ gpc->notify_ctx = NULL;
sampler = gpc->req_handle->sampler;
/**;
@@ -549,15 +656,10 @@ sampler_get_rand_peer (void *cls)
//LOG (GNUNET_ERROR_TYPE_DEBUG,
// "Not returning randomly selected, empty PeerID. - Rescheduling.\n");
- /* FIXME no active wait - get notified, when new id arrives?
- * Might also be a freshly emptied one. Others might still contain ids.
- * Counter?
- */
- gpc->get_peer_task =
- GNUNET_SCHEDULER_add_delayed (
- GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1),
- &sampler_get_rand_peer,
- cls);
+ gpc->notify_ctx =
+ sampler_notify_on_update (sampler,
+ &sampler_mod_get_rand_peer,
+ gpc);
return;
}
@@ -585,6 +687,7 @@ sampler_mod_get_rand_peer (void *cls)
struct RPS_Sampler *sampler;
gpc->get_peer_task = NULL;
+ gpc->notify_ctx = NULL;
sampler = gpc->req_handle->sampler;
LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n");
@@ -600,11 +703,11 @@ sampler_mod_get_rand_peer (void *cls)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Sampler_mod element empty, rescheduling.\n");
- GNUNET_assert (NULL == gpc->get_peer_task);
- gpc->get_peer_task =
- GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval,
- &sampler_mod_get_rand_peer,
- cls);
+ GNUNET_assert (NULL == gpc->notify_ctx);
+ gpc->notify_ctx =
+ sampler_notify_on_update (sampler,
+ &sampler_mod_get_rand_peer,
+ gpc);
return;
}
@@ -626,11 +729,11 @@ sampler_mod_get_rand_peer (void *cls)
// sampler->max_round_interval);
// add a little delay
/* Schedule it one round later */
- GNUNET_assert (NULL == gpc->get_peer_task);
- gpc->get_peer_task =
- GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval,
- &sampler_mod_get_rand_peer,
- cls);
+ GNUNET_assert (NULL == gpc->notify_ctx);
+ gpc->notify_ctx =
+ sampler_notify_on_update (sampler,
+ &sampler_mod_get_rand_peer,
+ gpc);
return;
}
}
@@ -638,11 +741,11 @@ sampler_mod_get_rand_peer (void *cls)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"This s_elem saw less than two peers -- scheduling for later\n");
- GNUNET_assert (NULL == gpc->get_peer_task);
- gpc->get_peer_task =
- GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval,
- &sampler_mod_get_rand_peer,
- cls);
+ GNUNET_assert (NULL == gpc->notify_ctx);
+ gpc->notify_ctx =
+ sampler_notify_on_update (sampler,
+ &sampler_mod_get_rand_peer,
+ gpc);
return;
}
/* More reasons to wait could be added here */
@@ -747,6 +850,13 @@ RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle)
{
GNUNET_SCHEDULER_cancel (i->get_peer_task);
}
+ if (NULL != i->notify_ctx)
+ {
+ GNUNET_CONTAINER_DLL_remove (req_handle->sampler->notify_ctx_head,
+ req_handle->sampler->notify_ctx_tail,
+ i->notify_ctx);
+ GNUNET_free (i->notify_ctx);
+ }
GNUNET_free (i);
}
GNUNET_free (req_handle->ids);