/* This file is part of GNUnet (C) 2009, 2010 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3, or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */ /** * @file datastore/plugin_datastore_mysql.c * @brief mysql-based datastore backend * @author Igor Wronsky * @author Christian Grothoff * * NOTE: This db module does NOT work with mysql prior to 4.1 since * it uses prepared statements. MySQL 5.0.46 promises to fix a bug * in MyISAM that is causing us grief. At the time of this writing, * that version is yet to be released. In anticipation, the code * will use MyISAM with 5.0.46 (and higher). If you run such a * version, please run "make check" to verify that the MySQL bug * was actually fixed in your version (and if not, change the * code below to use MyISAM for gn071). * * HIGHLIGHTS * * Pros * + On up-to-date hardware where mysql can be used comfortably, this * module will have better performance than the other db choices * (according to our tests). * + Its often possible to recover the mysql database from internal * inconsistencies. The other db choices do not support repair! 
* Cons
 * - Memory usage (Comment: "I have 1G and it never caused me trouble")
 * - Manual setup
 *
 * MANUAL SETUP INSTRUCTIONS
 *
 * 1) in /etc/gnunet.conf, set
 * @verbatim
       [datastore]
       DATABASE = "mysql"
   @endverbatim
 * 2) Then access mysql as root,
 * @verbatim
     $ mysql -u root -p
   @endverbatim
 *    and do the following. [You should replace $USER with the username
 *    that will be running the gnunetd process].
 * @verbatim
      CREATE DATABASE gnunet;
      GRANT select,insert,update,delete,create,alter,drop,create temporary tables
         ON gnunet.* TO $USER@localhost;
      SET PASSWORD FOR $USER@localhost=PASSWORD('$the_password_you_like');
      FLUSH PRIVILEGES;
   @endverbatim
 * 3) In the $HOME directory of $USER, create a ".my.cnf" file
 *    with the following lines
 * @verbatim
      [client]
      user=$USER
      password=$the_password_you_like
   @endverbatim
 *
 * That's it. Note that the .my.cnf file is a security risk unless it is on
 * a safe partition etc. The $HOME/.my.cnf can of course be a symbolic
 * link. Even greater security risk can be achieved by setting no
 * password for $USER.  Luckily $USER has only privileges to mess
 * up GNUnet's tables, nothing else (unless you give him more,
 * of course).

* * 4) Still, perhaps you should briefly try if the DB connection * works. First, login as $USER. Then use, * * @verbatim $ mysql -u $USER -p $the_password_you_like mysql> use gnunet; @endverbatim * * If you get the message "Database changed" it probably works. * * [If you get "ERROR 2002: Can't connect to local MySQL server * through socket '/tmp/mysql.sock' (2)" it may be resolvable by * "ln -s /var/run/mysqld/mysqld.sock /tmp/mysql.sock" * so there may be some additional trouble depending on your mysql setup.] * * REPAIRING TABLES * * - Its probably healthy to check your tables for inconsistencies * every now and then. * - If you get odd SEGVs on gnunetd startup, it might be that the mysql * databases have been corrupted. * - The tables can be verified/fixed in two ways; * 1) by running mysqlcheck -A, or * 2) by executing (inside of mysql using the GNUnet database): * @verbatim mysql> REPAIR TABLE gn090; mysql> REPAIR TABLE gn072; @endverbatim * * PROBLEMS? * * If you have problems related to the mysql module, your best * friend is probably the mysql manual. The first thing to check * is that mysql is basically operational, that you can connect * to it, create tables, issue queries etc. * * TODO: * - use FOREIGN KEY for 'uid/vkey' * - consistent naming of uid/vkey */ #include "platform.h" #include "plugin_datastore.h" #include "gnunet_util_lib.h" #include #define DEBUG_MYSQL GNUNET_NO #define MAX_DATUM_SIZE 65536 /** * Maximum number of supported parameters for a prepared * statement. Increase if needed. */ #define MAX_PARAM 16 /** * Die with an error message that indicates * a failure of the command 'cmd' with the message given * by strerror(errno). 
*/ #define DIE_MYSQL(cmd, dbh) do { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); abort(); } while(0); /** * Log an error message at log-level 'level' that indicates * a failure of the command 'cmd' on file 'filename' * with the message given by strerror(errno). */ #define LOG_MYSQL(level, cmd, dbh) do { GNUNET_log(level, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); } while(0); /* warning, slighly crazy mysql statements ahead. Essentially, MySQL does not handle "OR" very well, so we need to use UNION instead. And UNION does not automatically apply a LIMIT on the outermost clause, so we need to repeat ourselves quite a bit. All hail the performance gods (and thanks to #mysql on freenode) */ #define SELECT_IT_LOW_PRIORITY "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) WHERE (prio = ? AND vkey > ?) "\ "ORDER BY prio ASC,vkey ASC LIMIT 1) " \ "UNION "\ "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) WHERE (prio > ? AND vkey != ?)"\ "ORDER BY prio ASC,vkey ASC LIMIT 1)"\ "ORDER BY prio ASC,vkey ASC LIMIT 1" #define SELECT_IT_NON_ANONYMOUS "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) WHERE (prio = ? AND vkey < ?)"\ " AND anonLevel=0 ORDER BY prio DESC,vkey DESC LIMIT 1) "\ "UNION "\ "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(prio) WHERE (prio < ? AND vkey != ?)"\ " AND anonLevel=0 ORDER BY prio DESC,vkey DESC LIMIT 1) "\ "ORDER BY prio DESC,vkey DESC LIMIT 1" #define SELECT_IT_EXPIRATION_TIME "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(expire) WHERE (expire = ? AND vkey > ?) "\ "ORDER BY expire ASC,vkey ASC LIMIT 1) "\ "UNION "\ "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(expire) WHERE (expire > ? AND vkey != ?) 
"\ "ORDER BY expire ASC,vkey ASC LIMIT 1)"\ "ORDER BY expire ASC,vkey ASC LIMIT 1" #define SELECT_IT_MIGRATION_ORDER "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(expire) WHERE (expire = ? AND vkey < ?)"\ " AND expire > ? AND type!=3"\ " ORDER BY expire DESC,vkey DESC LIMIT 1) "\ "UNION "\ "(SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX(expire) WHERE (expire < ? AND vkey != ?)"\ " AND expire > ? AND type!=3"\ " ORDER BY expire DESC,vkey DESC LIMIT 1)"\ "ORDER BY expire DESC,vkey DESC LIMIT 1" struct GNUNET_MysqlStatementHandle { struct GNUNET_MysqlStatementHandle *next; struct GNUNET_MysqlStatementHandle *prev; char *query; MYSQL_STMT *statement; int valid; }; /** * Context for the universal iterator. */ struct NextRequestClosure; /** * Type of a function that will prepare * the next iteration. * * @param cls closure * @param nc the next context; NULL for the last * call which gives the callback a chance to * clean up the closure * @return GNUNET_OK on success, GNUNET_NO if there are * no more values, GNUNET_SYSERR on error */ typedef int (*PrepareFunction)(void *cls, struct NextRequestClosure *nc); struct NextRequestClosure { struct Plugin *plugin; struct GNUNET_TIME_Absolute now; /** * Function to call to prepare the next * iteration. */ PrepareFunction prep; /** * Closure for prep. */ void *prep_cls; MYSQL_BIND rbind[6]; unsigned int type; unsigned int iter_select; PluginIterator dviter; void *dviter_cls; unsigned int last_prio; unsigned long long last_expire; unsigned long long last_vkey; int end_it; }; /** * Context for all functions in this plugin. */ struct Plugin { /** * Our execution environment. */ struct GNUNET_DATASTORE_PluginEnvironment *env; MYSQL *dbf; struct GNUNET_MysqlStatementHandle *shead; struct GNUNET_MysqlStatementHandle *stail; /** * Filename of "my.cnf" (msyql configuration). */ char *cnffile; /** * Closure of the 'next_task' (must be freed if 'next_task' is cancelled). 
*/ struct NextRequestClosure *next_task_nc; /** * Pending task with scheduler for running the next request. */ GNUNET_SCHEDULER_TaskIdentifier next_task; /** * Statements dealing with gn072 table */ #define SELECT_VALUE "SELECT value FROM gn072 WHERE vkey=?" struct GNUNET_MysqlStatementHandle *select_value; #define DELETE_VALUE "DELETE FROM gn072 WHERE vkey=?" struct GNUNET_MysqlStatementHandle *delete_value; #define INSERT_VALUE "INSERT INTO gn072 (value) VALUES (?)" struct GNUNET_MysqlStatementHandle *insert_value; /** * Statements dealing with gn090 table */ #define INSERT_ENTRY "INSERT INTO gn090 (type,prio,anonLevel,expire,hash,vhash,vkey) VALUES (?,?,?,?,?,?,?)" struct GNUNET_MysqlStatementHandle *insert_entry; #define DELETE_ENTRY_BY_VKEY "DELETE FROM gn090 WHERE vkey=?" struct GNUNET_MysqlStatementHandle *delete_entry_by_vkey; #define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX (hash_vkey) WHERE hash=? AND vkey > ? ORDER BY vkey ASC LIMIT 1 OFFSET ?" struct GNUNET_MysqlStatementHandle *select_entry_by_hash; #define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=? AND vkey > ? ORDER BY vkey ASC LIMIT 1 OFFSET ?" struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_vhash; #define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX (hash_vkey) WHERE hash=? AND vkey > ? AND type=? ORDER BY vkey ASC LIMIT 1 OFFSET ?" struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_type; #define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,vkey FROM gn090 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=? AND vkey > ? AND type=? ORDER BY vkey ASC LIMIT 1 OFFSET ?" struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type; #define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 FORCE INDEX (hash) WHERE hash=?" 
struct GNUNET_MysqlStatementHandle *count_entry_by_hash; #define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=?" struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_vhash; #define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (hash) WHERE hash=? AND type=?" struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_type; #define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (hash_vhash) WHERE hash=? AND vhash=? AND type=?" struct GNUNET_MysqlStatementHandle *count_entry_by_hash_vhash_and_type; #define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE vkey=?" struct GNUNET_MysqlStatementHandle *update_entry; #define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn072" struct GNUNET_MysqlStatementHandle *get_size; struct GNUNET_MysqlStatementHandle *iter[4]; }; /** * Obtain the location of ".my.cnf". * * @param cfg our configuration * @return NULL on error */ static char * get_my_cnf_path (const struct GNUNET_CONFIGURATION_Handle *cfg) { char *cnffile; char *home_dir; struct stat st; #ifndef WINDOWS struct passwd *pw; #endif int configured; #ifndef WINDOWS pw = getpwuid (getuid ()); if (!pw) { GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "getpwuid"); return NULL; } if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (cfg, "datastore-mysql", "CONFIG")) { GNUNET_assert (GNUNET_OK == GNUNET_CONFIGURATION_get_value_filename (cfg, "datastore-mysql", "CONFIG", &cnffile)); configured = GNUNET_YES; } else { home_dir = GNUNET_strdup (pw->pw_dir); #else home_dir = (char *) GNUNET_malloc (_MAX_PATH + 1); plibc_conv_to_win_path ("~/", home_dir); #endif GNUNET_asprintf (&cnffile, "%s/.my.cnf", home_dir); GNUNET_free (home_dir); configured = GNUNET_NO; } GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Trying to use file `%s' for MySQL configuration.\n"), cnffile); if ((0 != STAT (cnffile, &st)) || (0 != ACCESS (cnffile, R_OK)) || 
(!S_ISREG (st.st_mode))) { if (configured == GNUNET_YES) GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Could not access file `%s': %s\n"), cnffile, STRERROR (errno)); GNUNET_free (cnffile); return NULL; } return cnffile; } /** * Free a prepared statement. * * @param plugin plugin context * @param s prepared statement */ static void prepared_statement_destroy (struct Plugin *plugin, struct GNUNET_MysqlStatementHandle *s) { GNUNET_CONTAINER_DLL_remove (plugin->shead, plugin->stail, s); if (s->valid) mysql_stmt_close (s->statement); GNUNET_free (s->query); GNUNET_free (s); } /** * Close database connection and all prepared statements (we got a DB * disconnect error). */ static int iclose (struct Plugin *plugin) { struct GNUNET_MysqlStatementHandle *spos; spos = plugin->shead; while (NULL != plugin->shead) prepared_statement_destroy (plugin, plugin->shead); if (plugin->dbf != NULL) { mysql_close (plugin->dbf); plugin->dbf = NULL; } return GNUNET_OK; } /** * Open the connection with the database (and initialize * our default options). 
* * @return GNUNET_OK on success */ static int iopen (struct Plugin *ret) { char *mysql_dbname; char *mysql_server; char *mysql_user; char *mysql_password; unsigned long long mysql_port; my_bool reconnect; unsigned int timeout; ret->dbf = mysql_init (NULL); if (ret->dbf == NULL) return GNUNET_SYSERR; if (ret->cnffile != NULL) mysql_options (ret->dbf, MYSQL_READ_DEFAULT_FILE, ret->cnffile); mysql_options (ret->dbf, MYSQL_READ_DEFAULT_GROUP, "client"); reconnect = 0; mysql_options (ret->dbf, MYSQL_OPT_RECONNECT, &reconnect); mysql_options (ret->dbf, MYSQL_OPT_CONNECT_TIMEOUT, (const void *) &timeout); mysql_options(ret->dbf, MYSQL_SET_CHARSET_NAME, "UTF8"); timeout = 60; /* in seconds */ mysql_options (ret->dbf, MYSQL_OPT_READ_TIMEOUT, (const void *) &timeout); mysql_options (ret->dbf, MYSQL_OPT_WRITE_TIMEOUT, (const void *) &timeout); mysql_dbname = NULL; if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg, "datastore-mysql", "DATABASE")) GNUNET_assert (GNUNET_OK == GNUNET_CONFIGURATION_get_value_string (ret->env->cfg, "datastore-mysql", "DATABASE", &mysql_dbname)); else mysql_dbname = GNUNET_strdup ("gnunet"); mysql_user = NULL; if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg, "datastore-mysql", "USER")) { GNUNET_assert (GNUNET_OK == GNUNET_CONFIGURATION_get_value_string (ret->env->cfg, "datastore-mysql", "USER", &mysql_user)); } mysql_password = NULL; if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg, "datastore-mysql", "PASSWORD")) { GNUNET_assert (GNUNET_OK == GNUNET_CONFIGURATION_get_value_string (ret->env->cfg, "datastore-mysql", "PASSWORD", &mysql_password)); } mysql_server = NULL; if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg, "datastore-mysql", "HOST")) { GNUNET_assert (GNUNET_OK == GNUNET_CONFIGURATION_get_value_string (ret->env->cfg, "datastore-mysql", "HOST", &mysql_server)); } mysql_port = 0; if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg, "datastore-mysql", "PORT")) { 
GNUNET_assert (GNUNET_OK == GNUNET_CONFIGURATION_get_value_number (ret->env->cfg, "datastore-mysql", "PORT", &mysql_port)); } GNUNET_assert (mysql_dbname != NULL); mysql_real_connect (ret->dbf, mysql_server, mysql_user, mysql_password, mysql_dbname, (unsigned int) mysql_port, NULL, CLIENT_IGNORE_SIGPIPE); GNUNET_free_non_null (mysql_server); GNUNET_free_non_null (mysql_user); GNUNET_free_non_null (mysql_password); GNUNET_free (mysql_dbname); if (mysql_error (ret->dbf)[0]) { LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR, "mysql_real_connect", ret); return GNUNET_SYSERR; } return GNUNET_OK; } /** * Run the given MySQL statement. * * @param plugin plugin context * @param statement SQL statement to run * @return GNUNET_OK on success, GNUNET_SYSERR on error */ static int run_statement (struct Plugin *plugin, const char *statement) { if ((NULL == plugin->dbf) && (GNUNET_OK != iopen (plugin))) return GNUNET_SYSERR; mysql_query (plugin->dbf, statement); if (mysql_error (plugin->dbf)[0]) { LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR, "mysql_query", plugin); iclose (plugin); return GNUNET_SYSERR; } return GNUNET_OK; } /** * Create a prepared statement. * * @param plugin plugin context * @param statement SQL statement text to prepare * @return NULL on error */ static struct GNUNET_MysqlStatementHandle * prepared_statement_create (struct Plugin *plugin, const char *statement) { struct GNUNET_MysqlStatementHandle *ret; ret = GNUNET_malloc (sizeof (struct GNUNET_MysqlStatementHandle)); ret->query = GNUNET_strdup (statement); GNUNET_CONTAINER_DLL_insert (plugin->shead, plugin->stail, ret); return ret; } /** * Prepare a statement for running. 
* * @param plugin plugin context * @param ret handle to prepared statement * @return GNUNET_OK on success */ static int prepare_statement (struct Plugin *plugin, struct GNUNET_MysqlStatementHandle *ret) { if (GNUNET_YES == ret->valid) return GNUNET_OK; if ((NULL == plugin->dbf) && (GNUNET_OK != iopen (plugin))) return GNUNET_SYSERR; ret->statement = mysql_stmt_init (plugin->dbf); if (ret->statement == NULL) { iclose (plugin); return GNUNET_SYSERR; } if (mysql_stmt_prepare (ret->statement, ret->query, strlen (ret->query))) { LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR, "mysql_stmt_prepare", plugin); mysql_stmt_close (ret->statement); ret->statement = NULL; iclose (plugin); return GNUNET_SYSERR; } ret->valid = GNUNET_YES; return GNUNET_OK; } /** * Bind the parameters for the given MySQL statement * and run it. * * @param plugin plugin context * @param s statement to bind and run * @param ap arguments for the binding * @return GNUNET_SYSERR on error, GNUNET_OK on success */ static int init_params (struct Plugin *plugin, struct GNUNET_MysqlStatementHandle *s, va_list ap) { MYSQL_BIND qbind[MAX_PARAM]; unsigned int pc; unsigned int off; enum enum_field_types ft; pc = mysql_stmt_param_count (s->statement); if (pc > MAX_PARAM) { /* increase internal constant! 
*/ GNUNET_break (0); return GNUNET_SYSERR; } memset (qbind, 0, sizeof (qbind)); off = 0; ft = 0; while ((pc > 0) && (-1 != (int) (ft = va_arg (ap, enum enum_field_types)))) { qbind[off].buffer_type = ft; switch (ft) { case MYSQL_TYPE_FLOAT: qbind[off].buffer = va_arg (ap, float *); break; case MYSQL_TYPE_LONGLONG: qbind[off].buffer = va_arg (ap, unsigned long long *); qbind[off].is_unsigned = va_arg (ap, int); break; case MYSQL_TYPE_LONG: qbind[off].buffer = va_arg (ap, unsigned int *); qbind[off].is_unsigned = va_arg (ap, int); break; case MYSQL_TYPE_VAR_STRING: case MYSQL_TYPE_STRING: case MYSQL_TYPE_BLOB: qbind[off].buffer = va_arg (ap, void *); qbind[off].buffer_length = va_arg (ap, unsigned long); qbind[off].length = va_arg (ap, unsigned long *); break; default: /* unsupported type */ GNUNET_break (0); return GNUNET_SYSERR; } pc--; off++; } if (! ( (pc == 0) && (-1 != (int) ft) && (va_arg (ap, int) == -1) ) ) { GNUNET_break (0); return GNUNET_SYSERR; } if (mysql_stmt_bind_param (s->statement, qbind)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("`%s' failed at %s:%d with error: %s\n"), "mysql_stmt_bind_param", __FILE__, __LINE__, mysql_stmt_error (s->statement)); iclose (plugin); return GNUNET_SYSERR; } if (mysql_stmt_execute (s->statement)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("`%s' failed at %s:%d with error: %s\n"), "mysql_stmt_execute", __FILE__, __LINE__, mysql_stmt_error (s->statement)); iclose (plugin); return GNUNET_SYSERR; } return GNUNET_OK; } /** * Type of a callback that will be called for each * data set returned from MySQL. * * @param cls user-defined argument * @param num_values number of elements in values * @param values values returned by MySQL * @return GNUNET_OK to continue iterating, GNUNET_SYSERR to abort */ typedef int (*GNUNET_MysqlDataProcessor) (void *cls, unsigned int num_values, MYSQL_BIND * values); /** * Run a prepared SELECT statement. 
* * @param plugin plugin context * @param s statement to run * @param result_size number of elements in results array * @param results pointer to already initialized MYSQL_BIND * array (of sufficient size) for passing results * @param processor function to call on each result * @param processor_cls extra argument to processor * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective * values (size + buffer-reference for pointers); terminated * with "-1" * @return GNUNET_SYSERR on error, otherwise * the number of successfully affected (or queried) rows */ static int prepared_statement_run_select (struct Plugin *plugin, struct GNUNET_MysqlStatementHandle *s, unsigned int result_size, MYSQL_BIND * results, GNUNET_MysqlDataProcessor processor, void *processor_cls, ...) { va_list ap; int ret; unsigned int rsize; int total; if (GNUNET_OK != prepare_statement (plugin, s)) { GNUNET_break (0); return GNUNET_SYSERR; } va_start (ap, processor_cls); if (GNUNET_OK != init_params (plugin, s, ap)) { GNUNET_break (0); va_end (ap); return GNUNET_SYSERR; } va_end (ap); rsize = mysql_stmt_field_count (s->statement); if (rsize > result_size) { GNUNET_break (0); return GNUNET_SYSERR; } if (mysql_stmt_bind_result (s->statement, results)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("`%s' failed at %s:%d with error: %s\n"), "mysql_stmt_bind_result", __FILE__, __LINE__, mysql_stmt_error (s->statement)); iclose (plugin); return GNUNET_SYSERR; } total = 0; while (1) { ret = mysql_stmt_fetch (s->statement); if (ret == MYSQL_NO_DATA) break; if (ret != 0) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("`%s' failed at %s:%d with error: %s\n"), "mysql_stmt_fetch", __FILE__, __LINE__, mysql_stmt_error (s->statement)); iclose (plugin); return GNUNET_SYSERR; } if (processor != NULL) if (GNUNET_OK != processor (processor_cls, rsize, results)) break; total++; } mysql_stmt_reset (s->statement); return total; } /** * Run a prepared statement that does NOT produce results. 
* * @param plugin plugin context * @param s statement to run * @param insert_id NULL or address where to store the row ID of whatever * was inserted (only for INSERT statements!) * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective * values (size + buffer-reference for pointers); terminated * with "-1" * @return GNUNET_SYSERR on error, otherwise * the number of successfully affected rows */ static int prepared_statement_run (struct Plugin *plugin, struct GNUNET_MysqlStatementHandle *s, unsigned long long *insert_id, ...) { va_list ap; int affected; if (GNUNET_OK != prepare_statement (plugin, s)) return GNUNET_SYSERR; va_start (ap, insert_id); if (GNUNET_OK != init_params (plugin, s, ap)) { va_end (ap); return GNUNET_SYSERR; } va_end (ap); affected = mysql_stmt_affected_rows (s->statement); if (NULL != insert_id) *insert_id = (unsigned long long) mysql_stmt_insert_id (s->statement); mysql_stmt_reset (s->statement); return affected; } /** * Delete an value from the gn072 table. * * @param plugin plugin context * @param vkey vkey identifying the value to delete * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error */ static int do_delete_value (struct Plugin *plugin, unsigned long long vkey) { int ret; #if DEBUG_MYSQL GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting value %llu from gn072 table\n", vkey); #endif ret = prepared_statement_run (plugin, plugin->delete_value, NULL, MYSQL_TYPE_LONGLONG, &vkey, GNUNET_YES, -1); if (ret > 0) { ret = GNUNET_OK; } else { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Deleting value %llu from gn072 table failed\n", vkey); } return ret; } /** * Insert a value into the gn072 table. 
* * @param plugin plugin context * @param value the value to insert * @param size size of the value * @param vkey vkey identifying the value henceforth (set) * @return GNUNET_OK on success, GNUNET_SYSERR on error */ static int do_insert_value (struct Plugin *plugin, const void *value, unsigned int size, unsigned long long *vkey) { unsigned long length = size; int ret; ret = prepared_statement_run (plugin, plugin->insert_value, vkey, MYSQL_TYPE_BLOB, value, length, &length, -1); if (ret == GNUNET_OK) { #if DEBUG_MYSQL GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Inserted value number %llu with length %u into gn072 table\n", *vkey, size); #endif } else { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to insert %u byte value into gn072 table\n", size); } return ret; } /** * Delete an entry from the gn090 table. * * @param plugin plugin context * @param vkey vkey identifying the entry to delete * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error */ static int do_delete_entry_by_vkey (struct Plugin *plugin, unsigned long long vkey) { int ret; #if DEBUG_MYSQL GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting value %llu from gn090 table\n", vkey); #endif ret = prepared_statement_run (plugin, plugin->delete_entry_by_vkey, NULL, MYSQL_TYPE_LONGLONG, &vkey, GNUNET_YES, -1); if (ret > 0) { ret = GNUNET_OK; } else { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Deleting value %llu from gn090 table failed\n", vkey); } return ret; } /** * Function that simply returns GNUNET_OK * * @param cls closure, not used * @param num_values not used * @param values not used * @return GNUNET_OK */ static int return_ok (void *cls, unsigned int num_values, MYSQL_BIND * values) { return GNUNET_OK; } /** * Run the prepared statement to get the next data item ready. 
* * @param cls not used * @param nrc closure for the next request iterator * @return GNUNET_OK on success, GNUNET_NO if there is no additional item */ static int iterator_helper_prepare (void *cls, struct NextRequestClosure *nrc) { struct Plugin *plugin; int ret; if (nrc == NULL) return GNUNET_NO; plugin = nrc->plugin; ret = GNUNET_SYSERR; switch (nrc->iter_select) { case 0: case 1: ret = prepared_statement_run_select (plugin, plugin->iter[nrc->iter_select], 6, nrc->rbind, &return_ok, NULL, MYSQL_TYPE_LONG, &nrc->last_prio, GNUNET_YES, MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG, &nrc->last_prio, GNUNET_YES, MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, -1); break; case 2: ret = prepared_statement_run_select (plugin, plugin->iter[nrc->iter_select], 6, nrc->rbind, &return_ok, NULL, MYSQL_TYPE_LONGLONG, &nrc->last_expire, GNUNET_YES, MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONGLONG, &nrc->last_expire, GNUNET_YES, MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, -1); break; case 3: ret = prepared_statement_run_select (plugin, plugin->iter[nrc->iter_select], 6, nrc->rbind, &return_ok, NULL, MYSQL_TYPE_LONGLONG, &nrc->last_expire, GNUNET_YES, MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONGLONG, &nrc->now.value, GNUNET_YES, MYSQL_TYPE_LONGLONG, &nrc->last_expire, GNUNET_YES, MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONGLONG, &nrc->now.value, GNUNET_YES, -1); break; default: GNUNET_assert (0); } return ret; } /** * Continuation of "mysql_next_request". 
*
 * Runs one step of an iteration: asks nrc->prep for the next row of
 * gn090, fetches the matching value from gn072 and hands both to
 * nrc->dviter.  When the iteration is over (end_it set, or prep does
 * not return GNUNET_OK) dviter is called once with NULL arguments,
 * prep is called with a NULL context, and 'nrc' is freed.
 *
 * @param next_cls the next context
 * @param tc the task context (unused)
 */
static void
mysql_next_request_cont (void *next_cls,
                         const struct GNUNET_SCHEDULER_TaskContext *tc)
{
  struct NextRequestClosure *nrc = next_cls;
  struct Plugin *plugin;
  int ret;
  unsigned int type;
  unsigned int priority;
  unsigned int anonymity;
  unsigned long long exp;
  unsigned long long vkey;
  unsigned long hashSize;
  GNUNET_HashCode key;
  struct GNUNET_TIME_Absolute expiration;
  unsigned long length;
  MYSQL_BIND *rbind; /* alias for nrc->rbind (6 entries) */
  MYSQL_BIND dbind[1];
  char datum[GNUNET_SERVER_MAX_MESSAGE_SIZE];

  /* this task is running now, so it is no longer pending */
  plugin = nrc->plugin;
  plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
  plugin->next_task_nc = NULL;

 AGAIN:
  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
  nrc->now = GNUNET_TIME_absolute_get ();
  hashSize = sizeof (GNUNET_HashCode);
  /* bind the six result columns (type,prio,anonLevel,expire,hash,vkey)
     to local variables of this function */
  memset (nrc->rbind, 0, sizeof (nrc->rbind));
  rbind = nrc->rbind;
  rbind[0].buffer_type = MYSQL_TYPE_LONG;
  rbind[0].buffer = &type;
  rbind[0].is_unsigned = 1;
  rbind[1].buffer_type = MYSQL_TYPE_LONG;
  rbind[1].buffer = &priority;
  rbind[1].is_unsigned = 1;
  rbind[2].buffer_type = MYSQL_TYPE_LONG;
  rbind[2].buffer = &anonymity;
  rbind[2].is_unsigned = 1;
  rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
  rbind[3].buffer = &exp;
  rbind[3].is_unsigned = 1;
  rbind[4].buffer_type = MYSQL_TYPE_BLOB;
  rbind[4].buffer = &key;
  rbind[4].buffer_length = hashSize;
  rbind[4].length = &hashSize;
  rbind[5].buffer_type = MYSQL_TYPE_LONGLONG;
  rbind[5].buffer = &vkey;
  rbind[5].is_unsigned = GNUNET_YES;

  if ( (GNUNET_YES == nrc->end_it) ||
       (GNUNET_OK != nrc->prep (nrc->prep_cls,
                                nrc)))
    goto END_SET;
  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
  /* remember where we are so the next query continues after this row */
  nrc->last_vkey = vkey;
  nrc->last_prio = priority;
  nrc->last_expire = exp;
  /* the hash column must have exactly the size of a GNUNET_HashCode */
  if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
       (hashSize != sizeof (GNUNET_HashCode)) )
    {
      GNUNET_break (0);
      goto END_SET;
    }
#if DEBUG_MYSQL
  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Found value %llu with prio %u, anon %u, expire %llu selecting from gn090 table\n",
              vkey,
              priority,
              anonymity,
              exp);
#endif
  /* now do query on gn072 */
  length = sizeof (datum);
  memset (dbind, 0, sizeof (dbind));
  dbind[0].buffer_type = MYSQL_TYPE_BLOB;
  dbind[0].buffer_length = length;
  dbind[0].length = &length;
  dbind[0].buffer = datum;
  ret = prepared_statement_run_select (plugin,
                                       plugin->select_value,
                                       1,
                                       dbind,
                                       &return_ok,
                                       NULL,
                                       MYSQL_TYPE_LONGLONG,
                                       &vkey, GNUNET_YES, -1);
  GNUNET_break (ret <= 1);     /* should only have one rbind! */
  if (ret > 0)
    ret = GNUNET_OK;
  if (ret != GNUNET_OK)
    {
      /* no value for this entry: warn and move on to the next row */
      GNUNET_break (0);
      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                  _("Failed to obtain value %llu from table `%s'\n"),
                  vkey,
                  "gn072");
      goto AGAIN;
    }
  GNUNET_break (length <= sizeof(datum));
#if DEBUG_MYSQL
  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Calling iterator with value `%s' number %llu of size %u with type %u, priority %u, anonymity %u and expiration %llu\n",
              GNUNET_h2s (&key),
              vkey,
              length,
              type,
              priority,
              anonymity,
              exp);
#endif
  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
  expiration.value = exp;
  ret = nrc->dviter (nrc->dviter_cls,
                     nrc,
                     &key,
                     length,
                     datum,
                     type,
                     priority,
                     anonymity,
                     expiration,
                     vkey);
  if (ret == GNUNET_SYSERR)
    {
      /* only flag the end here; 'nrc' is not freed on this path */
      nrc->end_it = GNUNET_YES;
      return;
    }
  if (ret == GNUNET_NO)
    {
      /* iterator asked for removal: drop value and entry, report freed space */
      do_delete_value (plugin, vkey);
      do_delete_entry_by_vkey (plugin, vkey);
      if (length != 0)
        plugin->env->duc (plugin->env->cls,
                          - length);
    }
  return;
 END_SET:
  /* call dviter with "end of set" */
  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
  nrc->dviter (nrc->dviter_cls,
               NULL, NULL, 0, NULL, 0, 0, 0,
               GNUNET_TIME_UNIT_ZERO_ABS, 0);
  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
  /* final call with NULL lets the prepare function release its closure */
  nrc->prep (nrc->prep_cls, NULL);
  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
  GNUNET_free (nrc);
}


/**
 * Function invoked on behalf of a "PluginIterator"
 * asking the database plugin to call the iterator
 * with the next item.
 *
 * @param next_cls whatever argument was given
 *        to the PluginIterator as "next_cls".
* @param end_it set to GNUNET_YES if we
 *        should terminate the iteration early
 *        (iterator should be still called once more
 *         to signal the end of the iteration).
 */
static void
mysql_plugin_next_request (void *next_cls,
                           int end_it)
{
  struct NextRequestClosure *nrc = next_cls;
  struct Plugin *plugin;

  if (GNUNET_YES == end_it)
    nrc->end_it = GNUNET_YES;
  plugin = nrc->plugin;
  /* remember the closure alongside the scheduled task */
  plugin->next_task_nc = nrc;
  plugin->next_task = GNUNET_SCHEDULER_add_now (plugin->env->sched,
                                                &mysql_next_request_cont,
                                                nrc);
}


/**
 * Iterate over the items in the datastore
 * using the given query to select and order
 * the items.
 *
 * @param plugin plugin context
 * @param type entries of which type should be considered?
 * @param is_asc are we using ascending order?
 * @param iter_select which iterator statement are we using
 * @param dviter function to call on each matching item
 * @param dviter_cls closure for dviter
 */
static void
iterateHelper (struct Plugin *plugin,
               enum GNUNET_BLOCK_Type type,
               int is_asc,
               unsigned int iter_select,
               PluginIterator dviter,
               void *dviter_cls)
{
  struct NextRequestClosure *ctx;

  ctx = GNUNET_malloc (sizeof (struct NextRequestClosure));
  ctx->plugin = plugin;
  ctx->type = type;
  ctx->iter_select = iter_select;
  ctx->dviter = dviter;
  ctx->dviter_cls = dviter_cls;
  ctx->prep = &iterator_helper_prepare;
  /* ascending scans start below every row, descending scans above */
  ctx->last_prio = is_asc ? 0 : 0x7FFFFFFFL;
  ctx->last_vkey = is_asc ? 0 : 0x7FFFFFFFFFFFFFFFLL; /* MySQL only supports 63 bits */
  ctx->last_expire = is_asc ? 0 : 0x7FFFFFFFFFFFFFFFLL; /* MySQL only supports 63 bits */
  mysql_plugin_next_request (ctx, GNUNET_NO);
}


/**
 * Get an estimate of how much space the database is
 * currently using.
* * @param cls our "struct Plugin *" * @return number of bytes used on disk */ static unsigned long long mysql_plugin_get_size (void *cls) { struct Plugin *plugin = cls; MYSQL_BIND cbind[1]; long long total; memset (cbind, 0, sizeof (cbind)); total = 0; cbind[0].buffer_type = MYSQL_TYPE_LONGLONG; cbind[0].buffer = &total; cbind[0].is_unsigned = GNUNET_NO; if (GNUNET_OK != prepared_statement_run_select (plugin, plugin->get_size, 1, cbind, &return_ok, NULL, -1)) return 0; return total; } /** * Store an item in the datastore. * * @param cls closure * @param key key for the item * @param size number of bytes in data * @param data content stored * @param type type of the content * @param priority priority of the content * @param anonymity anonymity-level for the content * @param expiration expiration time for the content * @param msg set to error message * @return GNUNET_OK on success */ static int mysql_plugin_put (void *cls, const GNUNET_HashCode * key, uint32_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, struct GNUNET_TIME_Absolute expiration, char **msg) { struct Plugin *plugin = cls; unsigned int itype = type; unsigned int ipriority = priority; unsigned int ianonymity = anonymity; unsigned long long lexpiration = expiration.value; unsigned long hashSize; unsigned long hashSize2; unsigned long long vkey; GNUNET_HashCode vhash; if (size > MAX_DATUM_SIZE) { GNUNET_break (0); return GNUNET_SYSERR; } hashSize = sizeof (GNUNET_HashCode); hashSize2 = sizeof (GNUNET_HashCode); GNUNET_CRYPTO_hash (data, size, &vhash); if (GNUNET_OK != do_insert_value (plugin, data, size, &vkey)) return GNUNET_SYSERR; if (GNUNET_OK != prepared_statement_run (plugin, plugin->insert_entry, NULL, MYSQL_TYPE_LONG, &itype, GNUNET_YES, MYSQL_TYPE_LONG, &ipriority, GNUNET_YES, MYSQL_TYPE_LONG, &ianonymity, GNUNET_YES, MYSQL_TYPE_LONGLONG, &lexpiration, GNUNET_YES, MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_BLOB, &vhash, hashSize2, 
&hashSize2, MYSQL_TYPE_LONGLONG, &vkey, GNUNET_YES, -1)) { do_delete_value (plugin, vkey); return GNUNET_SYSERR; } #if DEBUG_MYSQL GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Inserted value `%s' number %llu with size %u into gn090 table\n", GNUNET_h2s (key), vkey, (unsigned int) size); #endif if (size > 0) plugin->env->duc (plugin->env->cls, size); return GNUNET_OK; } /** * Select a subset of the items in the datastore and call * the given iterator for each of them. * * @param cls our "struct Plugin*" * @param type entries of which type should be considered? * Use 0 for any type. * @param iter function to call on each matching value; * will be called once with a NULL value at the end * @param iter_cls closure for iter */ static void mysql_plugin_iter_low_priority (void *cls, enum GNUNET_BLOCK_Type type, PluginIterator iter, void *iter_cls) { struct Plugin *plugin = cls; iterateHelper (plugin, type, GNUNET_YES, 0, iter, iter_cls); } struct GetContext { GNUNET_HashCode key; GNUNET_HashCode vhash; unsigned int prio; unsigned int anonymity; unsigned long long expiration; unsigned long long vkey; unsigned long long total; int off; int count; int have_vhash; }; static int get_statement_prepare (void *cls, struct NextRequestClosure *nrc) { struct GetContext *gc = cls; struct Plugin *plugin; int ret; unsigned int limit_off; unsigned long hashSize; if (NULL == nrc) { GNUNET_free (gc); return GNUNET_NO; } if (gc->count == gc->total) return GNUNET_NO; plugin = nrc->plugin; hashSize = sizeof (GNUNET_HashCode); if (gc->count + gc->off == gc->total) nrc->last_vkey = 0; /* back to start */ if (gc->count == 0) limit_off = gc->off; else limit_off = 0; #if DEBUG_MYSQL GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Obtaining result number %d/%lld at offset %d with lvc %llu for GET `%s'\n", gc->count+1, gc->total, limit_off, nrc->last_vkey, GNUNET_h2s (&gc->key)); #endif if (nrc->type != 0) { if (gc->have_vhash) { ret = prepared_statement_run_select (plugin, 
plugin->select_entry_by_hash_vhash_and_type, 6, nrc->rbind, &return_ok, NULL, MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize, MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize, MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG, &nrc->type, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off, GNUNET_YES, -1); } else { ret = prepared_statement_run_select (plugin, plugin->select_entry_by_hash_and_type, 6, nrc->rbind, &return_ok, NULL, MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize, MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG, &nrc->type, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off, GNUNET_YES, -1); } } else { if (gc->have_vhash) { ret = prepared_statement_run_select (plugin, plugin->select_entry_by_hash_and_vhash, 6, nrc->rbind, &return_ok, NULL, MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize, MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize, MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off, GNUNET_YES, -1); } else { ret = prepared_statement_run_select (plugin, plugin->select_entry_by_hash, 6, nrc->rbind, &return_ok, NULL, MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize, MYSQL_TYPE_LONGLONG, &nrc->last_vkey, GNUNET_YES, MYSQL_TYPE_LONG, &limit_off, GNUNET_YES, -1); } } gc->count++; return ret; } /** * Iterate over the results for a particular key * in the datastore. * * @param cls closure * @param key maybe NULL (to match all entries) * @param vhash hash of the value, maybe NULL (to * match all values that have the right key). * Note that for DBlocks there is no difference * betwen key and vhash, but for other blocks * there may be! * @param type entries of which type are relevant? * Use 0 for any type. 
* @param iter function to call on each matching value; * will be called once with a NULL value at the end * @param iter_cls closure for iter */ static void mysql_plugin_get (void *cls, const GNUNET_HashCode * key, const GNUNET_HashCode * vhash, enum GNUNET_BLOCK_Type type, PluginIterator iter, void *iter_cls) { struct Plugin *plugin = cls; unsigned int itype = type; int ret; MYSQL_BIND cbind[1]; struct GetContext *gc; struct NextRequestClosure *nrc; long long total; unsigned long hashSize; if (iter == NULL) return; if (key == NULL) { mysql_plugin_iter_low_priority (plugin, type, iter, iter_cls); return; } hashSize = sizeof (GNUNET_HashCode); memset (cbind, 0, sizeof (cbind)); total = -1; cbind[0].buffer_type = MYSQL_TYPE_LONGLONG; cbind[0].buffer = &total; cbind[0].is_unsigned = GNUNET_NO; if (type != 0) { if (vhash != NULL) { ret = prepared_statement_run_select (plugin, plugin->count_entry_by_hash_vhash_and_type, 1, cbind, &return_ok, NULL, MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize, MYSQL_TYPE_LONG, &itype, GNUNET_YES, -1); } else { ret = prepared_statement_run_select (plugin, plugin->count_entry_by_hash_and_type, 1, cbind, &return_ok, NULL, MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_LONG, &itype, GNUNET_YES, -1); } } else { if (vhash != NULL) { ret = prepared_statement_run_select (plugin, plugin->count_entry_by_hash_and_vhash, 1, cbind, &return_ok, NULL, MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize, -1); } else { ret = prepared_statement_run_select (plugin, plugin->count_entry_by_hash, 1, cbind, &return_ok, NULL, MYSQL_TYPE_BLOB, key, hashSize, &hashSize, -1); } } if ((ret != GNUNET_OK) || (0 >= total)) { iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); return; } #if DEBUG_MYSQL GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Iterating over %lld results for GET `%s'\n", total, GNUNET_h2s (key)); #endif gc = GNUNET_malloc (sizeof (struct 
GetContext)); gc->key = *key; if (vhash != NULL) { gc->have_vhash = GNUNET_YES; gc->vhash = *vhash; } gc->total = total; gc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total); nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); nrc->plugin = plugin; nrc->type = type; nrc->iter_select = -1; nrc->dviter = iter; nrc->dviter_cls = iter_cls; nrc->prep = &get_statement_prepare; nrc->prep_cls = gc; nrc->last_vkey = 0; mysql_plugin_next_request (nrc, GNUNET_NO); } /** * Update the priority for a particular key in the datastore. If * the expiration time in value is different than the time found in * the datastore, the higher value should be kept. For the * anonymity level, the lower value is to be used. The specified * priority should be added to the existing priority, ignoring the * priority in value. * * Note that it is possible for multiple values to match this put. * In that case, all of the respective values are updated. * * @param cls our "struct Plugin*" * @param uid unique identifier of the datum * @param delta by how much should the priority * change? If priority + delta < 0 the * priority should be set to 0 (never go * negative). 
* @param expire new expiration time should be the * MAX of any existing expiration time and * this value * @param msg set to error message * @return GNUNET_OK on success */ static int mysql_plugin_update (void *cls, uint64_t uid, int delta, struct GNUNET_TIME_Absolute expire, char **msg) { struct Plugin *plugin = cls; unsigned long long vkey = uid; unsigned long long lexpire = expire.value; int ret; #if DEBUG_MYSQL GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Updating value %llu adding %d to priority and maxing exp at %llu\n", vkey, delta, lexpire); #endif ret = prepared_statement_run (plugin, plugin->update_entry, NULL, MYSQL_TYPE_LONG, &delta, GNUNET_NO, MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES, MYSQL_TYPE_LONGLONG, &lexpire, GNUNET_YES, MYSQL_TYPE_LONGLONG, &vkey, GNUNET_YES, -1); if (ret != GNUNET_OK) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to update value %llu\n", vkey); } return ret; } /** * Select a subset of the items in the datastore and call * the given iterator for each of them. * * @param cls our "struct Plugin*" * @param type entries of which type should be considered? * Use 0 for any type. * @param iter function to call on each matching value; * will be called once with a NULL value at the end * @param iter_cls closure for iter */ static void mysql_plugin_iter_zero_anonymity (void *cls, enum GNUNET_BLOCK_Type type, PluginIterator iter, void *iter_cls) { struct Plugin *plugin = cls; iterateHelper (plugin, type, GNUNET_NO, 1, iter, iter_cls); } /** * Select a subset of the items in the datastore and call * the given iterator for each of them. * * @param cls our "struct Plugin*" * @param type entries of which type should be considered? * Use 0 for any type. 
* @param iter function to call on each matching value; * will be called once with a NULL value at the end * @param iter_cls closure for iter */ static void mysql_plugin_iter_ascending_expiration (void *cls, enum GNUNET_BLOCK_Type type, PluginIterator iter, void *iter_cls) { struct Plugin *plugin = cls; iterateHelper (plugin, type, GNUNET_YES, 2, iter, iter_cls); } /** * Select a subset of the items in the datastore and call * the given iterator for each of them. * * @param cls our "struct Plugin*" * @param type entries of which type should be considered? * Use 0 for any type. * @param iter function to call on each matching value; * will be called once with a NULL value at the end * @param iter_cls closure for iter */ static void mysql_plugin_iter_migration_order (void *cls, enum GNUNET_BLOCK_Type type, PluginIterator iter, void *iter_cls) { struct Plugin *plugin = cls; iterateHelper (plugin, 0, GNUNET_NO, 3, iter, iter_cls); } /** * Select a subset of the items in the datastore and call * the given iterator for each of them. * * @param cls our "struct Plugin*" * @param type entries of which type should be considered? * Use 0 for any type. * @param iter function to call on each matching value; * will be called once with a NULL value at the end * @param iter_cls closure for iter */ static void mysql_plugin_iter_all_now (void *cls, enum GNUNET_BLOCK_Type type, PluginIterator iter, void *iter_cls) { struct Plugin *plugin = cls; iterateHelper (plugin, 0, GNUNET_YES, 0, iter, iter_cls); } /** * Drop database. */ static void mysql_plugin_drop (void *cls) { struct Plugin *plugin = cls; if ((GNUNET_OK != run_statement (plugin, "DROP TABLE gn090")) || (GNUNET_OK != run_statement (plugin, "DROP TABLE gn072"))) return; /* error */ plugin->env->duc (plugin->env->cls, 0); } /** * Entry point for the plugin. 
* * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*" * @return our "struct Plugin*" */ void * libgnunet_plugin_datastore_mysql_init (void *cls) { struct GNUNET_DATASTORE_PluginEnvironment *env = cls; struct GNUNET_DATASTORE_PluginFunctions *api; struct Plugin *plugin; plugin = GNUNET_malloc (sizeof (struct Plugin)); plugin->env = env; plugin->cnffile = get_my_cnf_path (env->cfg); if (GNUNET_OK != iopen (plugin)) { iclose (plugin); GNUNET_free_non_null (plugin->cnffile); GNUNET_free (plugin); return NULL; } #define MRUNS(a) (GNUNET_OK != run_statement (plugin, a) ) #define PINIT(a,b) (NULL == (a = prepared_statement_create(plugin, b))) if (MRUNS ("CREATE TABLE IF NOT EXISTS gn090 (" " type INT(11) UNSIGNED NOT NULL DEFAULT 0," " prio INT(11) UNSIGNED NOT NULL DEFAULT 0," " anonLevel INT(11) UNSIGNED NOT NULL DEFAULT 0," " expire BIGINT UNSIGNED NOT NULL DEFAULT 0," " hash BINARY(64) NOT NULL DEFAULT ''," " vhash BINARY(64) NOT NULL DEFAULT ''," " vkey BIGINT UNSIGNED NOT NULL DEFAULT 0," " INDEX hash (hash(64))," " INDEX hash_vhash_vkey (hash(64),vhash(64),vkey)," " INDEX hash_vkey (hash(64),vkey)," " INDEX vkey (vkey)," " INDEX prio (prio,vkey)," " INDEX expire (expire,vkey,type)," " INDEX anonLevel (anonLevel,prio,vkey,type)" ") ENGINE=InnoDB") || MRUNS ("CREATE TABLE IF NOT EXISTS gn072 (" " vkey BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY," " value BLOB NOT NULL DEFAULT '') ENGINE=MyISAM") || MRUNS ("SET AUTOCOMMIT = 1") || PINIT (plugin->select_value, SELECT_VALUE) || PINIT (plugin->delete_value, DELETE_VALUE) || PINIT (plugin->insert_value, INSERT_VALUE) || PINIT (plugin->insert_entry, INSERT_ENTRY) || PINIT (plugin->delete_entry_by_vkey, DELETE_ENTRY_BY_VKEY) || PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) || PINIT (plugin->select_entry_by_hash_and_vhash, SELECT_ENTRY_BY_HASH_AND_VHASH) || PINIT (plugin->select_entry_by_hash_and_type, SELECT_ENTRY_BY_HASH_AND_TYPE) || PINIT (plugin->select_entry_by_hash_vhash_and_type, 
SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE) || PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH) || PINIT (plugin->get_size, SELECT_SIZE) || PINIT (plugin->count_entry_by_hash_and_vhash, COUNT_ENTRY_BY_HASH_AND_VHASH) || PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE) || PINIT (plugin->count_entry_by_hash_vhash_and_type, COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE) || PINIT (plugin->update_entry, UPDATE_ENTRY) || PINIT (plugin->iter[0], SELECT_IT_LOW_PRIORITY) || PINIT (plugin->iter[1], SELECT_IT_NON_ANONYMOUS) || PINIT (plugin->iter[2], SELECT_IT_EXPIRATION_TIME) || PINIT (plugin->iter[3], SELECT_IT_MIGRATION_ORDER)) { iclose (plugin); GNUNET_free_non_null (plugin->cnffile); GNUNET_free (plugin); return NULL; } #undef PINIT #undef MRUNS api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions)); api->cls = plugin; api->get_size = &mysql_plugin_get_size; api->put = &mysql_plugin_put; api->next_request = &mysql_plugin_next_request; api->get = &mysql_plugin_get; api->update = &mysql_plugin_update; api->iter_low_priority = &mysql_plugin_iter_low_priority; api->iter_zero_anonymity = &mysql_plugin_iter_zero_anonymity; api->iter_ascending_expiration = &mysql_plugin_iter_ascending_expiration; api->iter_migration_order = &mysql_plugin_iter_migration_order; api->iter_all_now = &mysql_plugin_iter_all_now; api->drop = &mysql_plugin_drop; GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "mysql", _("Mysql database running\n")); return api; } /** * Exit point from the plugin. 
* @param cls the "struct GNUNET_DATASTORE_PluginFunctions*" of
 *        this plugin (its "cls" is our "struct Plugin*")
 * @return always NULL
 */
void *
libgnunet_plugin_datastore_mysql_done (void *cls)
{
  struct GNUNET_DATASTORE_PluginFunctions *api = cls;
  struct Plugin *plugin = api->cls;
  struct NextRequestClosure *pending;

  iclose (plugin);
  if (GNUNET_SCHEDULER_NO_TASK != plugin->next_task)
    {
      /* an iteration step is still scheduled: cancel it, let the
         prepare-callback know that it is over (NULL argument) and
         release the iteration closure */
      GNUNET_SCHEDULER_cancel (plugin->env->sched,
                               plugin->next_task);
      plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
      pending = plugin->next_task_nc;
      pending->prep (pending->prep_cls, NULL);
      GNUNET_free (pending);
      plugin->next_task_nc = NULL;
    }
  GNUNET_free_non_null (plugin->cnffile);
  GNUNET_free (plugin);
  GNUNET_free (api);
  mysql_library_end ();
  return NULL;
}

/* end of plugin_datastore_mysql.c */