From 37a05b2c7724de81f963d45a6bf929608ee00bd5 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Fri, 15 Apr 2011 13:08:23 +0000 Subject: fixes --- src/datastore/plugin_datastore_mysql.c | 434 +++++++++++++-------- src/datastore/plugin_datastore_postgres.c | 611 ++++++++++++++---------------- 2 files changed, 555 insertions(+), 490 deletions(-) (limited to 'src/datastore') diff --git a/src/datastore/plugin_datastore_mysql.c b/src/datastore/plugin_datastore_mysql.c index 1658aa51a..deef46af0 100644 --- a/src/datastore/plugin_datastore_mysql.c +++ b/src/datastore/plugin_datastore_mysql.c @@ -208,6 +208,8 @@ struct NextRequestClosure unsigned int count; int end_it; + + int one_shot; }; @@ -284,9 +286,12 @@ struct Plugin #define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?" struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type; -#define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=? LIMIT 1" +#define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?" struct GNUNET_MysqlStatementHandle *update_entry; +#define DEC_REPL "UPDATE gn090 SET repl=GREATEST (0, repl - 1) WHERE uid=?" + struct GNUNET_MysqlStatementHandle *dec_repl; + #define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090" struct GNUNET_MysqlStatementHandle *get_size; @@ -865,144 +870,6 @@ return_ok (void *cls, } -/** - * Continuation of "mysql_next_request". - * - * @param next_cls the next context - * @param tc the task context (unused) - */ -static void -mysql_next_request_cont (void *next_cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct NextRequestClosure *nrc = next_cls; - struct Plugin *plugin; - int ret; - unsigned int type; - unsigned int priority; - unsigned int anonymity; - unsigned long long exp; - unsigned long hashSize; - unsigned long size; - unsigned long long uid; - char value[GNUNET_DATASTORE_MAX_VALUE_SIZE]; - GNUNET_HashCode key; - struct GNUNET_TIME_Absolute expiration; - MYSQL_BIND *rbind = nrc->rbind; - - plugin = nrc->plugin; - plugin->next_task = GNUNET_SCHEDULER_NO_TASK; - plugin->next_task_nc = NULL; - - if (GNUNET_YES == nrc->end_it) - goto END_SET; - GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); - nrc->now = GNUNET_TIME_absolute_get (); - hashSize = sizeof (GNUNET_HashCode); - memset (nrc->rbind, 0, sizeof (nrc->rbind)); - rbind = nrc->rbind; - rbind[0].buffer_type = MYSQL_TYPE_LONG; - rbind[0].buffer = &type; - rbind[0].is_unsigned = 1; - rbind[1].buffer_type = MYSQL_TYPE_LONG; - rbind[1].buffer = &priority; - rbind[1].is_unsigned = 1; - rbind[2].buffer_type = MYSQL_TYPE_LONG; - rbind[2].buffer = &anonymity; - rbind[2].is_unsigned = 1; - rbind[3].buffer_type = MYSQL_TYPE_LONGLONG; - rbind[3].buffer = &exp; - rbind[3].is_unsigned = 1; - rbind[4].buffer_type = MYSQL_TYPE_BLOB; - rbind[4].buffer = &key; - rbind[4].buffer_length = hashSize; - rbind[4].length = &hashSize; - rbind[5].buffer_type = MYSQL_TYPE_BLOB; - rbind[5].buffer = value; - rbind[5].buffer_length = size = sizeof (value); - rbind[5].length = &size; - rbind[6].buffer_type = MYSQL_TYPE_LONGLONG; - rbind[6].buffer = &uid; - rbind[6].is_unsigned = 1; - - if (GNUNET_OK != nrc->prep (nrc->prep_cls, - nrc)) - goto END_SET; - GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); - GNUNET_assert (size <= sizeof(value)); - if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) || - (hashSize != sizeof (GNUNET_HashCode)) ) - { - GNUNET_break (0); - goto END_SET; - } -#if DEBUG_MYSQL - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Found %u-byte value under key `%s' with prio %u, anon %u, expire %llu selecting from gn090 table\n", - (unsigned int) size, - GNUNET_h2s (&key), - priority, - anonymity, - exp); -#endif - expiration.abs_value = exp; - ret = nrc->dviter (nrc->dviter_cls, (nrc->end_it == GNUNET_YES) ? NULL : nrc, - &key, - size, value, - type, priority, anonymity, expiration, - uid); - if (ret == GNUNET_SYSERR) - { - nrc->end_it = GNUNET_YES; - return; - } - if (ret == GNUNET_NO) - { - do_delete_entry (plugin, uid); - if (size != 0) - plugin->env->duc (plugin->env->cls, - - size); - } - return; - END_SET: - /* call dviter with "end of set" */ - GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); - nrc->dviter (nrc->dviter_cls, - NULL, NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); - GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); - nrc->prep (nrc->prep_cls, NULL); - GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); - GNUNET_free (nrc); -} - - -/** - * Function invoked on behalf of a "PluginIterator" - * asking the database plugin to call the iterator - * with the next item. - * - * @param next_cls whatever argument was given - * to the PluginIterator as "next_cls". - * @param end_it set to GNUNET_YES if we - * should terminate the iteration early - * (iterator should be still called once more - * to signal the end of the iteration). - */ -static void -mysql_plugin_next_request (void *next_cls, - int end_it) -{ - struct NextRequestClosure *nrc = next_cls; - - if (GNUNET_YES == end_it) - nrc->end_it = GNUNET_YES; - nrc->plugin->next_task_nc = nrc; - nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&mysql_next_request_cont, - nrc); -} - - /** * Get an estimate of how much space the database is * currently using. @@ -1167,6 +1034,152 @@ mysql_plugin_update (void *cls, } + + +/** + * Continuation of "mysql_next_request". + * + * @param next_cls the next context + * @param tc the task context (unused) + */ +static void +mysql_next_request_cont (void *next_cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct NextRequestClosure *nrc = next_cls; + struct Plugin *plugin; + int ret; + unsigned int type; + unsigned int priority; + unsigned int anonymity; + unsigned long long exp; + unsigned long hashSize; + unsigned long size; + unsigned long long uid; + char value[GNUNET_DATASTORE_MAX_VALUE_SIZE]; + GNUNET_HashCode key; + struct GNUNET_TIME_Absolute expiration; + MYSQL_BIND *rbind = nrc->rbind; + + plugin = nrc->plugin; + plugin->next_task = GNUNET_SCHEDULER_NO_TASK; + plugin->next_task_nc = NULL; + + if (GNUNET_YES == nrc->end_it) + goto END_SET; + GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); + nrc->now = GNUNET_TIME_absolute_get (); + hashSize = sizeof (GNUNET_HashCode); + memset (nrc->rbind, 0, sizeof (nrc->rbind)); + rbind = nrc->rbind; + rbind[0].buffer_type = MYSQL_TYPE_LONG; + rbind[0].buffer = &type; + rbind[0].is_unsigned = 1; + rbind[1].buffer_type = MYSQL_TYPE_LONG; + rbind[1].buffer = &priority; + rbind[1].is_unsigned = 1; + rbind[2].buffer_type = MYSQL_TYPE_LONG; + rbind[2].buffer = &anonymity; + rbind[2].is_unsigned = 1; + rbind[3].buffer_type = MYSQL_TYPE_LONGLONG; + rbind[3].buffer = &exp; + rbind[3].is_unsigned = 1; + rbind[4].buffer_type = MYSQL_TYPE_BLOB; + rbind[4].buffer = &key; + rbind[4].buffer_length = hashSize; + rbind[4].length = &hashSize; + rbind[5].buffer_type = MYSQL_TYPE_BLOB; + rbind[5].buffer = value; + rbind[5].buffer_length = size = sizeof (value); + rbind[5].length = &size; + rbind[6].buffer_type = MYSQL_TYPE_LONGLONG; + rbind[6].buffer = &uid; + rbind[6].is_unsigned = 1; + + if (GNUNET_OK != nrc->prep (nrc->prep_cls, + nrc)) + goto END_SET; + GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); + GNUNET_assert (size <= sizeof(value)); + if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) || + (hashSize != sizeof (GNUNET_HashCode)) ) + { + GNUNET_break (0); + goto END_SET; + } +#if DEBUG_MYSQL + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Found %u-byte value under key `%s' with prio %u, anon %u, expire %llu selecting from gn090 table\n", + (unsigned int) size, + GNUNET_h2s (&key), + priority, + anonymity, + exp); +#endif + expiration.abs_value = exp; + ret = nrc->dviter (nrc->dviter_cls, + (nrc->one_shot == GNUNET_YES) ? NULL : nrc, + &key, + size, value, + type, priority, anonymity, expiration, + uid); + if (ret == GNUNET_SYSERR) + { + nrc->end_it = GNUNET_YES; + return; + } + if (ret == GNUNET_NO) + { + do_delete_entry (plugin, uid); + if (size != 0) + plugin->env->duc (plugin->env->cls, + - size); + } + if (nrc->one_shot == GNUNET_YES) + GNUNET_free (nrc); + return; + END_SET: + /* call dviter with "end of set" */ + GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); + nrc->dviter (nrc->dviter_cls, + NULL, NULL, 0, NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); + GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); + nrc->prep (nrc->prep_cls, NULL); + GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); + GNUNET_free (nrc); +} + + +/** + * Function invoked on behalf of a "PluginIterator" + * asking the database plugin to call the iterator + * with the next item. + * + * @param next_cls whatever argument was given + * to the PluginIterator as "next_cls". + * @param end_it set to GNUNET_YES if we + * should terminate the iteration early + * (iterator should be still called once more + * to signal the end of the iteration). + */ +static void +mysql_plugin_next_request (void *next_cls, + int end_it) +{ + struct NextRequestClosure *nrc = next_cls; + + if (GNUNET_YES == end_it) + nrc->end_it = GNUNET_YES; + nrc->plugin->next_task_nc = nrc; + nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&mysql_next_request_cont, + nrc); +} + + +/** + * Context for 'get_statement_prepare'. + */ struct GetContext { GNUNET_HashCode key; @@ -1466,7 +1479,6 @@ replication_prepare (void *cls, { struct Plugin *plugin = cls; - nrc->end_it = GNUNET_YES; return prepared_statement_run_select (plugin, plugin->select_replication, 7, nrc->rbind, @@ -1475,6 +1487,92 @@ replication_prepare (void *cls, } + +/** + * Context for 'repl_iter' function. + */ +struct ReplCtx +{ + + /** + * Plugin handle. + */ + struct Plugin *plugin; + + /** + * Function to call for the result (or the NULL). + */ + PluginIterator iter; + + /** + * Closure for iter. + */ + void *iter_cls; +}; + + +/** + * Wrapper for the iterator for 'sqlite_plugin_replication_get'. + * Decrements the replication counter and calls the original + * iterator. + * + * @param cls closure + * @param next_cls closure to pass to the "next" function. + * @param key key for the content + * @param size number of bytes in data + * @param data content stored + * @param type type of the content + * @param priority priority of the content + * @param anonymity anonymity-level for the content + * @param expiration expiration time for the content + * @param uid unique identifier for the datum; + * maybe 0 if no unique identifier is available + * + * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue + * (continue on call to "next", of course), + * GNUNET_NO to delete the item and continue (if supported) + */ +static int +repl_iter (void *cls, + void *next_cls, + const GNUNET_HashCode *key, + uint32_t size, + const void *data, + enum GNUNET_BLOCK_Type type, + uint32_t priority, + uint32_t anonymity, + struct GNUNET_TIME_Absolute expiration, + uint64_t uid) +{ + struct ReplCtx *rc = cls; + struct Plugin *plugin = rc->plugin; + unsigned long long oid; + int ret; + + ret = rc->iter (rc->iter_cls, + next_cls, key, + size, data, + type, priority, anonymity, expiration, + uid); + if (NULL != key) + { + oid = (unsigned long long) uid; + ret = prepared_statement_run (plugin, + plugin->dec_repl, + NULL, + MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES, + -1); + if (ret == GNUNET_SYSERR) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to reduce replication counter\n"); + return GNUNET_SYSERR; + } + } + return ret; +} + + /** * Get a random item for replication. Returns a single, not expired, random item * from those with the highest replication counters. The item's @@ -1490,18 +1588,23 @@ mysql_plugin_replication_get (void *cls, PluginIterator iter, void *iter_cls) { struct Plugin *plugin = cls; - struct NextRequestClosure nrc; - - memset (&nrc, 0, sizeof (nrc)); - nrc.plugin = plugin; - nrc.now = GNUNET_TIME_absolute_get (); - nrc.prep = &replication_prepare; - nrc.prep_cls = plugin; - nrc.type = 0; - nrc.dviter = iter; - nrc.dviter_cls = iter_cls; - nrc.end_it = GNUNET_NO; - mysql_next_request_cont (&nrc, NULL); + struct NextRequestClosure *nrc; + struct ReplCtx rc; + + rc.plugin = plugin; + rc.iter = iter; + rc.iter_cls = iter_cls; + nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); + nrc->plugin = plugin; + nrc->now = GNUNET_TIME_absolute_get (); + nrc->prep = &replication_prepare; + nrc->prep_cls = plugin; + nrc->type = 0; + nrc->dviter = &repl_iter; + nrc->dviter_cls = &rc; + nrc->end_it = GNUNET_NO; + nrc->one_shot = GNUNET_YES; + mysql_next_request_cont (nrc, NULL); } @@ -1522,7 +1625,6 @@ expiration_prepare (void *cls, if (NULL == nrc) return GNUNET_NO; - nrc->end_it = GNUNET_YES; nt = (long long) nrc->now.abs_value; return prepared_statement_run_select (plugin, @@ -1547,18 +1649,19 @@ mysql_plugin_expiration_get (void *cls, PluginIterator iter, void *iter_cls) { struct Plugin *plugin = cls; - struct NextRequestClosure nrc; - - memset (&nrc, 0, sizeof (nrc)); - nrc.plugin = plugin; - nrc.now = GNUNET_TIME_absolute_get (); - nrc.prep = &expiration_prepare; - nrc.prep_cls = plugin; - nrc.type = 0; - nrc.dviter = iter; - nrc.dviter_cls = iter_cls; - nrc.end_it = GNUNET_NO; - mysql_next_request_cont (&nrc, NULL); + struct NextRequestClosure *nrc; + + nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); + nrc->plugin = plugin; + nrc->now = GNUNET_TIME_absolute_get (); + nrc->prep = &expiration_prepare; + nrc->prep_cls = plugin; + nrc->type = 0; + nrc->dviter = iter; + nrc->dviter_cls = iter_cls; + nrc->end_it = GNUNET_NO; + nrc->one_shot = GNUNET_YES; + mysql_next_request_cont (nrc, NULL); } @@ -1639,6 +1742,7 @@ libgnunet_plugin_datastore_mysql_init (void *cls) || PINIT (plugin->count_entry_by_hash_vhash_and_type, COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE) || PINIT (plugin->update_entry, UPDATE_ENTRY) + || PINIT (plugin->dec_repl, DEC_REPL) || PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) || PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) || PINIT (plugin->select_replication, SELECT_IT_REPLICATION) ) diff --git a/src/datastore/plugin_datastore_postgres.c b/src/datastore/plugin_datastore_postgres.c index 1ff56da31..2cecfa9a1 100644 --- a/src/datastore/plugin_datastore_postgres.c +++ b/src/datastore/plugin_datastore_postgres.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - (C) 2009, 2010 Christian Grothoff (and other contributing authors) + (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -30,45 +30,6 @@ #define DEBUG_POSTGRES GNUNET_NO -#define SELECT_IT_LOW_PRIORITY "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\ - "WHERE (prio = $1 AND oid > $2) " \ - "ORDER BY prio ASC,oid ASC LIMIT 1) "\ - "UNION "\ - "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\ - "WHERE (prio > $1 AND oid != $2)"\ - "ORDER BY prio ASC,oid ASC LIMIT 1)"\ - "ORDER BY prio ASC,oid ASC LIMIT 1" - -#define SELECT_IT_NON_ANONYMOUS "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\ - "WHERE (prio = $1 AND oid < $2)"\ - " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\ - "UNION "\ - "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\ - "WHERE (prio < $1 AND oid != $2)"\ - " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\ - "ORDER BY prio DESC,oid DESC LIMIT 1" - -#define SELECT_IT_EXPIRATION_TIME "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\ - "WHERE (expire = $1 AND oid > $2) "\ - "ORDER BY expire ASC,oid ASC LIMIT 1) "\ - "UNION "\ - "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\ - "WHERE (expire > $1 AND oid != $2) " \ - "ORDER BY expire ASC,oid ASC LIMIT 1)"\ - "ORDER BY expire ASC,oid ASC LIMIT 1" - - -#define SELECT_IT_MIGRATION_ORDER "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\ - "WHERE (expire = $1 AND oid < $2)"\ - " AND expire > $3 AND type!=3"\ - " ORDER BY expire DESC,oid DESC LIMIT 1) "\ - "UNION "\ - "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\ - "WHERE (expire < $1 AND oid != $2)" \ - " AND expire > $3 AND type!=3"\ - " ORDER BY expire DESC,oid DESC LIMIT 1)"\ - "ORDER BY expire DESC,oid DESC LIMIT 1" - /** * After how many ms "busy" should a DB operation fail for good? * A low value makes sure that we are more responsive to requests @@ -140,7 +101,7 @@ struct NextRequestClosure /** * Number of entries found so far */ - long long count; + unsigned long long count; /** * Offset this iteration starts at. @@ -153,24 +114,14 @@ struct NextRequestClosure uint64_t blimit_off; /** - * Overall number of matching entries. - */ - unsigned long long total; - - /** - * Expiration value of previous result (possible parameter), big-endian. + * Current total number of entries found so far, big-endian. */ - uint64_t blast_expire; + uint64_t bcount; /** - * Row ID of last result (possible paramter), big-endian. - */ - uint32_t blast_rowid; - - /** - * Priority of last result (possible parameter), big-endian. + * Overall number of matching entries. */ - uint32_t blast_prio; + unsigned long long total; /** * Type of block (possible paramter), big-endian. @@ -181,6 +132,11 @@ struct NextRequestClosure * Flag set to GNUNET_YES to stop iteration. */ int end_it; + + /** + * Flag to indicate that there should only be one result. + */ + int one_shot; }; @@ -336,6 +292,7 @@ init_connection (struct Plugin *plugin) GNUNET_free_non_null (conninfo); ret = PQexec (plugin->dbh, "CREATE TABLE gn090 (" + " repl INTEGER NOT NULL DEFAULT 0," " type INTEGER NOT NULL DEFAULT 0," " prio INTEGER NOT NULL DEFAULT 0," " anonLevel INTEGER NOT NULL DEFAULT 0," @@ -385,7 +342,6 @@ init_connection (struct Plugin *plugin) } } PQclear (ret); -#if 1 ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL"); if (GNUNET_OK != @@ -421,44 +377,43 @@ init_connection (struct Plugin *plugin) return GNUNET_SYSERR; } PQclear (ret); -#endif if ((GNUNET_OK != pq_prepare (plugin, "getvt", "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " "WHERE hash=$1 AND vhash=$2 AND type=$3 " - "AND oid > $4 ORDER BY oid ASC LIMIT 1 OFFSET $5", - 5, + "ORDER BY oid ASC LIMIT 1 OFFSET $4", + 4, __LINE__)) || (GNUNET_OK != pq_prepare (plugin, "gett", "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " - "WHERE hash=$1 AND type=$2" - "AND oid > $3 ORDER BY oid ASC LIMIT 1 OFFSET $4", - 4, + "WHERE hash=$1 AND type=$2 " + "ORDER BY oid ASC LIMIT 1 OFFSET $3", + 3, __LINE__)) || (GNUNET_OK != pq_prepare (plugin, "getv", "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " - "WHERE hash=$1 AND vhash=$2" - "AND oid > $3 ORDER BY oid ASC LIMIT 1 OFFSET $4", - 4, + "WHERE hash=$1 AND vhash=$2 " + "ORDER BY oid ASC LIMIT 1 OFFSET $3", + 3, __LINE__)) || (GNUNET_OK != pq_prepare (plugin, "get", "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " - "WHERE hash=$1" - "AND oid > $2 ORDER BY oid ASC LIMIT 1 OFFSET $3", - 3, + "WHERE hash=$1 " + "ORDER BY oid ASC LIMIT 1 OFFSET $2", + 2, __LINE__)) || (GNUNET_OK != pq_prepare (plugin, "put", - "INSERT INTO gn090 (type, prio, anonLevel, expire, hash, vhash, value) " - "VALUES ($1, $2, $3, $4, $5, $6, $7)", + "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, hash, vhash, value) " + "VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", 8, __LINE__)) || (GNUNET_OK != @@ -470,32 +425,42 @@ init_connection (struct Plugin *plugin) __LINE__)) || (GNUNET_OK != pq_prepare (plugin, - "select_low_priority", - SELECT_IT_LOW_PRIORITY, - 2, + "decrepl", + "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) " + "WHERE oid = $1", + 1, __LINE__)) || (GNUNET_OK != pq_prepare (plugin, "select_non_anonymous", - SELECT_IT_NON_ANONYMOUS, - 2, + "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " + "WHERE anonLevel = 0 ORDER BY oid DESC LIMIT 1 OFFSET $1", + 1, __LINE__)) || (GNUNET_OK != pq_prepare (plugin, - "select_expiration_time", - SELECT_IT_EXPIRATION_TIME, - 2, + "select_expiration_order", + "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " + "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) " + "UNION " + "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " + "ORDER BY prio ASC LIMIT 1) " + "ORDER BY expire ASC LIMIT 1", + 1, __LINE__)) || (GNUNET_OK != pq_prepare (plugin, - "select_migration_order", - SELECT_IT_MIGRATION_ORDER, - 3, + "select_replication_order", + "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " \ + "ORDER BY repl DESC,RANDOM() LIMIT 1", + 0, __LINE__)) || (GNUNET_OK != pq_prepare (plugin, "delrow", - "DELETE FROM gn090 " "WHERE oid=$1", 1, __LINE__))) + "DELETE FROM gn090 " "WHERE oid=$1", + 1, + __LINE__))) { PQfinish (plugin->dbh); plugin->dbh = NULL; @@ -610,8 +575,10 @@ postgres_plugin_put (void *cls, uint32_t btype = htonl (type); uint32_t bprio = htonl (priority); uint32_t banon = htonl (anonymity); + uint32_t brepl = htonl (replication); uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).abs_value__; const char *paramValues[] = { + (const char *) &brepl, (const char *) &btype, (const char *) &bprio, (const char *) &banon, @@ -621,6 +588,7 @@ postgres_plugin_put (void *cls, (const char *) data }; int paramLengths[] = { + sizeof (brepl), sizeof (btype), sizeof (bprio), sizeof (banon), @@ -629,11 +597,11 @@ postgres_plugin_put (void *cls, sizeof (GNUNET_HashCode), size }; - const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1 }; + const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1, 1 }; GNUNET_CRYPTO_hash (data, size, &vhash); ret = PQexecPrepared (plugin->dbh, - "put", 7, paramValues, paramLengths, paramFormats, 1); + "put", 8, paramValues, paramLengths, paramFormats, 1); if (GNUNET_OK != check_result (plugin, ret, PGRES_COMMAND_OK, "PQexecPrepared", "put", __LINE__)) @@ -649,6 +617,7 @@ postgres_plugin_put (void *cls, return GNUNET_OK; } + /** * Function invoked on behalf of a "PluginIterator" * asking the database plugin to call the iterator @@ -690,15 +659,11 @@ postgres_next_request_cont (void *next_cls, GNUNET_TIME_UNIT_ZERO_ABS, 0); GNUNET_free (nrc); return; - } - - if (nrc->count == 0) - nrc->blimit_off = GNUNET_htonll (nrc->off); - else - nrc->blimit_off = GNUNET_htonll (0); - if (nrc->count + nrc->off == nrc->total) - nrc->blast_rowid = htonl (0); /* back to start */ - + } + if (nrc->off == nrc->total) + nrc->off = 0; + nrc->blimit_off = GNUNET_htonll (nrc->off); + nrc->bcount = GNUNET_htonll ((uint64_t) nrc->count); res = PQexecPrepared (plugin->dbh, nrc->pname, nrc->nparams, @@ -773,14 +738,10 @@ postgres_next_request_cont (void *next_cls, priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1)); anonymity = ntohl ( *(uint32_t *) PQgetvalue (res, 0, 2)); expiration_time.abs_value = GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3)); - memcpy (&key, PQgetvalue (res, 0, 4), sizeof (GNUNET_HashCode)); + memcpy (&key, + PQgetvalue (res, 0, 4), + sizeof (GNUNET_HashCode)); size = PQgetlength (res, 0, 5); - - nrc->blast_prio = htonl (priority); - nrc->blast_expire = GNUNET_htonll (expiration_time.abs_value); - nrc->blast_rowid = htonl (rowid); - nrc->count++; - #if DEBUG_POSTGRES GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres", @@ -789,7 +750,7 @@ postgres_next_request_cont (void *next_cls, (unsigned int) type); #endif iret = nrc->iter (nrc->iter_cls, - nrc, + (nrc->one_shot == GNUNET_YES) ? NULL : nrc, &key, size, PQgetvalue (res, 0, 5), @@ -799,6 +760,11 @@ postgres_next_request_cont (void *next_cls, expiration_time, rowid); PQclear (res); + if (iret != GNUNET_NO) + { + nrc->count++; + nrc->off++; + } if (iret == GNUNET_SYSERR) { #if DEBUG_POSTGRES @@ -828,6 +794,8 @@ postgres_next_request_cont (void *next_cls, #endif } } + if (nrc->one_shot == GNUNET_YES) + GNUNET_free (nrc); } @@ -857,183 +825,6 @@ postgres_plugin_next_request (void *next_cls, } -/** - * Update the priority for a particular key in the datastore. If - * the expiration time in value is different than the time found in - * the datastore, the higher value should be kept. For the - * anonymity level, the lower value is to be used. The specified - * priority should be added to the existing priority, ignoring the - * priority in value. - * - * Note that it is possible for multiple values to match this put. - * In that case, all of the respective values are updated. - * - * @param cls our "struct Plugin*" - * @param uid unique identifier of the datum - * @param delta by how much should the priority - * change? If priority + delta < 0 the - * priority should be set to 0 (never go - * negative). - * @param expire new expiration time should be the - * MAX of any existing expiration time and - * this value - * @param msg set to error message - * @return GNUNET_OK on success - */ -static int -postgres_plugin_update (void *cls, - uint64_t uid, - int delta, struct GNUNET_TIME_Absolute expire, - char **msg) -{ - struct Plugin *plugin = cls; - PGresult *ret; - int32_t bdelta = (int32_t) htonl ((uint32_t) delta); - uint32_t boid = htonl ( (uint32_t) uid); - uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value__; - const char *paramValues[] = { - (const char *) &bdelta, - (const char *) &bexpire, - (const char *) &boid, - }; - int paramLengths[] = { - sizeof (bdelta), - sizeof (bexpire), - sizeof (boid), - }; - const int paramFormats[] = { 1, 1, 1 }; - - ret = PQexecPrepared (plugin->dbh, - "update", - 3, paramValues, paramLengths, paramFormats, 1); - if (GNUNET_OK != check_result (plugin, - ret, - PGRES_COMMAND_OK, - "PQexecPrepared", "update", __LINE__)) - return GNUNET_SYSERR; - PQclear (ret); - return GNUNET_OK; -} - - -/** - * Call a method for each key in the database and - * call the callback method on it. - * - * @param plugin global context - * @param type entries of which type should be considered? - * @param is_asc ascending or descending iteration? - * @param iter_select which SELECT method should be used? - * @param iter maybe NULL (to just count); iter - * should return GNUNET_SYSERR to abort the - * iteration, GNUNET_NO to delete the entry and - * continue and GNUNET_OK to continue iterating - * @param iter_cls closure for 'iter' - */ -static void -postgres_iterate (struct Plugin *plugin, - unsigned int type, - int is_asc, - unsigned int iter_select, - PluginIterator iter, void *iter_cls) -{ - struct NextRequestClosure *nrc; - - nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); - nrc->count = UINT32_MAX; - nrc->plugin = plugin; - nrc->iter = iter; - nrc->iter_cls = iter_cls; - if (is_asc) - { - nrc->blast_prio = htonl (0); - nrc->blast_rowid = htonl (0); - nrc->blast_expire = htonl (0); - } - else - { - nrc->blast_prio = htonl (0x7FFFFFFFL); - nrc->blast_rowid = htonl (0xFFFFFFFF); - nrc->blast_expire = GNUNET_htonll (0x7FFFFFFFFFFFFFFFLL); - } - switch (iter_select) - { - case 0: - nrc->pname = "select_low_priority"; - nrc->nparams = 2; - nrc->paramValues[0] = (const char *) &nrc->blast_prio; - nrc->paramValues[1] = (const char *) &nrc->blast_rowid; - nrc->paramLengths[0] = sizeof (nrc->blast_prio); - nrc->paramLengths[1] = sizeof (nrc->blast_rowid); - break; - case 1: - nrc->pname = "select_non_anonymous"; - nrc->nparams = 2; - nrc->paramValues[0] = (const char *) &nrc->blast_prio; - nrc->paramValues[1] = (const char *) &nrc->blast_rowid; - nrc->paramLengths[0] = sizeof (nrc->blast_prio); - nrc->paramLengths[1] = sizeof (nrc->blast_rowid); - break; - case 2: - nrc->pname = "select_expiration_time"; - nrc->nparams = 2; - nrc->paramValues[0] = (const char *) &nrc->blast_expire; - nrc->paramValues[1] = (const char *) &nrc->blast_rowid; - nrc->paramLengths[0] = sizeof (nrc->blast_expire); - nrc->paramLengths[1] = sizeof (nrc->blast_rowid); - break; - case 3: - nrc->pname = "select_migration_order"; - nrc->nparams = 3; - nrc->paramValues[0] = (const char *) &nrc->blast_expire; - nrc->paramValues[1] = (const char *) &nrc->blast_rowid; - nrc->paramValues[2] = (const char *) &nrc->bnow; - nrc->paramLengths[0] = sizeof (nrc->blast_expire); - nrc->paramLengths[1] = sizeof (nrc->blast_rowid); - nrc->paramLengths[2] = sizeof (nrc->bnow); - break; - default: - GNUNET_break (0); - iter (iter_cls, - NULL, NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); - GNUNET_free (nrc); - return; - } - nrc->bnow = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()).abs_value__; - postgres_plugin_next_request (nrc, - GNUNET_NO); -} - - -/** - * Select a subset of the items in the datastore and call - * the given iterator for each of them. - * - * @param cls our "struct Plugin*" - * @param type entries of which type should be considered? - * Use 0 for any type. - * @param iter function to call on each matching value; - * will be called once with a NULL value at the end - * @param iter_cls closure for iter - */ -static void -postgres_plugin_iter_low_priority (void *cls, - enum GNUNET_BLOCK_Type type, - PluginIterator iter, - void *iter_cls) -{ - struct Plugin *plugin = cls; - - postgres_iterate (plugin, - type, - GNUNET_YES, 0, - iter, iter_cls); -} - - - - /** * Iterate over the results for a particular key * in the datastore. @@ -1063,12 +854,7 @@ postgres_plugin_get (void *cls, const int paramFormats[] = { 1, 1, 1, 1, 1 }; PGresult *ret; - if (key == NULL) - { - postgres_plugin_iter_low_priority (plugin, type, - iter, iter_cls); - return; - } + GNUNET_assert (key != NULL); nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); nrc->plugin = plugin; nrc->iter = iter; @@ -1087,11 +873,9 @@ postgres_plugin_get (void *cls, nrc->paramLengths[1] = sizeof (nrc->vhash); nrc->paramValues[2] = (const char *) &nrc->btype; nrc->paramLengths[2] = sizeof (nrc->btype); - nrc->paramValues[3] = (const char *) &nrc->blast_rowid; - nrc->paramLengths[3] = sizeof (nrc->blast_rowid); - nrc->paramValues[4] = (const char *) &nrc->blimit_off; - nrc->paramLengths[4] = sizeof (nrc->blimit_off); - nrc->nparams = 5; + nrc->paramValues[3] = (const char *) &nrc->blimit_off; + nrc->paramLengths[3] = sizeof (nrc->blimit_off); + nrc->nparams = 4; nrc->pname = "getvt"; ret = PQexecParams (plugin->dbh, "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3", @@ -1105,11 +889,9 @@ postgres_plugin_get (void *cls, { nrc->paramValues[1] = (const char *) &nrc->btype; nrc->paramLengths[1] = sizeof (nrc->btype); - nrc->paramValues[2] = (const char *) &nrc->blast_rowid; - nrc->paramLengths[2] = sizeof (nrc->blast_rowid); - nrc->paramValues[3] = (const char *) &nrc->blimit_off; - nrc->paramLengths[3] = sizeof (nrc->blimit_off); - nrc->nparams = 4; + nrc->paramValues[2] = (const char *) &nrc->blimit_off; + nrc->paramLengths[2] = sizeof (nrc->blimit_off); + nrc->nparams = 3; nrc->pname = "gett"; ret = PQexecParams (plugin->dbh, "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2", @@ -1126,11 +908,9 @@ postgres_plugin_get (void *cls, { nrc->paramValues[1] = (const char *) &nrc->vhash; nrc->paramLengths[1] = sizeof (nrc->vhash); - nrc->paramValues[2] = (const char *) &nrc->blast_rowid; - nrc->paramLengths[2] = sizeof (nrc->blast_rowid); - nrc->paramValues[3] = (const char *) &nrc->blimit_off; - nrc->paramLengths[3] = sizeof (nrc->blimit_off); - nrc->nparams = 4; + nrc->paramValues[2] = (const char *) &nrc->blimit_off; + nrc->paramLengths[2] = sizeof (nrc->blimit_off); + nrc->nparams = 3; nrc->pname = "getv"; ret = PQexecParams (plugin->dbh, "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2", @@ -1142,11 +922,9 @@ postgres_plugin_get (void *cls, } else { - nrc->paramValues[1] = (const char *) &nrc->blast_rowid; - nrc->paramLengths[1] = sizeof (nrc->blast_rowid); - nrc->paramValues[2] = (const char *) &nrc->blimit_off; - nrc->paramLengths[2] = sizeof (nrc->blimit_off); - nrc->nparams = 3; + nrc->paramValues[1] = (const char *) &nrc->blimit_off; + nrc->paramLengths[1] = sizeof (nrc->blimit_off); + nrc->nparams = 2; nrc->pname = "get"; ret = PQexecParams (plugin->dbh, "SELECT count(*) FROM gn090 WHERE hash=$1", @@ -1199,6 +977,131 @@ postgres_plugin_get (void *cls, } +/** + * Select a subset of the items in the datastore and call + * the given iterator for each of them. + * + * @param cls our "struct Plugin*" + * @param type entries of which type should be considered? + * Use 0 for any type. + * @param iter function to call on each matching value; + * will be called once with a NULL value at the end + * @param iter_cls closure for iter + */ +static void +postgres_plugin_iter_zero_anonymity (void *cls, + enum GNUNET_BLOCK_Type type, + PluginIterator iter, + void *iter_cls) +{ + struct Plugin *plugin = cls; + struct NextRequestClosure *nrc; + + nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); + nrc->btype = htonl ((uint32_t) type); + nrc->plugin = plugin; + nrc->iter = iter; + nrc->iter_cls = iter_cls; + nrc->pname = "select_non_anonymous"; + nrc->nparams = 1; + nrc->paramLengths[0] = sizeof (nrc->bcount); + nrc->paramValues[0] = (const char*) &nrc->bcount; + postgres_plugin_next_request (nrc, + GNUNET_NO); +} + +/** + * Context for 'repl_iter' function. + */ +struct ReplCtx +{ + + /** + * Plugin handle. + */ + struct Plugin *plugin; + + /** + * Function to call for the result (or the NULL). + */ + PluginIterator iter; + + /** + * Closure for iter. + */ + void *iter_cls; +}; + + +/** + * Wrapper for the iterator for 'sqlite_plugin_replication_get'. + * Decrements the replication counter and calls the original + * iterator. + * + * @param cls closure + * @param next_cls closure to pass to the "next" function. + * @param key key for the content + * @param size number of bytes in data + * @param data content stored + * @param type type of the content + * @param priority priority of the content + * @param anonymity anonymity-level for the content + * @param expiration expiration time for the content + * @param uid unique identifier for the datum; + * maybe 0 if no unique identifier is available + * + * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue + * (continue on call to "next", of course), + * GNUNET_NO to delete the item and continue (if supported) + */ +static int +repl_iter (void *cls, + void *next_cls, + const GNUNET_HashCode *key, + uint32_t size, + const void *data, + enum GNUNET_BLOCK_Type type, + uint32_t priority, + uint32_t anonymity, + struct GNUNET_TIME_Absolute expiration, + uint64_t uid) +{ + struct ReplCtx *rc = cls; + struct Plugin *plugin = rc->plugin; + int ret; + PGresult *qret; + uint32_t boid; + + ret = rc->iter (rc->iter_cls, + next_cls, key, + size, data, + type, priority, anonymity, expiration, + uid); + if (NULL != key) + { + boid = htonl ( (uint32_t) uid); + const char *paramValues[] = { + (const char *) &boid, + }; + int paramLengths[] = { + sizeof (boid), + }; + const int paramFormats[] = { 1 }; + qret = PQexecPrepared (plugin->dbh, + "decrepl", + 1, paramValues, paramLengths, paramFormats, 1); + if (GNUNET_OK != check_result (plugin, + qret, + PGRES_COMMAND_OK, + "PQexecPrepared", + "decrepl", __LINE__)) + return GNUNET_SYSERR; + PQclear (qret); + } + return ret; +} + + /** * Get a random item for replication. Returns a single, not expired, random item * from those with the highest replication counters. The item's @@ -1213,9 +1116,21 @@ static void postgres_plugin_replication_get (void *cls, PluginIterator iter, void *iter_cls) { - /* FIXME: not implemented! */ - iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); + struct Plugin *plugin = cls; + struct NextRequestClosure *nrc; + struct ReplCtx rc; + + rc.plugin = plugin; + rc.iter = iter; + rc.iter_cls = iter_cls; + nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); + nrc->one_shot = GNUNET_YES; + nrc->plugin = plugin; + nrc->iter = &repl_iter; + nrc->iter_cls = &rc; + nrc->pname = "select_replication_order"; + nrc->nparams = 0; + postgres_next_request_cont (nrc, NULL); } @@ -1231,34 +1146,80 @@ static void postgres_plugin_expiration_get (void *cls, PluginIterator iter, void *iter_cls) { - /* FIXME: not implemented! */ - iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); + struct Plugin *plugin = cls; + struct NextRequestClosure *nrc; + uint64_t btime; + + btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value); + nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); + nrc->one_shot = GNUNET_YES; + nrc->plugin = plugin; + nrc->iter = iter; + nrc->iter_cls = iter_cls; + nrc->pname = "select_expiration_order"; + nrc->nparams = 1; + nrc->paramValues[0] = (const char *) &btime; + nrc->paramLengths[0] = sizeof (btime); + postgres_next_request_cont (nrc, NULL); } /** - * Select a subset of the items in the datastore and call - * the given iterator for each of them. + * Update the priority for a particular key in the datastore. If + * the expiration time in value is different than the time found in + * the datastore, the higher value should be kept. For the + * anonymity level, the lower value is to be used. The specified + * priority should be added to the existing priority, ignoring the + * priority in value. + * + * Note that it is possible for multiple values to match this put. + * In that case, all of the respective values are updated. * * @param cls our "struct Plugin*" - * @param type entries of which type should be considered? - * Use 0 for any type. - * @param iter function to call on each matching value; - * will be called once with a NULL value at the end - * @param iter_cls closure for iter + * @param uid unique identifier of the datum + * @param delta by how much should the priority + * change? If priority + delta < 0 the + * priority should be set to 0 (never go + * negative). + * @param expire new expiration time should be the + * MAX of any existing expiration time and + * this value + * @param msg set to error message + * @return GNUNET_OK on success */ -static void -postgres_plugin_iter_zero_anonymity (void *cls, - enum GNUNET_BLOCK_Type type, - PluginIterator iter, - void *iter_cls) +static int +postgres_plugin_update (void *cls, + uint64_t uid, + int delta, struct GNUNET_TIME_Absolute expire, + char **msg) { struct Plugin *plugin = cls; + PGresult *ret; + int32_t bdelta = (int32_t) htonl ((uint32_t) delta); + uint32_t boid = htonl ( (uint32_t) uid); + uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value__; + const char *paramValues[] = { + (const char *) &bdelta, + (const char *) &bexpire, + (const char *) &boid, + }; + int paramLengths[] = { + sizeof (bdelta), + sizeof (bexpire), + sizeof (boid), + }; + const int paramFormats[] = { 1, 1, 1 }; - postgres_iterate (plugin, - type, GNUNET_NO, 1, - iter, iter_cls); + ret = PQexecPrepared (plugin->dbh, + "update", + 3, paramValues, paramLengths, paramFormats, 1); + if (GNUNET_OK != check_result (plugin, + ret, + PGRES_COMMAND_OK, + "PQexecPrepared", "update", __LINE__)) + return GNUNET_SYSERR; + PQclear (ret); + return GNUNET_OK; } -- cgit v1.2.3