/*
* This file is part of GNUnet
* Copyright (C) 2009, 2011, 2017 GNUnet e.V.
*
* GNUnet is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License,
* or (at your option) any later version.
*
* GNUnet is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
SPDX-License-Identifier: AGPL3.0-or-later
*/
/**
* @file datastore/plugin_datastore_sqlite.c
* @brief sqlite-based datastore backend
* @author Christian Grothoff
*/
#include "platform.h"
#include "gnunet_datastore_plugin.h"
#include "gnunet_sq_lib.h"
#include <sqlite3.h>
/**
* We allocate items on the stack at times. To prevent a stack
* overflow, we impose a limit on the maximum size (in bytes) for the
* data per item. 64k should be enough.
*
* sqlite_plugin_put() refuses to insert items larger than this.
*/
#define MAX_ITEM_SIZE 65536
/**
* After how many ms "busy" should a DB operation fail for good?
* A low value makes sure that we are more responsive to requests
* (especially PUTs). A high value guarantees a higher success
* rate (SELECTs in iterate can take several seconds despite LIMIT=1).
*
* The default value of 250ms should ensure that users do not experience
* huge latencies while at the same time allowing operations to succeed
* with reasonable probability.
*
* The value is handed to sqlite3_busy_timeout() in database_setup().
*/
#define BUSY_TIMEOUT_MS 250
/**
 * Log a message at log-level @a level reporting that the SQLite
 * operation @a cmd failed.  The message contains the source location
 * and the error text returned by sqlite3_errmsg() for the database
 * handle of @a db.
 *
 * @param db pointer to a structure with a `dbh` member (the plugin context);
 *        may be an arbitrary pointer expression such as `&plugin`
 * @param level log level to use
 * @param cmd name of the operation that failed (0-terminated string)
 */
#define LOG_SQLITE(db, level, cmd)                                \
  do                                                              \
  {                                                               \
    GNUNET_log_from (level,                                       \
                     "sqlite",                                    \
                     _ ("`%s' failed at %s:%d with error: %s\n"), \
                     cmd,                                         \
                     __FILE__,                                    \
                     __LINE__,                                    \
                     sqlite3_errmsg ((db)->dbh));                 \
  } while (0)
/**
 * Like #LOG_SQLITE, but additionally store a copy of the error message
 * (without trailing newline) in @a msg using GNUNET_asprintf().
 *
 * @param db pointer to a structure with a `dbh` member (the plugin context);
 *        may be an arbitrary pointer expression such as `&plugin`
 * @param msg where to store the formatted message (passed on to
 *        GNUNET_asprintf(); callers in this file release it with GNUNET_free())
 * @param level log level to use
 * @param cmd name of the operation that failed (0-terminated string)
 */
#define LOG_SQLITE_MSG(db, msg, level, cmd)                       \
  do                                                              \
  {                                                               \
    GNUNET_log_from (level,                                       \
                     "sqlite",                                    \
                     _ ("`%s' failed at %s:%d with error: %s\n"), \
                     cmd,                                         \
                     __FILE__,                                    \
                     __LINE__,                                    \
                     sqlite3_errmsg ((db)->dbh));                 \
    GNUNET_asprintf (msg,                                         \
                     _ ("`%s' failed at %s:%u with error: %s"),   \
                     cmd,                                         \
                     __FILE__,                                    \
                     (unsigned int) __LINE__, /* matches %u */    \
                     sqlite3_errmsg ((db)->dbh));                 \
  } while (0)
/**
* Context for all functions in this plugin.
*/
struct Plugin
{
/**
* Our execution environment; provides the configuration (`cfg`) and
* the disk utilization change callback (`duc`) used in this file.
*/
struct GNUNET_DATASTORE_PluginEnvironment *env;
/**
* Database filename. Set by database_setup() from the FILENAME
* option and released by database_shutdown().
*/
char *fn;
/**
* Native SQLite database handle.
*/
sqlite3 *dbh;
/**
* Precompiled SQL for remove_key (deletes rows matching a hash
* and a value).
*/
sqlite3_stmt *remove;
/**
* Precompiled SQL for deletion of a single row by its _ROWID_.
*/
sqlite3_stmt *delRow;
/**
* Precompiled SQL for update: adds to prio and repl and extends
* expire for the row matching hash and vhash.
*/
sqlite3_stmt *update;
/**
* Get maximum repl value in database.
*/
sqlite3_stmt *maxRepl;
/**
* Precompiled SQL for replication decrement (never below zero)
* of the row with a given _ROWID_.
*/
sqlite3_stmt *updRepl;
/**
* Precompiled SQL for replication selection.
*/
sqlite3_stmt *selRepl;
/**
* Precompiled SQL for expiration selection.
*/
sqlite3_stmt *selExpi;
/**
* Precompiled SQL for selecting rows with anonLevel zero and a
* given type.
*/
sqlite3_stmt *selZeroAnon;
/**
* Precompiled SQL for insertion.
*/
sqlite3_stmt *insertContent;
/**
* Precompiled SQL for selection. The index is a bit mask chosen in
* sqlite_plugin_get_key(): 4 = constrain by rvalue (random),
* 2 = constrain by hash, 1 = constrain by type.
*/
sqlite3_stmt *get[8];
/**
* Should the database be dropped on shutdown? Set by
* sqlite_plugin_drop(); if set, the "done" function unlinks the
* database file.
*/
int drop_on_shutdown;
};
/**
 * @brief Compile a SQL statement and log the outcome at debug level.
 *
 * @param dbh handle to the database
 * @param zSql SQL statement, UTF-8 encoded and 0-terminated
 * @param ppStmt set to the prepared statement
 * @return result code of sqlite3_prepare_v2(), 0 (SQLITE_OK) on success
 */
static int
sq_prepare (sqlite3 *dbh, const char *zSql, sqlite3_stmt **ppStmt)
{
  const char *tail; /* unparsed remainder of zSql, not used */
  const int rc = sqlite3_prepare_v2 (dbh, zSql, strlen (zSql), ppStmt, &tail);

  GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
                   "sqlite",
                   "Prepared `%s' / %p: %d\n",
                   zSql,
                   *ppStmt,
                   rc);
  return rc;
}
/**
 * Create our database indices (if they do not exist yet).
 *
 * All four indices are attempted even if an earlier one fails; a single
 * error message is logged if at least one attempt failed.
 *
 * @param dbh handle to the database
 */
static void
create_indices (sqlite3 *dbh)
{
  static const char *const index_sql[] = {
    "CREATE INDEX IF NOT EXISTS idx_hash ON gn091 (hash)",
    "CREATE INDEX IF NOT EXISTS idx_anon_type ON gn091 (anonLevel ASC,type)",
    "CREATE INDEX IF NOT EXISTS idx_expire ON gn091 (expire ASC)",
    "CREATE INDEX IF NOT EXISTS idx_repl_rvalue ON gn091 (repl,rvalue)"
  };
  unsigned int failures = 0;

  /* deliberately no early exit: every index gets its chance */
  for (size_t i = 0; i < sizeof (index_sql) / sizeof (index_sql[0]); i++)
  {
    if (SQLITE_OK != sqlite3_exec (dbh, index_sql[i], NULL, NULL, NULL))
      failures++;
  }
  if (0 == failures)
    return;
  GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
                   "sqlite",
                   "Failed to create indices: %s\n",
                   sqlite3_errmsg (dbh));
}
#if 0
#define CHECK(a) GNUNET_break (a)
#define ENULL NULL
#else
/**
 * Error-message argument for sqlite3_exec(); requires a local
 * variable `char *e` in the calling function.
 */
#define ENULL &e
#define ENULL_DEFINED 1
/**
 * If condition @a a does not hold, log the error string in the local
 * variable `e` (or a generic text if no string was produced), release
 * it with sqlite3_free() and reset `e` to NULL so that a later CHECK
 * can neither print nor free the stale pointer again.
 *
 * Wrapped in do/while(0) so it behaves as one statement, also inside
 * unbraced if/else.
 *
 * @param a condition that is expected to be true
 */
#define CHECK(a)                                             \
  do                                                         \
  {                                                          \
    if (! (a))                                               \
    {                                                        \
      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,                   \
                  "%s\n",                                    \
                  (NULL != e) ? e : "unknown SQLite error"); \
      sqlite3_free (e);                                      \
      e = NULL;                                              \
    }                                                        \
  } while (0)
#endif
/**
 * Initialize the database connection and associated data structures:
 * open the database file named by the FILENAME option, apply our
 * PRAGMAs, create the table and indices as needed and precompile all
 * statements used by this plugin.
 *
 * On failure, whatever was acquired so far (file name, database handle,
 * statements) stays in @a plugin; the plugin's init function releases it
 * by calling database_shutdown().
 *
 * @param cfg our configuration
 * @param plugin the plugin context (state for this module)
 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
 */
static int
database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg,
                struct Plugin *plugin)
{
  static const char *const pragmas[] = {
    "PRAGMA temp_store=MEMORY",
    "PRAGMA synchronous=OFF",
    "PRAGMA legacy_file_format=OFF",
    "PRAGMA auto_vacuum=INCREMENTAL",
    "PRAGMA locking_mode=EXCLUSIVE",
    "PRAGMA page_size=4096"
  };
  sqlite3_stmt *stmt;
  char *afsdir;
#if ENULL_DEFINED
  char *e = NULL;
#endif

  if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_filename (cfg,
                                                            "datastore-sqlite",
                                                            "FILENAME",
                                                            &afsdir))
  {
    GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
                               "datastore-sqlite",
                               "FILENAME");
    return GNUNET_SYSERR;
  }
  if (GNUNET_OK != GNUNET_DISK_file_test (afsdir))
  {
    if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir))
    {
      GNUNET_break (0);
      GNUNET_free (afsdir);
      return GNUNET_SYSERR;
    }
    /* database is new or got deleted, reset payload to zero! */
    if (NULL != plugin->env->duc)
      plugin->env->duc (plugin->env->cls, 0);
  }
  /* afsdir should be UTF-8-encoded. If it isn't, it's a bug */
  plugin->fn = afsdir;

  /* Open database and precompile statements */
  if (SQLITE_OK != sqlite3_open (plugin->fn, &plugin->dbh))
  {
    GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
                     "sqlite",
                     _ ("Unable to initialize SQLite: %s.\n"),
                     sqlite3_errmsg (plugin->dbh));
    return GNUNET_SYSERR;
  }
  /* PRAGMA failures are logged but not fatal */
  for (size_t i = 0; i < sizeof (pragmas) / sizeof (pragmas[0]); i++)
  {
    CHECK (SQLITE_OK ==
           sqlite3_exec (plugin->dbh, pragmas[i], NULL, NULL, ENULL));
  }
  /* sqlite3_busy_timeout() and sq_prepare() do not produce an error
     string in `e`, so CHECK (which prints and frees `e`) must not be
     used for them; report via sqlite3_errmsg() instead. */
  if (SQLITE_OK != sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS))
    LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR, "sqlite3_busy_timeout");

  /* We have to do it here, because otherwise precompiling SQL might fail */
  if (SQLITE_OK !=
      sq_prepare (plugin->dbh,
                  "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn091'",
                  &stmt))
  {
    /* without this statement we cannot tell whether the table exists */
    LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR, "sqlite3_prepare_v2");
    return GNUNET_SYSERR;
  }

  /* FIXME: SQLite does not have unsigned integers!  This is ok for the type column because
   * we only test equality on it and can cast it to/from uint32_t.  For repl, prio, and anonLevel
   * we do math or inequality tests, so we can't handle the entire range of uint32_t.
   * This will also cause problems for expiration times after 294247-01-10-04:00:54 UTC.
   */
  /* SQLITE_DONE (no row) means the table does not exist yet */
  if ((SQLITE_DONE == sqlite3_step (stmt)) &&
      (SQLITE_OK != sqlite3_exec (plugin->dbh,
                                  "CREATE TABLE gn091 ("
                                  " repl INT4 NOT NULL DEFAULT 0,"
                                  " type INT4 NOT NULL DEFAULT 0,"
                                  " prio INT4 NOT NULL DEFAULT 0,"
                                  " anonLevel INT4 NOT NULL DEFAULT 0,"
                                  " expire INT8 NOT NULL DEFAULT 0,"
                                  " rvalue INT8 NOT NULL,"
                                  " hash TEXT NOT NULL DEFAULT '',"
                                  " vhash TEXT NOT NULL DEFAULT '',"
                                  " value BLOB NOT NULL DEFAULT '')",
                                  NULL,
                                  NULL,
                                  NULL)))
  {
    LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR, "sqlite3_exec");
    sqlite3_finalize (stmt);
    return GNUNET_SYSERR;
  }
  sqlite3_finalize (stmt);
  create_indices (plugin->dbh);

#define RESULT_COLUMNS \
  "repl, type, prio, anonLevel, expire, hash, value, _ROWID_"
  {
    /* statements are compiled in this order; the first failure aborts */
    const struct
    {
      const char *sql;
      sqlite3_stmt **target;
    } statements[] = {
      { "UPDATE gn091 "
        "SET prio = prio + ?, "
        "repl = repl + ?, "
        "expire = MAX(expire, ?) "
        "WHERE hash = ? AND vhash = ?",
        &plugin->update },
      { "UPDATE gn091 "
        "SET repl = MAX (0, repl - 1) WHERE _ROWID_ = ?",
        &plugin->updRepl },
      { "SELECT " RESULT_COLUMNS " FROM gn091 "
        "WHERE repl=?2 AND "
        " (rvalue>=?1 OR "
        " NOT EXISTS (SELECT 1 FROM gn091 "
        "WHERE repl=?2 AND rvalue>=?1 LIMIT 1) ) "
        "ORDER BY rvalue ASC LIMIT 1",
        &plugin->selRepl },
      { "SELECT MAX(repl) FROM gn091",
        &plugin->maxRepl },
      { "SELECT " RESULT_COLUMNS " FROM gn091 "
        "WHERE NOT EXISTS (SELECT 1 FROM gn091 WHERE expire < ?1 LIMIT 1) OR (expire < ?1) "
        "ORDER BY expire ASC LIMIT 1",
        &plugin->selExpi },
      { "SELECT " RESULT_COLUMNS " FROM gn091 "
        "WHERE _ROWID_ >= ? AND "
        "anonLevel = 0 AND "
        "type = ? "
        "ORDER BY _ROWID_ ASC LIMIT 1",
        &plugin->selZeroAnon },
      { "INSERT INTO gn091 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
        &plugin->insertContent },
      { "SELECT " RESULT_COLUMNS " FROM gn091 "
        "WHERE _ROWID_ >= ?1 "
        "ORDER BY _ROWID_ ASC LIMIT 1",
        &plugin->get[0] },
      { "SELECT " RESULT_COLUMNS " FROM gn091 "
        "WHERE _ROWID_ >= ?1 AND "
        "type = ?4 "
        "ORDER BY _ROWID_ ASC LIMIT 1",
        &plugin->get[1] },
      { "SELECT " RESULT_COLUMNS " FROM gn091 "
        "WHERE _ROWID_ >= ?1 AND "
        "hash = ?3 "
        "ORDER BY _ROWID_ ASC LIMIT 1",
        &plugin->get[2] },
      { "SELECT " RESULT_COLUMNS " FROM gn091 "
        "WHERE _ROWID_ >= ?1 AND "
        "hash = ?3 AND "
        "type = ?4 "
        "ORDER BY _ROWID_ ASC LIMIT 1",
        &plugin->get[3] },
      { "SELECT " RESULT_COLUMNS " FROM gn091 "
        "WHERE _ROWID_ >= ?1 AND "
        "rvalue >= ?2 "
        "ORDER BY _ROWID_ ASC LIMIT 1",
        &plugin->get[4] },
      { "SELECT " RESULT_COLUMNS " FROM gn091 "
        "WHERE _ROWID_ >= ?1 AND "
        "rvalue >= ?2 AND "
        "type = ?4 "
        "ORDER BY _ROWID_ ASC LIMIT 1",
        &plugin->get[5] },
      { "SELECT " RESULT_COLUMNS " FROM gn091 "
        "WHERE _ROWID_ >= ?1 AND "
        "rvalue >= ?2 AND "
        "hash = ?3 "
        "ORDER BY _ROWID_ ASC LIMIT 1",
        &plugin->get[6] },
      { "SELECT " RESULT_COLUMNS " FROM gn091 "
        "WHERE _ROWID_ >= ?1 AND "
        "rvalue >= ?2 AND "
        "hash = ?3 AND "
        "type = ?4 "
        "ORDER BY _ROWID_ ASC LIMIT 1",
        &plugin->get[7] },
      { "DELETE FROM gn091 WHERE _ROWID_ = ?",
        &plugin->delRow },
      { "DELETE FROM gn091 "
        "WHERE hash = ? AND "
        "value = ? ",
        &plugin->remove }
    };

    for (size_t i = 0; i < sizeof (statements) / sizeof (statements[0]); i++)
    {
      if (SQLITE_OK ==
          sq_prepare (plugin->dbh, statements[i].sql, statements[i].target))
        continue;
      LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR, "precompiling");
      return GNUNET_SYSERR;
    }
  }
  return GNUNET_OK;
}
/**
* Shutdown database connection and associate data
* structures.
*
* @param plugin the plugin context (state for this module)
*/
static void
database_shutdown (struct Plugin *plugin)
{
int result;
#if SQLITE_VERSION_NUMBER >= 3007000
sqlite3_stmt *stmt;
#endif
if (NULL != plugin->remove)
sqlite3_finalize (plugin->remove);
if (NULL != plugin->delRow)
sqlite3_finalize (plugin->delRow);
if (NULL != plugin->update)
sqlite3_finalize (plugin->update);
if (NULL != plugin->updRepl)
sqlite3_finalize (plugin->updRepl);
if (NULL != plugin->selRepl)
sqlite3_finalize (plugin->selRepl);
if (NULL != plugin->maxRepl)
sqlite3_finalize (plugin->maxRepl);
if (NULL != plugin->selExpi)
sqlite3_finalize (plugin->selExpi);
if (NULL != plugin->selZeroAnon)
sqlite3_finalize (plugin->selZeroAnon);
if (NULL != plugin->insertContent)
sqlite3_finalize (plugin->insertContent);
for (int i = 0; i < 8; ++i)
if (NULL != plugin->get[i])
sqlite3_finalize (plugin->get[i]);
result = sqlite3_close (plugin->dbh);
#if SQLITE_VERSION_NUMBER >= 3007000
if (result == SQLITE_BUSY)
{
GNUNET_log_from (
GNUNET_ERROR_TYPE_WARNING,
"sqlite",
_ (
"Tried to close sqlite without finalizing all prepared statements.\n"));
stmt = sqlite3_next_stmt (plugin->dbh, NULL);
while (NULL != stmt)
{
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
"sqlite",
"Closing statement %p\n",
stmt);
result = sqlite3_finalize (stmt);
if (result != SQLITE_OK)
GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
"sqlite",
"Failed to close statement %p: %d\n",
stmt,
result);
stmt = sqlite3_next_stmt (plugin->dbh, NULL);
}
result = sqlite3_close (plugin->dbh);
}
#endif
if (SQLITE_OK != result)
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR, "sqlite3_close");
GNUNET_free (plugin->fn);
}
/**
* Delete the database entry with the given
* row identifier.
*
* @param plugin the plugin context (state for this module)
* @param rid the ID of the row to delete
*/
static int
delete_by_rowid (struct Plugin *plugin, uint64_t rid)
{
struct GNUNET_SQ_QueryParam params[] = { GNUNET_SQ_query_param_uint64 (&rid),
GNUNET_SQ_query_param_end };
if (GNUNET_OK != GNUNET_SQ_bind (plugin->delRow, params))
return GNUNET_SYSERR;
if (SQLITE_DONE != sqlite3_step (plugin->delRow))
{
LOG_SQLITE (plugin,
GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
"sqlite3_step");
GNUNET_SQ_reset (plugin->dbh, plugin->delRow);
return GNUNET_SYSERR;
}
GNUNET_SQ_reset (plugin->dbh, plugin->delRow);
return GNUNET_OK;
}
/**
 * Store an item in the datastore.
 *
 * Unless @a absent is set, first try to merge the item into an existing
 * row with the same key and content hash (adding @a priority and
 * @a replication, extending the expiration time).  Only if no row was
 * changed by that update is a new row inserted.
 *
 * @param cls closure (our `struct Plugin`)
 * @param key key for the item
 * @param absent true if the key was not found in the bloom filter
 * @param size number of bytes in @a data
 * @param data content stored
 * @param type type of the content
 * @param priority priority of the content
 * @param anonymity anonymity-level for the content
 * @param replication replication-level for the content
 * @param expiration expiration time for the content
 * @param cont continuation called with success or failure status:
 *        #GNUNET_OK if a new row was inserted, #GNUNET_NO if an existing
 *        row was updated instead, #GNUNET_SYSERR on error
 * @param cont_cls continuation closure
 */
static void
sqlite_plugin_put (void *cls,
                   const struct GNUNET_HashCode *key,
                   bool absent,
                   uint32_t size,
                   const void *data,
                   enum GNUNET_BLOCK_Type type,
                   uint32_t priority,
                   uint32_t anonymity,
                   uint32_t replication,
                   struct GNUNET_TIME_Absolute expiration,
                   PluginPutCont cont,
                   void *cont_cls)
{
  struct Plugin *plugin = cls;
  struct GNUNET_HashCode vhash;
  char *msg = NULL;

  GNUNET_CRYPTO_hash (data, size, &vhash);
  if (! absent)
  {
    struct GNUNET_SQ_QueryParam update_params[] = {
      GNUNET_SQ_query_param_uint32 (&priority),
      GNUNET_SQ_query_param_uint32 (&replication),
      GNUNET_SQ_query_param_absolute_time (&expiration),
      GNUNET_SQ_query_param_auto_from_type (key),
      GNUNET_SQ_query_param_auto_from_type (&vhash),
      GNUNET_SQ_query_param_end
    };
    int changes;

    if (GNUNET_OK != GNUNET_SQ_bind (plugin->update, update_params))
    {
      cont (cont_cls, key, size, GNUNET_SYSERR, _ ("sqlite bind failure"));
      return;
    }
    if (SQLITE_DONE != sqlite3_step (plugin->update))
    {
      LOG_SQLITE_MSG (plugin,
                      &msg,
                      GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                      "sqlite3_step");
      /* do not leave the cached statement mid-execution with bindings
         that point into this stack frame */
      GNUNET_SQ_reset (plugin->dbh, plugin->update);
      cont (cont_cls, key, size, GNUNET_SYSERR, msg);
      GNUNET_free (msg);
      return;
    }
    /* read the change count before resetting the statement */
    changes = sqlite3_changes (plugin->dbh);
    GNUNET_SQ_reset (plugin->dbh, plugin->update);
    if (0 != changes)
    {
      cont (cont_cls, key, size, GNUNET_NO, NULL);
      return;
    }
  }

  uint64_t rvalue;
  uint32_t type32 = (uint32_t) type;
  struct GNUNET_SQ_QueryParam params[] = {
    GNUNET_SQ_query_param_uint32 (&replication),
    GNUNET_SQ_query_param_uint32 (&type32),
    GNUNET_SQ_query_param_uint32 (&priority),
    GNUNET_SQ_query_param_uint32 (&anonymity),
    GNUNET_SQ_query_param_absolute_time (&expiration),
    GNUNET_SQ_query_param_uint64 (&rvalue),
    GNUNET_SQ_query_param_auto_from_type (key),
    GNUNET_SQ_query_param_auto_from_type (&vhash),
    GNUNET_SQ_query_param_fixed_size (data, size),
    GNUNET_SQ_query_param_end
  };
  int n;
  int ret;
  sqlite3_stmt *stmt;

  if (size > MAX_ITEM_SIZE)
  {
    cont (cont_cls, key, size, GNUNET_SYSERR, _ ("Data too large"));
    return;
  }
  GNUNET_log_from (
    GNUNET_ERROR_TYPE_DEBUG,
    "sqlite",
    "Storing in database block with type %u/key `%s'/priority %u/expiration in %s (%s).\n",
    type,
    GNUNET_h2s (key),
    priority,
    GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (
                                              expiration),
                                            GNUNET_YES),
    GNUNET_STRINGS_absolute_time_to_string (expiration));
  stmt = plugin->insertContent;
  /* params refers to rvalue by address, so it must be set before binding */
  rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
  if (GNUNET_OK != GNUNET_SQ_bind (stmt, params))
  {
    cont (cont_cls, key, size, GNUNET_SYSERR, NULL);
    return;
  }
  n = sqlite3_step (stmt);
  switch (n)
  {
  case SQLITE_DONE:
    if (NULL != plugin->env->duc)
      plugin->env->duc (plugin->env->cls,
                        size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
    GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
                     "sqlite",
                     "Stored new entry (%u bytes)\n",
                     size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
    ret = GNUNET_OK;
    break;

  case SQLITE_BUSY:
    GNUNET_break (0);
    LOG_SQLITE_MSG (plugin,
                    &msg,
                    GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                    "sqlite3_step");
    ret = GNUNET_SYSERR;
    break;

  default:
    /* unexpected error: report it and re-open the database */
    LOG_SQLITE_MSG (plugin,
                    &msg,
                    GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                    "sqlite3_step");
    GNUNET_SQ_reset (plugin->dbh, stmt);
    database_shutdown (plugin);
    database_setup (plugin->env->cfg, plugin);
    cont (cont_cls, key, size, GNUNET_SYSERR, msg);
    GNUNET_free (msg);
    return;
  }
  GNUNET_SQ_reset (plugin->dbh, stmt);
  cont (cont_cls, key, size, ret, msg);
  GNUNET_free (msg);
}
/**
 * Run a (bound) statement that yields at most one row and hand the
 * result to the callback.  The statement is reset afterwards.
 *
 * If a row is found, @a proc receives its columns; if @a proc then
 * returns #GNUNET_NO the row is deleted and the disk utilization
 * callback is informed.  If no usable row is found, @a proc is called
 * with a NULL key.  On an unexpected SQLite error, @a proc is called
 * with a NULL key and the database is shut down and set up again.
 *
 * @param plugin the plugin
 * @param stmt the statement
 * @param proc processor to call
 * @param proc_cls closure for @a proc
 */
static void
execute_get (struct Plugin *plugin,
             sqlite3_stmt *stmt,
             PluginDatumProcessor proc,
             void *proc_cls)
{
  struct GNUNET_TIME_Absolute expiration;
  struct GNUNET_HashCode key;
  uint32_t replication;
  uint32_t type;
  uint32_t priority;
  uint32_t anonymity;
  uint64_t rowid;
  void *value;
  size_t value_size;
  struct GNUNET_SQ_ResultSpec rs[] = {
    GNUNET_SQ_result_spec_uint32 (&replication),
    GNUNET_SQ_result_spec_uint32 (&type),
    GNUNET_SQ_result_spec_uint32 (&priority),
    GNUNET_SQ_result_spec_uint32 (&anonymity),
    GNUNET_SQ_result_spec_absolute_time (&expiration),
    GNUNET_SQ_result_spec_auto_from_type (&key),
    GNUNET_SQ_result_spec_variable_size (&value, &value_size),
    GNUNET_SQ_result_spec_uint64 (&rowid),
    GNUNET_SQ_result_spec_end
  };
  const int step_rc = sqlite3_step (stmt);

  if (SQLITE_ROW == step_rc)
  {
    if (GNUNET_OK == GNUNET_SQ_extract_result (stmt, rs))
    {
      int verdict;

      GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
                       "sqlite",
                       "Found reply in database with expiration %s\n",
                       GNUNET_STRINGS_absolute_time_to_string (expiration));
      verdict = proc (proc_cls,
                      &key,
                      value_size,
                      value,
                      type,
                      priority,
                      anonymity,
                      replication,
                      expiration,
                      rowid);
      /* release the extracted data and the statement before deleting */
      GNUNET_SQ_cleanup_result (rs);
      GNUNET_SQ_reset (plugin->dbh, stmt);
      if (GNUNET_NO != verdict)
        return;
      if (GNUNET_OK != delete_by_rowid (plugin, rowid))
        return;
      if (NULL != plugin->env->duc)
        plugin->env->duc (plugin->env->cls,
                          -(value_size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
      return;
    }
    /* row could not be decoded: report "nothing found" below */
    GNUNET_break (0);
  }
  else if (SQLITE_DONE != step_rc)
  {
    /* SQLITE_BUSY, SQLITE_ERROR, SQLITE_MISUSE or anything else */
    LOG_SQLITE (plugin,
                GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                "sqlite3_step");
    if (SQLITE_OK != sqlite3_reset (stmt))
      LOG_SQLITE (plugin,
                  GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                  "sqlite3_reset");
    GNUNET_break (0);
    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
    database_shutdown (plugin);
    database_setup (plugin->env->cfg, plugin);
    return;
  }
  /* SQLITE_DONE (no matching row) or undecodable row */
  GNUNET_SQ_reset (plugin->dbh, stmt);
  proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
}
/**
 * Fetch the entry with the lowest uid >= @a next_uid among those with
 * anonymity level zero and the given type, and pass it to @a proc.
 *
 * @param cls our plugin context
 * @param next_uid return the result with lowest uid >= next_uid
 * @param type entries of which type should be considered?
 *        Must not be zero (ANY).
 * @param proc function to call on the matching value;
 *        will be called with NULL if no value matches
 * @param proc_cls closure for @a proc
 */
static void
sqlite_plugin_get_zero_anonymity (void *cls,
                                  uint64_t next_uid,
                                  enum GNUNET_BLOCK_Type type,
                                  PluginDatumProcessor proc,
                                  void *proc_cls)
{
  struct Plugin *plugin = cls;
  sqlite3_stmt *const sel = plugin->selZeroAnon;
  uint32_t type32 = type;
  struct GNUNET_SQ_QueryParam params[] = {
    GNUNET_SQ_query_param_uint64 (&next_uid),
    GNUNET_SQ_query_param_uint32 (&type32),
    GNUNET_SQ_query_param_end
  };

  GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
  if (GNUNET_OK == GNUNET_SQ_bind (sel, params))
  {
    execute_get (plugin, sel, proc, proc_cls);
    return;
  }
  /* binding failed: report "no result" */
  proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
}
/**
 * Get results for a particular key in the datastore.
 *
 * One of eight precompiled statements is chosen depending on which
 * constraints (random value, key, type) apply.
 *
 * @param cls closure
 * @param next_uid return the result with lowest uid >= next_uid
 * @param random if true, return a random result instead of using next_uid
 * @param key maybe NULL (to match all entries)
 * @param type entries of which type are relevant?
 *        Use 0 for any type.
 * @param proc function to call on the matching value;
 *        will be called with NULL if nothing matches
 * @param proc_cls closure for @a proc
 */
static void
sqlite_plugin_get_key (void *cls,
                       uint64_t next_uid,
                       bool random,
                       const struct GNUNET_HashCode *key,
                       enum GNUNET_BLOCK_Type type,
                       PluginDatumProcessor proc,
                       void *proc_cls)
{
  struct Plugin *plugin = cls;
  const bool with_type = (GNUNET_BLOCK_TYPE_ANY != type);
  const bool with_key = (NULL != key);
  const unsigned int variant =
    (random ? 4u : 0u) | (with_key ? 2u : 0u) | (with_type ? 1u : 0u);
  sqlite3_stmt *stmt = plugin->get[variant];
  uint64_t rvalue;
  uint32_t type32 = (uint32_t) type;
  struct GNUNET_SQ_QueryParam params[] = {
    GNUNET_SQ_query_param_uint64 (&next_uid),
    GNUNET_SQ_query_param_uint64 (&rvalue),
    GNUNET_SQ_query_param_auto_from_type (key),
    GNUNET_SQ_query_param_uint32 (&type32),
    GNUNET_SQ_query_param_end
  };
  unsigned int used;

  /* SQLite doesn't like it when you try to bind a parameter greater than the
   * last numbered parameter, but unused parameters in the middle are OK.
   * Hence cut the list right after the highest-numbered parameter in use.
   */
  if (with_type)
    used = 4;
  else if (with_key)
    used = 3;
  else if (random)
    used = 2;
  else
    used = 1;
  params[used] = (struct GNUNET_SQ_QueryParam) GNUNET_SQ_query_param_end;

  /* params holds the addresses of rvalue and next_uid: set before binding */
  rvalue = 0;
  if (random)
  {
    rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
    next_uid = 0;
  }
  if (GNUNET_OK == GNUNET_SQ_bind (stmt, params))
  {
    execute_get (plugin, stmt, proc, proc_cls);
    return;
  }
  proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
}
/**
* Context for #repl_proc() function.
*/
struct ReplCtx
{
/**
* Function to call for the result (or the NULL).
*/
PluginDatumProcessor proc;
/**
* Closure for @e proc.
*/
void *proc_cls;
/**
* UID of the row handed to @e proc; only meaningful if
* @e have_uid is #GNUNET_YES.
*/
uint64_t uid;
/**
* #GNUNET_YES if @e uid was set (repl_proc() saw a non-NULL key),
* #GNUNET_NO if repl_proc() ran without a key, and
* #GNUNET_SYSERR (initial value set by
* sqlite_plugin_get_replication()) if repl_proc() was never called.
*/
int have_uid;
};
/**
 * Wrapper for the processor for #sqlite_plugin_get_replication().
 * Forwards the result to the original processor and remembers in the
 * `struct ReplCtx` whether a row was delivered and what its uid was,
 * so that the caller can decrement the replication counter afterwards.
 *
 * @param cls closure (the `struct ReplCtx`)
 * @param key key for the content
 * @param size number of bytes in @a data
 * @param data content stored
 * @param type type of the content
 * @param priority priority of the content
 * @param anonymity anonymity-level for the content
 * @param replication replication-level for the content
 * @param expiration expiration time for the content
 * @param uid unique identifier for the datum;
 *        maybe 0 if no unique identifier is available
 * @return whatever the wrapped processor returned:
 *         #GNUNET_OK for normal return,
 *         #GNUNET_NO to delete the item
 */
static int
repl_proc (void *cls,
           const struct GNUNET_HashCode *key,
           uint32_t size,
           const void *data,
           enum GNUNET_BLOCK_Type type,
           uint32_t priority,
           uint32_t anonymity,
           uint32_t replication,
           struct GNUNET_TIME_Absolute expiration,
           uint64_t uid)
{
  struct ReplCtx *ctx = cls;
  int verdict;

  /* record that the processor is being invoked at all */
  if (GNUNET_SYSERR == ctx->have_uid)
    ctx->have_uid = GNUNET_NO;
  verdict = ctx->proc (ctx->proc_cls,
                       key,
                       size,
                       data,
                       type,
                       priority,
                       anonymity,
                       replication,
                       expiration,
                       uid);
  if (NULL == key)
    return verdict;
  ctx->uid = uid;
  ctx->have_uid = GNUNET_YES;
  return verdict;
}
/**
 * Get a random item for replication.  Returns a single random item
 * from those with the highest replication counters.  The item's
 * replication counter is decremented by one IF it was positive before.
 * Call @a proc with all values ZERO or NULL if the datastore is empty.
 *
 * @a proc is invoked exactly once on every path through this function.
 *
 * @param cls closure
 * @param proc function to call the value (once only).
 * @param proc_cls closure for @a proc
 */
static void
sqlite_plugin_get_replication (void *cls,
                               PluginDatumProcessor proc,
                               void *proc_cls)
{
  struct Plugin *plugin = cls;
  struct ReplCtx rc;
  uint64_t rvalue = 0;
  uint32_t repl;
  struct GNUNET_SQ_QueryParam params_sel_repl[] = {
    GNUNET_SQ_query_param_uint64 (&rvalue),
    GNUNET_SQ_query_param_uint32 (&repl),
    GNUNET_SQ_query_param_end
  };
  struct GNUNET_SQ_QueryParam params_upd_repl[] = {
    GNUNET_SQ_query_param_uint64 (&rc.uid),
    GNUNET_SQ_query_param_end
  };

  GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
                   "datastore-sqlite",
                   "Getting random block based on replication order.\n");
  if (SQLITE_ROW != sqlite3_step (plugin->maxRepl))
  {
    GNUNET_SQ_reset (plugin->dbh, plugin->maxRepl);
    /* DB empty */
    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
    return;
  }
  repl = sqlite3_column_int (plugin->maxRepl, 0);
  GNUNET_SQ_reset (plugin->dbh, plugin->maxRepl);
  rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
  if (GNUNET_OK != GNUNET_SQ_bind (plugin->selRepl, params_sel_repl))
  {
    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
    return;
  }
  rc.uid = 0;
  rc.have_uid = GNUNET_SYSERR;
  rc.proc = proc;
  rc.proc_cls = proc_cls;
  execute_get (plugin, plugin->selRepl, &repl_proc, &rc);
  if (GNUNET_YES == rc.have_uid)
  {
    if (GNUNET_OK != GNUNET_SQ_bind (plugin->updRepl, params_upd_repl))
    {
      /* The result was already delivered through repl_proc(); calling
         proc again here would break the "once only" contract.  The
         replication counter simply stays as it was. */
      GNUNET_break (0);
      return;
    }
    if (SQLITE_DONE != sqlite3_step (plugin->updRepl))
      LOG_SQLITE (plugin,
                  GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                  "sqlite3_step");
    GNUNET_SQ_reset (plugin->dbh, plugin->updRepl);
  }
  if (GNUNET_SYSERR == rc.have_uid)
  {
    /* proc was not called at all so far, do it now. */
    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
  }
}
/**
 * Get an item that has expired or, if nothing has expired yet, the
 * item that expires first (see the `selExpi` statement).
 * Call @a proc with all values ZERO or NULL if the datastore is empty.
 *
 * @param cls closure
 * @param proc function to call the value (once only).
 * @param proc_cls closure for @a proc
 */
static void
sqlite_plugin_get_expiration (void *cls,
                              PluginDatumProcessor proc,
                              void *proc_cls)
{
  struct Plugin *plugin = cls;
  struct GNUNET_TIME_Absolute now = { 0 };
  struct GNUNET_SQ_QueryParam params[] = {
    GNUNET_SQ_query_param_absolute_time (&now),
    GNUNET_SQ_query_param_end
  };

  GNUNET_log_from (
    GNUNET_ERROR_TYPE_DEBUG,
    "sqlite",
    "Getting random block based on expiration and priority order.\n");
  /* params refers to `now` by address, so fill it in before binding */
  now = GNUNET_TIME_absolute_get ();
  if (GNUNET_OK == GNUNET_SQ_bind (plugin->selExpi, params))
  {
    execute_get (plugin, plugin->selExpi, proc, proc_cls);
    return;
  }
  proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
}
/**
 * Get all of the keys in the datastore.
 *
 * @a proc is called once per row with the row's key and a count of 1,
 * and finally once with a NULL key and a count of 0 to signal the end
 * (also if the query could not be prepared).
 *
 * @param cls closure
 * @param proc function to call on each key
 * @param proc_cls closure for @a proc
 */
static void
sqlite_plugin_get_keys (void *cls, PluginKeyProcessor proc, void *proc_cls)
{
  struct Plugin *plugin = cls;
  struct GNUNET_HashCode key;
  struct GNUNET_SQ_ResultSpec results[] = {
    GNUNET_SQ_result_spec_auto_from_type (&key),
    GNUNET_SQ_result_spec_end
  };
  sqlite3_stmt *stmt;
  int ret;

  GNUNET_assert (NULL != proc);
  if (SQLITE_OK != sq_prepare (plugin->dbh, "SELECT hash FROM gn091", &stmt))
  {
    LOG_SQLITE (plugin,
                GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                "sqlite_prepare");
    proc (proc_cls, NULL, 0);
    return;
  }
  for (;;)
  {
    ret = sqlite3_step (stmt);
    if (SQLITE_ROW != ret)
      break;
    if (GNUNET_OK != GNUNET_SQ_extract_result (stmt, results))
    {
      /* skip rows whose hash cannot be decoded */
      GNUNET_break (0);
      continue;
    }
    proc (proc_cls, &key, 1);
  }
  if (SQLITE_DONE != ret)
    LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR, "sqlite_step");
  sqlite3_finalize (stmt);
  proc (proc_cls, NULL, 0);
}
/**
 * Drop database.  This only records the request; the database file is
 * removed when the plugin is unloaded.
 *
 * @param cls our plugin context
 */
static void
sqlite_plugin_drop (void *cls)
{
  ((struct Plugin *) cls)->drop_on_shutdown = GNUNET_YES;
}
/**
 * Remove a particular key in the datastore.
 *
 * @param cls closure
 * @param key key for the content
 * @param size number of bytes in data
 * @param data content stored
 * @param cont continuation called with success or failure status:
 *        #GNUNET_OK if something was deleted, #GNUNET_NO if no row
 *        matched, #GNUNET_SYSERR on error
 * @param cont_cls continuation closure for @a cont
 */
static void
sqlite_plugin_remove_key (void *cls,
                          const struct GNUNET_HashCode *key,
                          uint32_t size,
                          const void *data,
                          PluginRemoveCont cont,
                          void *cont_cls)
{
  struct Plugin *plugin = cls;
  sqlite3_stmt *const del = plugin->remove;
  struct GNUNET_SQ_QueryParam params[] = {
    GNUNET_SQ_query_param_auto_from_type (key),
    GNUNET_SQ_query_param_fixed_size (data, size),
    GNUNET_SQ_query_param_end
  };
  int deleted;

  if (GNUNET_OK != GNUNET_SQ_bind (del, params))
  {
    cont (cont_cls, key, size, GNUNET_SYSERR, "bind failed");
    return;
  }
  if (SQLITE_DONE != sqlite3_step (del))
  {
    LOG_SQLITE (plugin,
                GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                "sqlite3_step");
    GNUNET_SQ_reset (plugin->dbh, del);
    cont (cont_cls, key, size, GNUNET_SYSERR, "sqlite3_step failed");
    return;
  }
  /* fetch the number of affected rows before resetting the statement */
  deleted = sqlite3_changes (plugin->dbh);
  GNUNET_SQ_reset (plugin->dbh, del);
  if (0 != deleted)
  {
    if (NULL != plugin->env->duc)
      plugin->env->duc (plugin->env->cls,
                        -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
    cont (cont_cls, key, size, GNUNET_OK, NULL);
    return;
  }
  cont (cont_cls, key, size, GNUNET_NO, NULL);
}
/**
 * Get an estimate of how much space the database is
 * currently using.
 *
 * The database is vacuumed first; the estimate is the page count
 * multiplied by the page size.
 *
 * @param cls the `struct Plugin`
 * @param[out] estimate set to the size of the database on disk
 *        (estimate); may be NULL, in which case nothing is done.
 *        Left untouched if one of the size queries fails.
 */
static void
sqlite_plugin_estimate_size (void *cls, unsigned long long *estimate)
{
  struct Plugin *plugin = cls;
  sqlite3_stmt *stmt;
  uint64_t pages;
  uint64_t page_size;
#if ENULL_DEFINED
  char *e;
#endif

  if (NULL == estimate)
    return;
  if (SQLITE_VERSION_NUMBER < 3006000)
  {
    GNUNET_log_from (
      GNUNET_ERROR_TYPE_WARNING,
      "datastore-sqlite",
      _ ("sqlite version to old to determine size, assuming zero\n"));
    *estimate = 0;
    return;
  }
  CHECK (SQLITE_OK == sqlite3_exec (plugin->dbh, "VACUUM", NULL, NULL, ENULL));
  CHECK (SQLITE_OK == sqlite3_exec (plugin->dbh,
                                    "PRAGMA auto_vacuum=INCREMENTAL",
                                    NULL,
                                    NULL,
                                    ENULL));
  if (SQLITE_OK != sq_prepare (plugin->dbh, "PRAGMA page_count", &stmt))
  {
    GNUNET_log_from (
      GNUNET_ERROR_TYPE_WARNING,
      "datastore-sqlite",
      _("error preparing statement\n"));
    return;
  }
  if (SQLITE_ROW == sqlite3_step (stmt))
    pages = sqlite3_column_int64 (stmt, 0);
  else
    pages = 0;
  sqlite3_finalize (stmt);
  if (SQLITE_OK != sq_prepare (plugin->dbh, "PRAGMA page_size", &stmt))
  {
    GNUNET_log_from (
      GNUNET_ERROR_TYPE_WARNING,
      "datastore-sqlite",
      _("error preparing statement\n"));
    return;
  }
  if (SQLITE_ROW != sqlite3_step (stmt))
  {
    GNUNET_log_from (
      GNUNET_ERROR_TYPE_WARNING,
      "datastore-sqlite",
      _("error stepping\n"));
    /* the statement was prepared successfully and must not leak */
    sqlite3_finalize (stmt);
    return;
  }
  page_size = sqlite3_column_int64 (stmt, 0);
  sqlite3_finalize (stmt);
  GNUNET_log (
    GNUNET_ERROR_TYPE_INFO,
    _ (
      "Using sqlite page utilization to estimate payload (%llu pages of size %llu bytes)\n"),
    (unsigned long long) pages,
    (unsigned long long) page_size);
  *estimate = pages * page_size;
}
/**
 * Entry point for the plugin.
 *
 * The plugin state lives in a single static `struct Plugin`, so only
 * one instance can be active at a time.  A failed initialization
 * releases that state again so that a later attempt is possible.
 *
 * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment *`
 * @return NULL on error (including "already initialized"),
 *         otherwise the plugin context
 */
void *
libgnunet_plugin_datastore_sqlite_init (void *cls)
{
  static struct Plugin plugin;
  struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
  struct GNUNET_DATASTORE_PluginFunctions *api;

  if (NULL != plugin.env)
    return NULL; /* can only initialize once! */
  memset (&plugin, 0, sizeof(struct Plugin));
  plugin.env = env;
  if (GNUNET_OK != database_setup (env->cfg, &plugin))
  {
    database_shutdown (&plugin);
    /* mark the static state as unused again; otherwise every later
       init would be rejected by the "only once" check above */
    plugin.env = NULL;
    return NULL;
  }
  api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions);
  api->cls = &plugin;
  api->estimate_size = &sqlite_plugin_estimate_size;
  api->put = &sqlite_plugin_put;
  api->get_key = &sqlite_plugin_get_key;
  api->get_replication = &sqlite_plugin_get_replication;
  api->get_expiration = &sqlite_plugin_get_expiration;
  api->get_zero_anonymity = &sqlite_plugin_get_zero_anonymity;
  api->get_keys = &sqlite_plugin_get_keys;
  api->drop = &sqlite_plugin_drop;
  api->remove_key = &sqlite_plugin_remove_key;
  GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
                   "sqlite",
                   _ ("Sqlite database running\n"));
  return api;
}
/**
 * Exit point from the plugin.  Shuts the database down, releases the
 * function table and, if a drop was requested, deletes the database
 * file.
 *
 * @param cls the plugin context (as returned by "init")
 * @return always NULL
 */
void *
libgnunet_plugin_datastore_sqlite_done (void *cls)
{
  struct GNUNET_DATASTORE_PluginFunctions *api = cls;
  struct Plugin *plugin = api->cls;
  char *doomed;

  GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
                   "sqlite",
                   "sqlite plugin is done\n");
  /* database_shutdown() releases plugin->fn, so keep a copy if needed */
  doomed = plugin->drop_on_shutdown ? GNUNET_strdup (plugin->fn) : NULL;
  database_shutdown (plugin);
  plugin->env = NULL;
  GNUNET_free (api);
  if (NULL == doomed)
    return NULL;
  if (0 != unlink (doomed))
    GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING, "unlink", doomed);
  GNUNET_free (doomed);
  return NULL;
}
/* end of plugin_datastore_sqlite.c */