summaryrefslogtreecommitdiff
path: root/src/rps
diff options
context:
space:
mode:
Diffstat (limited to 'src/rps')
-rw-r--r--src/rps/gnunet-service-rps_sampler.c191
-rw-r--r--src/rps/gnunet-service-rps_sampler.h7
2 files changed, 87 insertions, 111 deletions
diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c
index 419110440..92826d875 100644
--- a/src/rps/gnunet-service-rps_sampler.c
+++ b/src/rps/gnunet-service-rps_sampler.c
@@ -32,7 +32,7 @@
#include <math.h>
#include <inttypes.h>
-#define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
+#define LOG(kind, ...) GNUNET_log_from(kind,"rps-sampler",__VA_ARGS__)
// multiple 'clients'?
@@ -186,7 +186,7 @@ struct NRandPeersReadyCls
* @param id the PeerID that was returned
*/
typedef void
-(*RPS_sampler_rand_peer_ready_cb) (void *cls,
+(*RPS_sampler_rand_peer_ready_cont) (void *cls,
const struct GNUNET_PeerIdentity *id);
/**
@@ -194,6 +194,10 @@ typedef void
*/
struct GetPeerCls
{
+ /** DLL */
+ struct GetPeerCls *next;
+ struct GetPeerCls *prev;
+
/**
* The task for this function.
*/
@@ -202,12 +206,12 @@ struct GetPeerCls
/**
* The callback
*/
- RPS_sampler_rand_peer_ready_cb cb;
+ RPS_sampler_rand_peer_ready_cont cont;
/**
* The closure to the callback
*/
- void *cb_cls;
+ void *cont_cls;
/**
* The address of the id to be stored at
@@ -215,11 +219,6 @@ struct GetPeerCls
struct GNUNET_PeerIdentity *id;
};
-/**
- * Multihashmap that keeps track of all get_peer_tasks that are still scheduled.
- */
-struct GNUNET_CONTAINER_MultiHashMap *get_peer_tasks;
-
/**
* Global sampler variable.
@@ -248,6 +247,11 @@ static size_t max_size;
static uint32_t client_get_index;
+/** FIXME document */
+struct GetPeerCls *gpc_head;
+struct GetPeerCls *gpc_tail;
+
+
/**
* Callback to _get_rand_peer() used by _get_n_rand_peers().
*
@@ -262,16 +266,17 @@ check_n_peers_ready (void *cls,
n_peers_cls = (struct NRandPeersReadyCls *) cls;
+ n_peers_cls->cur_num_peers++;
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "SAMPLER: Got %" PRIX32 ". of %" PRIX32 " peers\n",
+ "Got %" PRIX32 ". of %" PRIX32 " peers\n",
n_peers_cls->cur_num_peers, n_peers_cls->num_peers);
- if (n_peers_cls->num_peers - 1 == n_peers_cls->cur_num_peers)
+ if (n_peers_cls->num_peers == n_peers_cls->cur_num_peers)
{ /* All peers are ready -- return those to the client */
GNUNET_assert (NULL != n_peers_cls->callback);
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "SAMPLER: returning %" PRIX32 " peers to the client\n",
+ "returning %" PRIX32 " peers to the client\n",
n_peers_cls->num_peers);
n_peers_cls->callback (n_peers_cls->cls, n_peers_cls->ids, n_peers_cls->num_peers);
@@ -319,7 +324,7 @@ RPS_sampler_elem_create (void)
s = GNUNET_new (struct RPS_SamplerElement);
RPS_sampler_elem_reinit (s);
- LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: initialised with empty PeerID\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "initialised with empty PeerID\n");
return s;
}
@@ -339,9 +344,9 @@ RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem, const struct GNUNET_Pe
if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (other, &(s_elem->peer_id)) )
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n",
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " Got PeerID %s\n",
GNUNET_i2s (other));
- LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Have already PeerID %s\n",
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Have already PeerID %s\n",
GNUNET_i2s (&(s_elem->peer_id)));
}
else
@@ -353,7 +358,7 @@ RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem, const struct GNUNET_Pe
if ( EMPTY == s_elem->is_empty )
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s; Simply accepting (was empty previously).\n",
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Got PeerID %s; Simply accepting (was empty previously).\n",
GNUNET_i2s(other));
s_elem->peer_id = *other;
s_elem->peer_id_hash = other_hash;
@@ -365,14 +370,14 @@ RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem, const struct GNUNET_Pe
}
else if ( 0 > GNUNET_CRYPTO_hash_cmp (&other_hash, &s_elem->peer_id_hash) )
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n",
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " Got PeerID %s\n",
GNUNET_i2s (other));
- LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Discarding old PeerID %s\n",
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Discarding old PeerID %s\n",
GNUNET_i2s (&s_elem->peer_id));
if ( NULL != sampler->remove_cb )
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s with the remove callback.\n",
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Removing old PeerID %s with the remove callback.\n",
GNUNET_i2s (&s_elem->peer_id));
sampler->remove_cb (sampler->remove_cls, &s_elem->peer_id);
}
@@ -382,7 +387,7 @@ RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem, const struct GNUNET_Pe
if ( NULL != sampler->insert_cb )
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Inserting new PeerID %s with the insert callback.\n",
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Inserting new PeerID %s with the insert callback.\n",
GNUNET_i2s (&s_elem->peer_id));
sampler->insert_cb(sampler->insert_cls, &s_elem->peer_id);
}
@@ -391,9 +396,9 @@ RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem, const struct GNUNET_Pe
}
else
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n",
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " Got PeerID %s\n",
GNUNET_i2s(other));
- LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Keeping old PeerID %s\n",
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Keeping old PeerID %s\n",
GNUNET_i2s(&s_elem->peer_id));
}
}
@@ -436,15 +441,15 @@ sampler_resize (unsigned int new_size)
&sampler->sampler_elements[new_size],
(old_size - new_size) * sizeof (struct RPS_SamplerElement *));
- LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Shrinking sampler %d -> %d\n", old_size, new_size);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Shrinking sampler %d -> %d\n", old_size, new_size);
GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, new_size);
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "SAMPLER: sampler->sampler_elements now points to %p\n",
+ "sampler->sampler_elements now points to %p\n",
sampler->sampler_elements);
for (i = 0 ; i < old_size - new_size ; i++)
{/* Remove unneeded rest */
- LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing %" PRIX32 ". sampler\n", i);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Removing %" PRIX32 ". sampler\n", i);
if (NULL != sampler->remove_cb)
sampler->remove_cb (sampler->remove_cls, &rem_list[i]->peer_id);
GNUNET_free (rem_list[i]);
@@ -453,10 +458,10 @@ sampler_resize (unsigned int new_size)
}
else if (old_size < new_size)
{ /* Growing */
- LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Growing sampler %d -> %d\n", old_size, new_size);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Growing sampler %d -> %d\n", old_size, new_size);
GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, new_size);
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "SAMPLER: sampler->sampler_elements now points to %p\n",
+ "sampler->sampler_elements now points to %p\n",
sampler->sampler_elements);
for ( i = old_size ; i < new_size ; i++ )
@@ -465,18 +470,18 @@ sampler_resize (unsigned int new_size)
if (NULL != sampler->insert_cb)
sampler->insert_cb (sampler->insert_cls, &sampler->sampler_elements[i]->peer_id);
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "SAMPLER: Added %" PRIX32 ". sampler, now pointing to %p, contains %s\n",
+ "Added %" PRIX32 ". sampler, now pointing to %p, contains %s\n",
i, &sampler->sampler_elements[i], GNUNET_i2s (&sampler->sampler_elements[i]->peer_id));
}
}
else
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Size remains the same -- nothing to do\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Size remains the same -- nothing to do\n");
return;
}
GNUNET_assert (sampler->sampler_size == new_size);
- LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Finished growing/shrinking.\n"); // remove
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished growing/shrinking.\n"); // remove
}
@@ -498,8 +503,8 @@ RPS_sampler_resize (unsigned int new_size)
*
* @param new_size the new size of the sampler
*/
-void
-RPS_sampler_empty ()
+static void
+sampler_empty ()
{
sampler_resize (0);
}
@@ -537,7 +542,6 @@ RPS_sampler_init (size_t init_size,
sampler->insert_cls = ins_cls;
sampler->remove_cb = rem_cb;
sampler->remove_cls = rem_cls;
- get_peer_tasks = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO);
//sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity);
//GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size);
RPS_sampler_resize (init_size);
@@ -581,7 +585,7 @@ RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id)
{
if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(id, &(sampler->sampler_elements[i]->peer_id)) )
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Reinitialising sampler\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Reinitialising sampler\n");
RPS_sampler_elem_reinit (sampler->sampler_elements[i]);
}
}
@@ -595,14 +599,16 @@ RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id)
* corrsponding peer to the client.
* Only used internally
*/
- void
-RPS_sampler_get_rand_peer_ (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+static void
+sampler_get_rand_peer2 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- struct GetPeerCls *gpc;
+ struct GetPeerCls *gpc = (struct GetPeerCls *) cls;
uint32_t r_index;
- struct GNUNET_HashCode *hash;
- gpc = (struct GetPeerCls *) cls;
+ gpc->get_peer_task = NULL;
+ GNUNET_CONTAINER_DLL_remove (gpc_head, gpc_tail, gpc);
+ if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+ return;
/**;
* Choose the r_index of the peer we want to return
@@ -616,20 +622,15 @@ RPS_sampler_get_rand_peer_ (void *cls, const struct GNUNET_SCHEDULER_TaskContext
gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(
GNUNET_TIME_UNIT_SECONDS,
.1),
- &RPS_sampler_get_rand_peer_,
+ &sampler_get_rand_peer2,
cls);
return;
}
*gpc->id = sampler->sampler_elements[r_index]->peer_id;
- hash = GNUNET_new (struct GNUNET_HashCode);
- GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash);
- if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove (get_peer_tasks, hash, &gpc->get_peer_task))
- LOG (GNUNET_ERROR_TYPE_WARNING, "SAMPLER: Key to remove is not in the hashmap\n");
- GNUNET_free (gpc->get_peer_task);
-
- gpc->cb (gpc->cb_cls, gpc->id);
+ gpc->cont (gpc->cont_cls, gpc->id);
+ GNUNET_free (gpc);
}
@@ -641,20 +642,22 @@ RPS_sampler_get_rand_peer_ (void *cls, const struct GNUNET_SCHEDULER_TaskContext
*
* @return a random PeerID of the PeerIDs previously put into the sampler.
*/
- void
-RPS_sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+static void
+sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- struct GetPeerCls *gpc;
+ struct GetPeerCls *gpc = (struct GetPeerCls *) cls;
struct GNUNET_PeerIdentity tmp_id;
struct RPS_SamplerElement *s_elem;
struct GNUNET_TIME_Relative last_request_diff;
- struct GNUNET_HashCode *hash;
uint32_t tmp_client_get_index;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Single peer was requested\n");
+ gpc->get_peer_task = NULL;
+ GNUNET_CONTAINER_DLL_remove (gpc_head, gpc_tail, gpc);
+ if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+ return;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n");
- gpc = (struct GetPeerCls *) cls;
- hash = GNUNET_new (struct GNUNET_HashCode);
/* Store the next #client_get_index to check whether we cycled over the whole list */
if (0 < client_get_index)
@@ -663,16 +666,17 @@ RPS_sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext
tmp_client_get_index = sampler->sampler_size - 1;
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "SAMPLER: scheduling for later if index reaches %" PRIX32 " (sampler size: %" PRIX32 ").\n",
+ "scheduling for later if index reaches %" PRIX32 " (sampler size: %" PRIX32 ").\n",
tmp_client_get_index, sampler->sampler_size);
do
{ /* Get first non empty sampler */
if (tmp_client_get_index == client_get_index)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: reached tmp_index %" PRIX32 ".\n", client_get_index);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "reached tmp_index %" PRIX32 ".\n", client_get_index);
+ GNUNET_assert (NULL == gpc->get_peer_task);
gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval,
- &RPS_sampler_get_rand_peer,
+ &sampler_get_rand_peer,
cls);
return;
}
@@ -688,7 +692,7 @@ RPS_sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext
else
client_get_index++;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: incremented index to %" PRIX32 ".\n", client_get_index);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "incremented index to %" PRIX32 ".\n", client_get_index);
} while (EMPTY == sampler->sampler_elements[client_get_index]->is_empty);
s_elem = sampler->sampler_elements[client_get_index];
@@ -703,28 +707,25 @@ RPS_sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext
if (last_request_diff.rel_value_us < sampler->max_round_interval.rel_value_us)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "SAMPLER: Last client request on this sampler was less than max round interval ago -- scheduling for later\n");
+ "Last client request on this sampler was less than max round interval ago -- scheduling for later\n");
///* How many time remains untile the next round has started? */
//inv_last_request_diff = GNUNET_TIME_absolute_get_difference (last_request_diff,
// sampler->max_round_interval);
// add a little delay
/* Schedule it one round later */
+ GNUNET_assert (NULL == gpc->get_peer_task);
gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval,
- &RPS_sampler_get_rand_peer,
+ &sampler_get_rand_peer,
cls);
return;
}
// TODO add other reasons to wait here
}
- GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash);
- if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove (get_peer_tasks, hash, &gpc->get_peer_task))
- LOG (GNUNET_ERROR_TYPE_WARNING, "SAMPLER: Key to remove is not in the hashmap\n");
- GNUNET_free (gpc->get_peer_task);
-
s_elem->last_client_request = GNUNET_TIME_absolute_get ();
- gpc->cb (gpc->cb_cls, gpc->id);
+ gpc->cont (gpc->cont_cls, gpc->id);
+ GNUNET_free (gpc);
}
@@ -743,19 +744,14 @@ RPS_sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext
*/
void
RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb,
- void *cls, uint32_t num_peers, int for_client)
+ void *cls, uint32_t num_peers, int for_client)
{
- GNUNET_assert (GNUNET_YES == for_client ||
- GNUNET_NO == for_client);
GNUNET_assert (0 != sampler->sampler_size);
// TODO check if we have too much (distinct) sampled peers
uint32_t i;
struct NRandPeersReadyCls *cb_cls;
struct GetPeerCls *gpc;
- struct GNUNET_HashCode *hash;
-
- hash = GNUNET_new (struct GNUNET_HashCode);
cb_cls = GNUNET_new (struct NRandPeersReadyCls);
cb_cls->num_peers = num_peers;
@@ -765,23 +761,24 @@ RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb,
cb_cls->cls = cls;
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "SAMPLER: Scheduling requests for %" PRIX32 " peers\n", num_peers);
+ "Scheduling requests for %" PRIX32 " peers\n", num_peers);
for ( i = 0 ; i < num_peers ; i++ )
{
gpc = GNUNET_new (struct GetPeerCls);
- gpc->cb = check_n_peers_ready;
- gpc->cb_cls = cb_cls;
+ gpc->cont = check_n_peers_ready;
+ gpc->cont_cls = cb_cls;
gpc->id = &cb_cls->ids[i];
// maybe add a little delay
if (GNUNET_YES == for_client)
- gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&RPS_sampler_get_rand_peer, gpc);
+ gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&sampler_get_rand_peer, gpc);
else if (GNUNET_NO == for_client)
- gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&RPS_sampler_get_rand_peer_, gpc);
- GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash);
- (void) GNUNET_CONTAINER_multihashmap_put (get_peer_tasks, hash, &gpc->get_peer_task,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&sampler_get_rand_peer2, gpc);
+ else
+ GNUNET_abort ();
+
+ GNUNET_CONTAINER_DLL_insert (gpc_head, gpc_tail, gpc);
}
}
@@ -811,35 +808,21 @@ RPS_sampler_count_id (const struct GNUNET_PeerIdentity *id)
/**
- * Callback to iterate over the hashmap to cancle the get_peer_tasks.
- */
- int
-clear_get_peer_tasks (void *cls, const struct GNUNET_HashCode *key, void *value)
-{
- struct GNUNET_SCHEDULER_Task *task;
-
- task = (struct GNUNET_SCHEDULER_Task *) value;
- GNUNET_SCHEDULER_cancel (task);
-
- GNUNET_CONTAINER_multihashmap_remove (get_peer_tasks, key, value);
-
- return GNUNET_YES;
-}
-
-
-/**
* Cleans the sampler.
*/
void
RPS_sampler_destroy ()
{
- if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_iterate (get_peer_tasks,
- clear_get_peer_tasks,
- NULL))
- LOG (GNUNET_ERROR_TYPE_WARNING, "SAMPLER: iteration over hashmap was cancelled\n");
- GNUNET_CONTAINER_multihashmap_destroy (get_peer_tasks);
- RPS_sampler_resize (0);
- GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, 0);
+ struct GetPeerCls *i;
+
+ for (i = gpc_head; NULL != i; i = gpc_head)
+ {
+ GNUNET_CONTAINER_DLL_remove (gpc_head, gpc_tail, i);
+ GNUNET_SCHEDULER_cancel (i->get_peer_task);
+ GNUNET_free (i);
+ }
+
+ sampler_empty ();
}
/* end of gnunet-service-rps.c */
diff --git a/src/rps/gnunet-service-rps_sampler.h b/src/rps/gnunet-service-rps_sampler.h
index a4d1fa7d4..bc3994e36 100644
--- a/src/rps/gnunet-service-rps_sampler.h
+++ b/src/rps/gnunet-service-rps_sampler.h
@@ -84,13 +84,6 @@ RPS_sampler_resize (unsigned int new_size);
/**
- * Empty the sampler.
- */
-void
-RPS_sampler_empty ();
-
-
-/**
* Initialise a tuple of samplers.
*
* @param init_size the size the sampler is initialised with