/*
This file is part of GNUnet.
Copyright (C) 2012, 2013, 2014, 2017, 2018 GNUnet e.V.
GNUnet is free software: you can redistribute it and/or modify it
under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
GNUnet is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see .
SPDX-License-Identifier: AGPL3.0-or-later
*/
/**
* @file zonemaster/gnunet-service-zonemaster.c
* @brief publish records from namestore to GNUnet name system
* @author Christian Grothoff
*/
#include "platform.h"
#include
#include "gnunet_util_lib.h"
#include "gnunet_dht_service.h"
#include "gnunet_namestore_service.h"
#include "gnunet_namecache_service.h"
#include "gnunet_statistics_service.h"
#define LOG_STRERROR_FILE(kind, syscall, \
filename) GNUNET_log_from_strerror_file (kind, "util", \
syscall, \
filename)
/**
* How often should we (re)publish each record before
* it expires?
*/
#define PUBLISH_OPS_PER_EXPIRATION 4
/**
* How often do we measure the delta between desired zone
* iteration speed and actual speed, and tell statistics
* service about it?
*/
#define DELTA_INTERVAL 100
/**
* How many records do we fetch in one shot from the namestore?
*/
#define NS_BLOCK_SIZE 1000
/**
* How many open jobs (and with it maximum amount of pending DHT operations) do we allow at most
*/
#define JOB_QUEUE_LIMIT 5000
/**
* How many events may the namestore give us before it has to wait
* for us to keep up?
*/
#define NAMESTORE_MONITOR_QUEUE_LIMIT 5
/**
* The initial interval in milliseconds btween puts in
* a zone iteration
*/
#define INITIAL_ZONE_ITERATION_INTERVAL GNUNET_TIME_UNIT_MILLISECONDS
/**
* The upper bound for the zone iteration interval
* (per record).
*/
#define MAXIMUM_ZONE_ITERATION_INTERVAL GNUNET_TIME_relative_multiply ( \
GNUNET_TIME_UNIT_MINUTES, 15)
/**
* The factor the current zone iteration interval is divided by for each
* additional new record
*/
#define LATE_ITERATION_SPEEDUP_FACTOR 2
/**
* What replication level do we use for DHT PUT operations?
*/
#define DHT_GNS_REPLICATION_LEVEL 5
/**
* Our workers
*/
static pthread_t * worker;
/**
* Lock for the sign jobs queue.
*/
static pthread_mutex_t sign_jobs_lock;
/**
* Lock for the DHT put jobs queue.
*/
static pthread_mutex_t sign_results_lock;
/**
* Wait condition on new sign jobs
*/
static pthread_cond_t sign_jobs_cond;
/**
* For threads to know we are shutting down
*/
static int in_shutdown = GNUNET_NO;
/**
* Monitor halted?
*/
static int monitor_halted = GNUNET_NO;
/**
* Our notification pipe
*/
static struct GNUNET_DISK_PipeHandle *notification_pipe;
/**
* Pipe read task
*/
static struct GNUNET_SCHEDULER_Task *pipe_read_task;
struct RecordPublicationJob
{
/**
* DLL
*/
struct RecordPublicationJob *next;
/**
* DLL
*/
struct RecordPublicationJob *prev;
/**
* The zone key to sign the block with
*/
struct GNUNET_CRYPTO_PrivateKey zone;
/**
* The block to sign
*/
struct GNUNET_GNSRECORD_Block *block;
/**
* The private block to sign, may point to block in case
* the public and private blocks are the same.
*/
struct GNUNET_GNSRECORD_Block *block_priv;
/**
* The size of the public block for the DHT put.
*/
size_t block_size;
/**
* The expiration time of the public block for the DHT put.
*/
struct GNUNET_TIME_Absolute expire_pub;
/**
* The label of the block needed for signing
*/
char *label;
/**
* Handle for the DHT PUT operation.
*/
struct GNUNET_DHT_PutHandle *ph;
/**
* When was this PUT initiated?
*/
struct GNUNET_TIME_Absolute start_date;
};
/**
* The DLL for workers to retrieve open jobs that require
* signing of blocks.
*/
static struct RecordPublicationJob *sign_jobs_head;
/**
* See above
*/
static struct RecordPublicationJob *sign_jobs_tail;
/**
* The DLL for workers to place jobs that are signed.
*/
static struct RecordPublicationJob *sign_results_head;
/**
* See above
*/
static struct RecordPublicationJob *sign_results_tail;
/**
* The DLL for jobs currently in the process of being dispatched into the DHT.
*/
static struct RecordPublicationJob *dht_jobs_head;
/**
* See above
*/
static struct RecordPublicationJob *dht_jobs_tail;
/**
* Pending operation on the namecache.
*/
struct CacheOperation
{
/**
* Kept in a DLL.
*/
struct CacheOperation *prev;
/**
* Kept in a DLL.
*/
struct CacheOperation *next;
/**
* Handle to namecache queue.
*/
struct GNUNET_NAMECACHE_QueueEntry *qe;
};
/**
* Handle to the statistics service
*/
static struct GNUNET_STATISTICS_Handle *statistics;
/**
* Our handle to the DHT
*/
static struct GNUNET_DHT_Handle *dht_handle;
/**
* Our handle to the namestore service
*/
static struct GNUNET_NAMESTORE_Handle *namestore_handle;
/**
* Handle to monitor namestore changes to instant propagation.
*/
static struct GNUNET_NAMESTORE_ZoneMonitor *zmon;
/**
* Our handle to the namecache service
*/
static struct GNUNET_NAMECACHE_Handle *namecache;
/**
* Use the namecache? Doing so creates additional cryptographic
* operations whenever we touch a record.
*/
static int disable_namecache;
/**
* Handle to iterate over our authoritative zone in namestore
*/
static struct GNUNET_NAMESTORE_ZoneIterator *namestore_iter;
/**
* Number of entries in the job queue #jobs_head.
*/
static unsigned int job_queue_length;
/**
* Useful for zone update for DHT put
*/
static unsigned long long num_public_records;
/**
* Last seen record count
*/
static unsigned long long last_num_public_records;
/**
* Number of successful put operations performed in the current
* measurement cycle (as measured in #check_zone_namestore_next()).
*/
static unsigned long long put_cnt;
/**
* What is the frequency at which we currently would like
* to perform DHT puts (per record)? Calculated in
* update_velocity() from the #zone_publish_time_window()
* and the total number of record sets we have (so far)
* observed in the zone.
*/
static struct GNUNET_TIME_Relative target_iteration_velocity_per_record;
/**
* Minimum relative expiration time of records seem during the current
* zone iteration.
*/
static struct GNUNET_TIME_Relative min_relative_record_time;
/**
* Minimum relative expiration time of records seem during the last
* zone iteration.
*/
static struct GNUNET_TIME_Relative last_min_relative_record_time;
/**
* Default time window for zone iteration
*/
static struct GNUNET_TIME_Relative zone_publish_time_window_default;
/**
* Time window for zone iteration, adjusted based on relative record
* expiration times in our zone.
*/
static struct GNUNET_TIME_Relative zone_publish_time_window;
/**
* When did we last start measuring the #DELTA_INTERVAL successful
* DHT puts? Used for velocity calculations.
*/
static struct GNUNET_TIME_Absolute last_put_100;
/**
* By how much should we try to increase our per-record iteration speed
* (over the desired speed calculated directly from the #put_interval)?
* Basically this value corresponds to the per-record CPU time overhead
* we have.
*/
static struct GNUNET_TIME_Relative sub_delta;
/**
* zone publish task
*/
static struct GNUNET_SCHEDULER_Task *zone_publish_task;
/**
* How many more values are left for the current query before we need
* to explicitly ask the namestore for more?
*/
static unsigned int ns_iteration_left;
/**
* #GNUNET_YES if zone has never been published before
*/
static int first_zone_iteration;
/**
* Optimize block insertion by caching map of private keys to
* public keys in memory?
*/
static int cache_keys;
/**
* Head of cop DLL.
*/
static struct CacheOperation *cop_head;
/**
* Tail of cop DLL.
*/
static struct CacheOperation *cop_tail;
static void
free_job (struct RecordPublicationJob *job)
{
if (job->block != job->block_priv)
GNUNET_free (job->block_priv);
GNUNET_free (job->block);
if (NULL != job->label)
GNUNET_free (job->label);
GNUNET_free (job);
}
/**
* Task run during shutdown.
*
* @param cls unused
* @param tc unused
*/
static void
shutdown_task (void *cls)
{
struct CacheOperation *cop;
struct RecordPublicationJob *job;
(void) cls;
in_shutdown = GNUNET_YES;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Shutting down!\n");
if (NULL != notification_pipe)
GNUNET_DISK_pipe_close (notification_pipe);
if (NULL != pipe_read_task)
GNUNET_SCHEDULER_cancel (pipe_read_task);
while (NULL != (cop = cop_head))
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Aborting incomplete namecache operation\n");
GNUNET_NAMECACHE_cancel (cop->qe);
GNUNET_CONTAINER_DLL_remove (cop_head, cop_tail, cop);
GNUNET_free (cop);
}
GNUNET_assert (0 == pthread_mutex_lock (&sign_jobs_lock));
while (NULL != (job = sign_jobs_head))
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Removing incomplete jobs\n");
GNUNET_CONTAINER_DLL_remove (sign_jobs_head, sign_jobs_tail, job);
job_queue_length--;
free_job (job);
}
GNUNET_assert (0 == pthread_mutex_unlock (&sign_jobs_lock));
GNUNET_assert (0 == pthread_mutex_lock (&sign_results_lock));
while (NULL != (job = sign_results_head))
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Removing incomplete jobs\n");
GNUNET_CONTAINER_DLL_remove (sign_results_head, sign_results_tail, job);
free_job (job);
}
GNUNET_assert (0 == pthread_mutex_unlock (&sign_results_lock));
while (NULL != (job = dht_jobs_head))
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Removing incomplete jobs\n");
GNUNET_CONTAINER_DLL_remove (dht_jobs_head, dht_jobs_tail, job);
if (NULL != job->ph)
GNUNET_DHT_put_cancel (job->ph);
free_job (job);
}
if (NULL != statistics)
{
GNUNET_STATISTICS_destroy (statistics,
GNUNET_NO);
statistics = NULL;
}
if (NULL != zone_publish_task)
{
GNUNET_SCHEDULER_cancel (zone_publish_task);
zone_publish_task = NULL;
}
if (NULL != namestore_iter)
{
GNUNET_NAMESTORE_zone_iteration_stop (namestore_iter);
namestore_iter = NULL;
}
if (NULL != zmon)
{
GNUNET_NAMESTORE_zone_monitor_stop (zmon);
zmon = NULL;
}
if (NULL != namestore_handle)
{
GNUNET_NAMESTORE_disconnect (namestore_handle);
namestore_handle = NULL;
}
if (NULL != namecache)
{
GNUNET_NAMECACHE_disconnect (namecache);
namecache = NULL;
}
if (NULL != dht_handle)
{
GNUNET_DHT_disconnect (dht_handle);
dht_handle = NULL;
}
}
/**
* Cache operation complete, clean up.
*
* @param cls the `struct CacheOperation`
* @param success success
* @param emsg error messages
*/
static void
finish_cache_operation (void *cls, int32_t success, const char *emsg)
{
struct CacheOperation *cop = cls;
if (NULL != emsg)
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
_ ("Failed to replicate block in namecache: %s\n"),
emsg);
else
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "CACHE operation completed\n");
GNUNET_CONTAINER_DLL_remove (cop_head, cop_tail, cop);
GNUNET_free (cop);
}
/**
* Refresh the (encrypted) block in the namecache.
*
* @param zone_key private key of the zone
* @param name label for the records
* @param rd_count number of records
* @param rd records stored under the given @a name
*/
static void
refresh_block (const struct GNUNET_GNSRECORD_Block *block)
{
struct CacheOperation *cop;
if (GNUNET_YES == disable_namecache)
{
GNUNET_STATISTICS_update (statistics,
"Namecache updates skipped (NC disabled)",
1,
GNUNET_NO);
return;
}
GNUNET_assert (NULL != block);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Caching block in namecache\n");
GNUNET_STATISTICS_update (statistics,
"Namecache updates pushed",
1,
GNUNET_NO);
cop = GNUNET_new (struct CacheOperation);
GNUNET_CONTAINER_DLL_insert (cop_head, cop_tail, cop);
cop->qe = GNUNET_NAMECACHE_block_cache (namecache,
block,
&finish_cache_operation,
cop);
}
/**
* Method called periodically that triggers iteration over authoritative records
*
* @param cls NULL
*/
static void
publish_zone_namestore_next (void *cls)
{
(void) cls;
zone_publish_task = NULL;
GNUNET_assert (NULL != namestore_iter);
GNUNET_assert (0 == ns_iteration_left);
ns_iteration_left = NS_BLOCK_SIZE;
GNUNET_NAMESTORE_zone_iterator_next (namestore_iter,
NS_BLOCK_SIZE);
}
/**
* Periodically iterate over our zone and store everything in dht
*
* @param cls NULL
*/
static void
publish_zone_dht_start (void *cls);
/**
* Calculate #target_iteration_velocity_per_record.
*/
static void
calculate_put_interval ()
{
if (0 == num_public_records)
{
/**
* If no records are known (startup) or none present
* we can safely set the interval to the value for a single
* record
*/target_iteration_velocity_per_record = zone_publish_time_window;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
"No records in namestore database.\n");
}
else
{
last_min_relative_record_time
= GNUNET_TIME_relative_min (last_min_relative_record_time,
min_relative_record_time);
zone_publish_time_window
= GNUNET_TIME_relative_min (GNUNET_TIME_relative_divide (
last_min_relative_record_time,
PUBLISH_OPS_PER_EXPIRATION),
zone_publish_time_window_default);
target_iteration_velocity_per_record
= GNUNET_TIME_relative_divide (zone_publish_time_window,
last_num_public_records);
}
target_iteration_velocity_per_record
= GNUNET_TIME_relative_min (target_iteration_velocity_per_record,
MAXIMUM_ZONE_ITERATION_INTERVAL);
GNUNET_STATISTICS_set (statistics,
"Minimum relative record expiration (in μs)",
last_min_relative_record_time.rel_value_us,
GNUNET_NO);
GNUNET_STATISTICS_set (statistics,
"Zone publication time window (in μs)",
zone_publish_time_window.rel_value_us,
GNUNET_NO);
GNUNET_STATISTICS_set (statistics,
"Target zone iteration velocity (μs)",
target_iteration_velocity_per_record.rel_value_us,
GNUNET_NO);
}
/**
* Re-calculate our velocity and the desired velocity.
* We have succeeded in making #DELTA_INTERVAL puts, so
* now calculate the new desired delay between puts.
*
* @param cnt how many records were processed since the last call?
*/
static void
update_velocity (unsigned int cnt)
{
struct GNUNET_TIME_Relative delta;
unsigned long long pct = 0;
if (0 == cnt)
return;
/* How fast were we really? */
delta = GNUNET_TIME_absolute_get_duration (last_put_100);
delta.rel_value_us /= cnt;
last_put_100 = GNUNET_TIME_absolute_get ();
/* calculate expected frequency */
if ((num_public_records > last_num_public_records) &&
(GNUNET_NO == first_zone_iteration))
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Last record count was lower than current record count. Reducing interval.\n");
last_num_public_records = num_public_records
* LATE_ITERATION_SPEEDUP_FACTOR;
calculate_put_interval ();
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Desired global zone iteration interval is %s/record!\n",
GNUNET_STRINGS_relative_time_to_string (
target_iteration_velocity_per_record,
GNUNET_YES));
/* Tell statistics actual vs. desired speed */
GNUNET_STATISTICS_set (statistics,
"Current zone iteration velocity (μs/record)",
delta.rel_value_us,
GNUNET_NO);
/* update "sub_delta" based on difference, taking
previous sub_delta into account! */
if (target_iteration_velocity_per_record.rel_value_us > delta.rel_value_us)
{
/* We were too fast, reduce sub_delta! */
struct GNUNET_TIME_Relative corr;
corr = GNUNET_TIME_relative_subtract (target_iteration_velocity_per_record,
delta);
if (sub_delta.rel_value_us > delta.rel_value_us)
{
/* Reduce sub_delta by corr */
sub_delta = GNUNET_TIME_relative_subtract (sub_delta,
corr);
}
else
{
/* We're doing fine with waiting the full time, this
should theoretically only happen if we run at
infinite speed. */
sub_delta = GNUNET_TIME_UNIT_ZERO;
}
}
else if (target_iteration_velocity_per_record.rel_value_us <
delta.rel_value_us)
{
/* We were too slow, increase sub_delta! */
struct GNUNET_TIME_Relative corr;
corr = GNUNET_TIME_relative_subtract (delta,
target_iteration_velocity_per_record);
sub_delta = GNUNET_TIME_relative_add (sub_delta,
corr);
if (sub_delta.rel_value_us >
target_iteration_velocity_per_record.rel_value_us)
{
/* CPU overload detected, we cannot go at desired speed,
as this would mean using a negative delay. */
/* compute how much faster we would want to be for
the desired velocity */
if (0 == target_iteration_velocity_per_record.rel_value_us)
pct = UINT64_MAX; /* desired speed is infinity ... */
else
pct = (sub_delta.rel_value_us
- target_iteration_velocity_per_record.rel_value_us) * 100LLU
/ target_iteration_velocity_per_record.rel_value_us;
sub_delta = target_iteration_velocity_per_record;
}
}
GNUNET_STATISTICS_set (statistics,
"# dispatched jobs",
job_queue_length,
GNUNET_NO);
GNUNET_STATISTICS_set (statistics,
"% speed increase needed for target velocity",
pct,
GNUNET_NO);
GNUNET_STATISTICS_set (statistics,
"# records processed in current iteration",
num_public_records,
GNUNET_NO);
}
/**
* Check if the current zone iteration needs to be continued
* by calling #publish_zone_namestore_next(), and if so with what delay.
*/
static void
check_zone_namestore_next ()
{
struct GNUNET_TIME_Relative delay;
if (0 != ns_iteration_left)
return; /* current NAMESTORE iteration not yet done */
if (job_queue_length >= JOB_QUEUE_LIMIT)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Job queue length exceeded (%u/%u). Pausing namestore iteration.\n",
job_queue_length,
JOB_QUEUE_LIMIT);
return;
}
update_velocity (put_cnt);
put_cnt = 0;
delay = GNUNET_TIME_relative_subtract (target_iteration_velocity_per_record,
sub_delta);
/* We delay *once* per #NS_BLOCK_SIZE, so we need to multiply the
per-record delay calculated so far with the #NS_BLOCK_SIZE */
GNUNET_STATISTICS_set (statistics,
"Current artificial NAMESTORE delay (μs/record)",
delay.rel_value_us,
GNUNET_NO);
delay = GNUNET_TIME_relative_multiply (delay,
NS_BLOCK_SIZE);
/* make sure we do not overshoot because of the #NS_BLOCK_SIZE factor */
delay = GNUNET_TIME_relative_min (MAXIMUM_ZONE_ITERATION_INTERVAL,
delay);
/* no delays on first iteration */
if (GNUNET_YES == first_zone_iteration)
delay = GNUNET_TIME_UNIT_ZERO;
GNUNET_assert (NULL == zone_publish_task);
zone_publish_task = GNUNET_SCHEDULER_add_delayed (delay,
&publish_zone_namestore_next,
NULL);
}
/**
* Continuation called from DHT once the PUT operation is done.
*
*/
static void
dht_put_continuation (void *cls)
{
struct RecordPublicationJob *job = cls;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"PUT complete; Pending jobs: %u\n", job_queue_length - 1);
/* When we just fall under the limit, trigger monitor/iterator again
* if halted. We can only safely trigger one, prefer iterator. */
if (NULL == zone_publish_task)
check_zone_namestore_next ();
if (job_queue_length <= JOB_QUEUE_LIMIT)
{
if (GNUNET_YES == monitor_halted)
{
GNUNET_NAMESTORE_zone_monitor_next (zmon, 1);
monitor_halted = GNUNET_NO;
}
}
job_queue_length--;
GNUNET_CONTAINER_DLL_remove (dht_jobs_head,
dht_jobs_tail,
job);
free_job (job);
}
/**
* Store GNS records in the DHT.
*
* @param key key of the zone
* @param label label to store under
* @param rd_public public record data
* @param rd_public_count number of records in @a rd_public
* @param ma handle for the put operation
* @return DHT PUT handle, NULL on error
*/
static void
dispatch_job (const struct GNUNET_CRYPTO_PrivateKey *key,
const char *label,
const struct GNUNET_GNSRECORD_Data *rd,
unsigned int rd_count,
const struct GNUNET_TIME_Absolute expire)
{
struct GNUNET_GNSRECORD_Data rd_public[rd_count];
struct GNUNET_GNSRECORD_Block *block;
struct GNUNET_GNSRECORD_Block *block_priv;
struct GNUNET_TIME_Absolute expire_pub;
size_t block_size;
unsigned int rd_public_count = 0;
char *emsg;
if (GNUNET_OK !=
GNUNET_GNSRECORD_normalize_record_set (label,
rd,
rd_count,
rd_public,
&rd_public_count,
&expire_pub,
GNUNET_GNSRECORD_FILTER_OMIT_PRIVATE,
&emsg))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"%s\n", emsg);
GNUNET_free (emsg);
}
GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create_unsigned (key,
expire_pub,
label,
rd_public,
rd_public_count,
&block));
if (NULL == block)
{
GNUNET_break (0);
return; /* whoops */
}
if (rd_count != rd_public_count)
GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create_unsigned (key,
expire,
label,
rd,
rd_count,
&
block_priv));
else
block_priv = block;
block_size = GNUNET_GNSRECORD_block_get_size (block);
GNUNET_assert (0 == pthread_mutex_lock (&sign_jobs_lock));
struct RecordPublicationJob *job = GNUNET_new (struct RecordPublicationJob);
job->block = block;
job->block_size = block_size;
job->block_priv = block_priv;
job->zone = *key;
job->label = GNUNET_strdup (label);
job->expire_pub = expire_pub;
GNUNET_CONTAINER_DLL_insert (sign_jobs_head, sign_jobs_tail, job);
GNUNET_assert (0 == pthread_cond_signal (&sign_jobs_cond));
GNUNET_assert (0 == pthread_mutex_unlock (&sign_jobs_lock));
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Creating job with %u record(s) for label `%s', expiration `%s'\n",
rd_public_count,
label,
GNUNET_STRINGS_absolute_time_to_string (expire));
num_public_records++;
return;
}
static void
notification_pipe_cb (void *cls);
static void
initiate_put_from_pipe_trigger (void *cls)
{
struct GNUNET_HashCode query;
struct RecordPublicationJob *job;
const struct GNUNET_DISK_FileHandle *np_fh;
char buf[100];
ssize_t nf_count;
pipe_read_task = NULL;
np_fh = GNUNET_DISK_pipe_handle (notification_pipe,
GNUNET_DISK_PIPE_END_READ);
pipe_read_task =
GNUNET_SCHEDULER_add_read_file (
GNUNET_TIME_UNIT_FOREVER_REL,
np_fh,
notification_pipe_cb,
NULL);
/* empty queue */
nf_count = GNUNET_DISK_file_read (np_fh, buf, sizeof (buf));
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Read %lld notifications from pipe\n",
(long long) nf_count);
while (true)
{
GNUNET_assert (0 == pthread_mutex_lock (&sign_results_lock));
if (NULL == sign_results_head)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"No more results. Back to sleep.\n");
GNUNET_assert (0 == pthread_mutex_unlock (&sign_results_lock));
return;
}
job = sign_results_head;
GNUNET_CONTAINER_DLL_remove (sign_results_head, sign_results_tail, job);
GNUNET_assert (0 == pthread_mutex_unlock (&sign_results_lock));
GNUNET_GNSRECORD_query_from_private_key (&job->zone,
job->label,
&query);
job->ph = GNUNET_DHT_put (dht_handle,
&query,
DHT_GNS_REPLICATION_LEVEL,
GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
GNUNET_BLOCK_TYPE_GNS_NAMERECORD,
job->block_size,
job->block,
job->expire_pub,
&dht_put_continuation,
job);
if (NULL == job->ph)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Could not perform DHT PUT, is the DHT running?\n");
free_job (job);
return;
}
GNUNET_STATISTICS_update (statistics,
"DHT put operations initiated",
1,
GNUNET_NO);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Storing record(s) for label `%s' in DHT under key %s\n",
job->label,
GNUNET_h2s (&query));
refresh_block (job->block_priv);
GNUNET_CONTAINER_DLL_insert (dht_jobs_head, dht_jobs_tail, job);
}
}
/**
* We encountered an error in our zone iteration.
*
* @param cls NULL
*/
static void
zone_iteration_error (void *cls)
{
(void) cls;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Got disconnected from namestore database, retrying.\n");
namestore_iter = NULL;
/* We end up here on error/disconnect/shutdown, so potentially
while a zone publish task or a DHT put is still running; hence
we need to cancel those. */
if (NULL != zone_publish_task)
{
GNUNET_SCHEDULER_cancel (zone_publish_task);
zone_publish_task = NULL;
}
zone_publish_task = GNUNET_SCHEDULER_add_now (&publish_zone_dht_start,
NULL);
}
/**
* Zone iteration is completed.
*
* @param cls NULL
*/
static void
zone_iteration_finished (void *cls)
{
(void) cls;
/* we're done with one iteration, calculate when to do the next one */
namestore_iter = NULL;
last_num_public_records = num_public_records;
first_zone_iteration = GNUNET_NO;
last_min_relative_record_time = min_relative_record_time;
calculate_put_interval ();
/* reset for next iteration */
min_relative_record_time
= GNUNET_TIME_UNIT_FOREVER_REL;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Zone iteration finished. Adjusted zone iteration interval to %s\n",
GNUNET_STRINGS_relative_time_to_string (
target_iteration_velocity_per_record,
GNUNET_YES));
GNUNET_STATISTICS_set (statistics,
"Target zone iteration velocity (μs)",
target_iteration_velocity_per_record.rel_value_us,
GNUNET_NO);
GNUNET_STATISTICS_set (statistics,
"Number of public records in DHT",
last_num_public_records,
GNUNET_NO);
GNUNET_assert (NULL == zone_publish_task);
if (0 == last_num_public_records)
{
zone_publish_task = GNUNET_SCHEDULER_add_delayed (
target_iteration_velocity_per_record,
&publish_zone_dht_start,
NULL);
}
else
{
zone_publish_task = GNUNET_SCHEDULER_add_now (&publish_zone_dht_start,
NULL);
}
}
/**
* Function used to put all records successively into the DHT.
*
* @param cls the closure (NULL)
* @param key the private key of the authority (ours)
* @param label the name of the records, NULL once the iteration is done
* @param rd_count the number of records in @a rd
* @param rd the record data
*/
static void
handle_record (void *cls,
const struct GNUNET_CRYPTO_PrivateKey *key,
const char *label,
unsigned int rd_count,
const struct GNUNET_GNSRECORD_Data *rd,
struct GNUNET_TIME_Absolute expire)
{
(void) cls;
ns_iteration_left--;
if (0 == rd_count)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Record set empty, moving to next record set\n");
check_zone_namestore_next ();
return;
}
for (unsigned int i = 0; i < rd_count; i++)
{
if (0 != (rd[i].flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
{
/* GNUNET_GNSRECORD_block_create will convert to absolute time;
we just need to adjust our iteration frequency */
min_relative_record_time.rel_value_us =
GNUNET_MIN (rd[i].expiration_time,
min_relative_record_time.rel_value_us);
}
}
/* We got a set of records to publish */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Starting DHT PUT\n");
put_cnt++;
if (0 == put_cnt % DELTA_INTERVAL)
update_velocity (DELTA_INTERVAL);
dispatch_job (key,
label,
rd,
rd_count,
expire);
job_queue_length++;
check_zone_namestore_next ();
}
/**
* Periodically iterate over all zones and store everything in DHT
*
* @param cls NULL
*/
static void
publish_zone_dht_start (void *cls)
{
(void) cls;
zone_publish_task = NULL;
GNUNET_STATISTICS_update (statistics,
"Full zone iterations launched",
1,
GNUNET_NO);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Starting DHT zone update!\n");
/* start counting again */
num_public_records = 0;
GNUNET_assert (NULL == namestore_iter);
ns_iteration_left = 1;
namestore_iter
= GNUNET_NAMESTORE_zone_iteration_start2 (namestore_handle,
NULL, /* All zones */
&zone_iteration_error,
NULL,
&handle_record,
NULL,
&zone_iteration_finished,
NULL,
GNUNET_GNSRECORD_FILTER_NONE);
GNUNET_assert (NULL != namestore_iter);
}
/**
* Store GNS records in the DHT.
*
* @param key key of the zone
* @param label label to store under
* @param rd_public public record data
* @param rd_public_count number of records in @a rd_public
*/
static void
dispatch_job_monitor (const struct GNUNET_CRYPTO_PrivateKey *key,
const char *label,
const struct GNUNET_GNSRECORD_Data *rd,
unsigned int rd_count,
struct GNUNET_TIME_Absolute expire)
{
struct GNUNET_GNSRECORD_Data rd_public[rd_count];
struct GNUNET_GNSRECORD_Block *block;
struct GNUNET_GNSRECORD_Block *block_priv;
struct GNUNET_TIME_Absolute expire_pub;
size_t block_size;
unsigned int rd_public_count = 0;
char *emsg;
if (GNUNET_OK !=
GNUNET_GNSRECORD_normalize_record_set (label,
rd,
rd_count,
rd_public,
&rd_public_count,
&expire_pub,
GNUNET_GNSRECORD_FILTER_OMIT_PRIVATE,
&emsg))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"%s\n", emsg);
GNUNET_free (emsg);
}
GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create_unsigned (key,
expire_pub,
label,
rd_public,
rd_public_count,
&block));
if (NULL == block)
{
GNUNET_break (0);
return; /* whoops */
}
if (rd_count != rd_public_count)
GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create_unsigned (key,
expire,
label,
rd,
rd_count,
&
block_priv));
else
block_priv = block;
block_size = GNUNET_GNSRECORD_block_get_size (block);
GNUNET_assert (0 == pthread_mutex_lock (&sign_jobs_lock));
struct RecordPublicationJob *job = GNUNET_new (struct RecordPublicationJob);
job->block = block;
job->block_size = block_size;
job->block_priv = block_priv;
job->zone = *key;
job->label = GNUNET_strdup (label);
job->expire_pub = expire_pub;
GNUNET_CONTAINER_DLL_insert (sign_jobs_head, sign_jobs_tail, job);
GNUNET_assert (0 == pthread_cond_signal (&sign_jobs_cond));
GNUNET_assert (0 == pthread_mutex_unlock (&sign_jobs_lock));
}
/**
* Process a record that was stored in the namestore
* (invoked by the monitor).
*
* @param cls closure, NULL
* @param zone private key of the zone; NULL on disconnect
* @param label label of the records; NULL on disconnect
* @param rd_count number of entries in @a rd array, 0 if label was deleted
* @param rd array of records with data to store
* @param expire expiration of this record set
*/
static void
handle_monitor_event (void *cls,
const struct GNUNET_CRYPTO_PrivateKey *zone,
const char *label,
unsigned int rd_count,
const struct GNUNET_GNSRECORD_Data *rd,
struct GNUNET_TIME_Absolute expire)
{
(void) cls;
GNUNET_STATISTICS_update (statistics,
"Namestore monitor events received",
1,
GNUNET_NO);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received %u records for label `%s' via namestore monitor\n",
rd_count,
label);
if (0 == rd_count)
{
GNUNET_NAMESTORE_zone_monitor_next (zmon,
1);
return; /* nothing to do */
}
dispatch_job_monitor (zone,
label,
rd,
rd_count,
expire);
job_queue_length++;
if (job_queue_length >= JOB_QUEUE_LIMIT)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Job queue length exceeded (%u/%u). Halting monitor.\n",
job_queue_length,
JOB_QUEUE_LIMIT);
monitor_halted = GNUNET_YES;
return;
}
GNUNET_NAMESTORE_zone_monitor_next (zmon,
1);
}
/**
* The zone monitor encountered an IPC error trying to to get in
* sync. Restart from the beginning.
*
* @param cls NULL
*/
static void
handle_monitor_error (void *cls)
{
(void) cls;
GNUNET_STATISTICS_update (statistics,
"Namestore monitor errors encountered",
1,
GNUNET_NO);
}
static void*
sign_worker (void *cls)
{
struct RecordPublicationJob *job;
const struct GNUNET_DISK_FileHandle *fh;
fh = GNUNET_DISK_pipe_handle (notification_pipe, GNUNET_DISK_PIPE_END_WRITE);
while (GNUNET_YES != in_shutdown)
{
GNUNET_assert (0 == pthread_mutex_lock (&sign_jobs_lock));
while (NULL == sign_jobs_head)
GNUNET_assert (0 == pthread_cond_wait (&sign_jobs_cond, &sign_jobs_lock));
if (GNUNET_YES == in_shutdown)
{
GNUNET_assert (0 == pthread_mutex_unlock (&sign_jobs_lock));
return NULL;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Taking on Job for %s\n", sign_jobs_head->label);
job = sign_jobs_head;
GNUNET_CONTAINER_DLL_remove (sign_jobs_head, sign_jobs_tail, job);
GNUNET_assert (0 == pthread_mutex_unlock (&sign_jobs_lock));
GNUNET_GNSRECORD_block_sign (&job->zone, job->label, job->block);
if (job->block != job->block_priv)
GNUNET_GNSRECORD_block_sign (&job->zone, job->label, job->block_priv);
GNUNET_assert (0 == pthread_mutex_lock (&sign_results_lock));
GNUNET_CONTAINER_DLL_insert (sign_results_head, sign_results_tail, job);
GNUNET_assert (0 == pthread_mutex_unlock (&sign_results_lock));
job = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Done, notifying main thread through pipe!\n");
GNUNET_DISK_file_write (fh, "!", 1);
}
return NULL;
}
static void
notification_pipe_cb (void *cls)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received wake up notification through pipe, checking results\n");
GNUNET_SCHEDULER_add_now (&initiate_put_from_pipe_trigger, NULL);
}
/**
* Perform zonemaster duties: watch namestore, publish records.
*
* @param cls closure
* @param server the initialized server
* @param c configuration to use
*/
static void
run (void *cls,
const struct GNUNET_CONFIGURATION_Handle *c,
struct GNUNET_SERVICE_Handle *service)
{
unsigned long long max_parallel_bg_queries = 128;
(void) cls;
(void) service;
pthread_mutex_init (&sign_jobs_lock, NULL);
pthread_mutex_init (&sign_results_lock, NULL);
pthread_cond_init (&sign_jobs_cond, NULL);
last_put_100 = GNUNET_TIME_absolute_get (); /* first time! */
min_relative_record_time
= GNUNET_TIME_UNIT_FOREVER_REL;
target_iteration_velocity_per_record = INITIAL_ZONE_ITERATION_INTERVAL;
namestore_handle = GNUNET_NAMESTORE_connect (c);
if (NULL == namestore_handle)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
_ ("Failed to connect to the namestore!\n"));
GNUNET_SCHEDULER_shutdown ();
return;
}
disable_namecache = GNUNET_CONFIGURATION_get_value_yesno (c,
"namecache",
"DISABLE");
if (GNUNET_NO == disable_namecache)
{
namecache = GNUNET_NAMECACHE_connect (c);
if (NULL == namecache)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
_ ("Failed to connect to the namecache!\n"));
GNUNET_SCHEDULER_shutdown ();
return;
}
}
cache_keys = GNUNET_CONFIGURATION_get_value_yesno (c,
"namestore",
"CACHE_KEYS");
zone_publish_time_window_default = GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY;
if (GNUNET_OK ==
GNUNET_CONFIGURATION_get_value_time (c,
"zonemaster",
"ZONE_PUBLISH_TIME_WINDOW",
&zone_publish_time_window_default))
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Time window for zone iteration: %s\n",
GNUNET_STRINGS_relative_time_to_string (
zone_publish_time_window,
GNUNET_YES));
}
zone_publish_time_window = zone_publish_time_window_default;
if (GNUNET_OK ==
GNUNET_CONFIGURATION_get_value_number (c,
"zonemaster",
"MAX_PARALLEL_BACKGROUND_QUERIES",
&max_parallel_bg_queries))
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Number of allowed parallel background queries: %llu\n",
max_parallel_bg_queries);
}
if (0 == max_parallel_bg_queries)
max_parallel_bg_queries = 1;
dht_handle = GNUNET_DHT_connect (c,
(unsigned int) max_parallel_bg_queries);
if (NULL == dht_handle)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
_ ("Could not connect to DHT!\n"));
GNUNET_SCHEDULER_add_now (&shutdown_task,
NULL);
return;
}
/* Schedule periodic put for our records. */
first_zone_iteration = GNUNET_YES;
statistics = GNUNET_STATISTICS_create ("zonemaster",
c);
GNUNET_STATISTICS_set (statistics,
"Target zone iteration velocity (μs)",
target_iteration_velocity_per_record.rel_value_us,
GNUNET_NO);
zone_publish_task = GNUNET_SCHEDULER_add_now (&publish_zone_dht_start,
NULL);
zmon = GNUNET_NAMESTORE_zone_monitor_start2 (c,
NULL,
GNUNET_NO,
&handle_monitor_error,
NULL,
&handle_monitor_event,
NULL,
NULL /* sync_cb */,
NULL,
GNUNET_GNSRECORD_FILTER_NONE);
GNUNET_NAMESTORE_zone_monitor_next (zmon,
NAMESTORE_MONITOR_QUEUE_LIMIT - 1);
GNUNET_break (NULL != zmon);
GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
NULL);
notification_pipe = GNUNET_DISK_pipe (GNUNET_DISK_PF_NONE);
const struct GNUNET_DISK_FileHandle *np_fh = GNUNET_DISK_pipe_handle (
notification_pipe,
GNUNET_DISK_PIPE_END_READ);
pipe_read_task = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL,
np_fh,
notification_pipe_cb, NULL);
long long unsigned int worker_count = 1;
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_number (c,
"zonemaster",
"WORKER_COUNT",
&worker_count))
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Number of workers not defined falling back to 1\n");
}
worker = GNUNET_malloc (sizeof (pthread_t) * worker_count);
/** Start worker */
for (int i = 0; i < worker_count; i++)
{
if (0 !=
pthread_create (&worker[i],
NULL,
&sign_worker,
NULL))
{
GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
"pthread_create");
GNUNET_SCHEDULER_shutdown ();
}
}
}
/**
* Define "main" method using service macro.
*/
GNUNET_SERVICE_MAIN
("zonemaster",
GNUNET_SERVICE_OPTION_NONE,
&run,
NULL,
NULL,
NULL,
GNUNET_MQ_handler_end ());
/* end of gnunet-service-zonemaster.c */