From b44b31bd94d3a7479e1ab2bf76acd82030455703 Mon Sep 17 00:00:00 2001 From: Gabor X Toth <*@tg-x.net> Date: Wed, 12 Oct 2016 21:54:30 +0000 Subject: psycstore: fix postgres --- src/include/gnunet_psycstore_plugin.h | 2 +- src/psycstore/plugin_psycstore_mysql.c | 14 +-- src/psycstore/plugin_psycstore_postgres.c | 136 +++++++++++------------------- src/psycstore/plugin_psycstore_sqlite.c | 6 +- 4 files changed, 62 insertions(+), 96 deletions(-) diff --git a/src/include/gnunet_psycstore_plugin.h b/src/include/gnunet_psycstore_plugin.h index 4d19ce86d..b8dd0cc98 100644 --- a/src/include/gnunet_psycstore_plugin.h +++ b/src/include/gnunet_psycstore_plugin.h @@ -114,7 +114,7 @@ struct GNUNET_PSYCSTORE_PluginFunctions (*message_add_flags) (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, uint64_t message_id, - uint64_t psycstore_flags); + uint32_t psycstore_flags); /** * Retrieve a message fragment range by fragment ID. diff --git a/src/psycstore/plugin_psycstore_mysql.c b/src/psycstore/plugin_psycstore_mysql.c index 71f2eb5b8..01a0282c8 100644 --- a/src/psycstore/plugin_psycstore_mysql.c +++ b/src/psycstore/plugin_psycstore_mysql.c @@ -296,18 +296,18 @@ database_setup (struct Plugin *plugin) /* Create tables */ STMT_RUN ("CREATE TABLE IF NOT EXISTS channels (\n" " id BIGINT UNSIGNED AUTO_INCREMENT,\n" - " pub_key BLOB,\n" + " pub_key BLOB(23),\n" " max_state_message_id BIGINT UNSIGNED,\n" " state_hash_message_id BIGINT UNSIGNED,\n" " PRIMARY KEY(id),\n" - " UNIQUE KEY(pub_key(5))\n" + " UNIQUE KEY(pub_key(32))\n" ");"); STMT_RUN ("CREATE TABLE IF NOT EXISTS slaves (\n" " id BIGINT UNSIGNED AUTO_INCREMENT,\n" - " pub_key BLOB,\n" + " pub_key BLOB(32),\n" " PRIMARY KEY(id),\n" - " UNIQUE KEY(pub_key(5))\n" + " UNIQUE KEY(pub_key(32))\n" ");"); STMT_RUN ("CREATE TABLE IF NOT EXISTS membership (\n" @@ -521,7 +521,7 @@ database_setup (struct Plugin *plugin) PREP ("SELECT name, value_current\n" "FROM state\n" "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" - " AND (name = ? OR substr(name, 1, ?) = ? || '_');", + " AND (name = ? OR substr(name, 1, ?) = ?);", &plugin->select_state_prefix); PREP ("SELECT name, value_signed\n" @@ -905,7 +905,7 @@ static int message_add_flags (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, uint64_t message_id, - uint64_t psycstore_flags) + uint32_t psycstore_flags) { struct Plugin *plugin = cls; struct GNUNET_MYSQL_StatementHandle *stmt = plugin->update_message_flags; @@ -914,7 +914,7 @@ message_add_flags (void *cls, int ret = GNUNET_SYSERR; struct GNUNET_MY_QueryParam params_update[] = { - GNUNET_MY_query_param_uint64 (&psycstore_flags), + GNUNET_MY_query_param_uint32 (&psycstore_flags), GNUNET_MY_query_param_auto_from_type (channel_key), GNUNET_MY_query_param_uint64 (&message_id), GNUNET_MY_query_param_end diff --git a/src/psycstore/plugin_psycstore_postgres.c b/src/psycstore/plugin_psycstore_postgres.c index 5bbb3c447..3439856b9 100644 --- a/src/psycstore/plugin_psycstore_postgres.c +++ b/src/psycstore/plugin_psycstore_postgres.c @@ -100,7 +100,7 @@ database_setup (struct Plugin *plugin) GNUNET_POSTGRES_exec(plugin->dbh, "CREATE TABLE IF NOT EXISTS channels (\n" " id SERIAL,\n" - " pub_key BYTEA,\n" + " pub_key BYTEA(32),\n" " max_state_message_id BIGINT,\n" " state_hash_message_id BIGINT,\n" " PRIMARY KEY(id)\n" @@ -109,8 +109,7 @@ database_setup (struct Plugin *plugin) (GNUNET_OK != GNUNET_POSTGRES_exec(plugin->dbh, "CREATE UNIQUE INDEX IF NOT EXISTS channel_pub_key_idx \n" - " ON channels (substring(pub_key from 1 for 5)\n" - ")")) || + " ON channels (pub_key)")) || (GNUNET_OK != GNUNET_POSTGRES_exec(plugin->dbh, @@ -122,15 +121,14 @@ database_setup (struct Plugin *plugin) GNUNET_POSTGRES_exec(plugin->dbh, "CREATE TABLE IF NOT EXISTS slaves (\n" " id SERIAL,\n" - " pub_key BYTEA,\n" + " pub_key BYTEA(32),\n" " PRIMARY KEY(id)\n" ")" "WITH OIDS")) || (GNUNET_OK != GNUNET_POSTGRES_exec(plugin->dbh, "CREATE UNIQUE INDEX IF NOT EXISTS slaves_pub_key_idx \n" - " ON slaves (substring(pub_key from 1 for 5)\n" - ")")) || + " ON slaves (pub_key)")) || (GNUNET_OK != GNUNET_POSTGRES_exec(plugin->dbh, @@ -143,7 +141,7 @@ database_setup (struct Plugin *plugin) "CREATE TABLE IF NOT EXISTS membership (\n" " channel_id BIGINT NOT NULL REFERENCES channels(id),\n" " slave_id BIGINT NOT NULL REFERENCES slaves(id),\n" - " did_join BIGINT NOT NULL,\n" + " did_join INT NOT NULL,\n" " announced_at BIGINT NOT NULL,\n" " effective_since BIGINT NOT NULL,\n" " group_generation BIGINT NOT NULL\n" @@ -159,7 +157,7 @@ database_setup (struct Plugin *plugin) GNUNET_POSTGRES_exec(plugin->dbh, "CREATE TABLE IF NOT EXISTS messages (\n" " channel_id BIGINT NOT NULL REFERENCES channels(id),\n" - " hop_counter BIGINT NOT NULL,\n" + " hop_counter INT NOT NULL,\n" " signature BYTEA,\n" " purpose BYTEA,\n" " fragment_id BIGINT NOT NULL,\n" @@ -179,29 +177,17 @@ database_setup (struct Plugin *plugin) " channel_id BIGINT NOT NULL REFERENCES channels(id),\n" " name TEXT NOT NULL,\n" " value_current BYTEA,\n" - " value_signed BYTEA\n" + " value_signed BYTEA,\n" + " PRIMARY KEY (channel_id, name)\n" ")" "WITH OIDS")) || - - (GNUNET_OK != - GNUNET_POSTGRES_exec(plugin->dbh, - "CREATE UNIQUE INDEX IF NOT EXISTS state_uniq_idx \n" - " ON state (channel_id, substring(name from 1 for 5)\n" - ")")) || - (GNUNET_OK != GNUNET_POSTGRES_exec(plugin->dbh, "CREATE TABLE IF NOT EXISTS state_sync (\n" " channel_id BIGINT NOT NULL REFERENCES channels(id),\n" " name TEXT NOT NULL,\n" " value BYTEA,\n" - " PRIMARY KEY (channel_id)\n" - ")" "WITH OIDS")) || - - (GNUNET_OK != - GNUNET_POSTGRES_exec(plugin->dbh, - "CREATE UNIQUE INDEX IF NOT EXISTS state_sync_name_idx \n" - " ON state_sync (substring(name from 1 for 5)\n" - ")"))) + " PRIMARY KEY (channel_id, name)\n" + ")" "WITH OIDS"))) { PQfinish (plugin->dbh); plugin->dbh = NULL; @@ -370,7 +356,7 @@ database_setup (struct Plugin *plugin) "LEFT JOIN (SELECT channel_id, name, value_signed\n" " FROM state) AS old\n" "ON new.channel_id = old.channel_id AND new.name = old.name\n" - "ON CONFLICT ( channel_id, substring(name from 1 for 5) )\n" + "ON CONFLICT (channel_id, name)\n" " DO UPDATE SET value_current = EXCLUDED.value_current,\n" " value_signed = EXCLUDED.value_signed", 3)) || @@ -422,7 +408,7 @@ database_setup (struct Plugin *plugin) "SELECT name, value_current\n" "FROM state\n" "WHERE channel_id = get_chan_id($1)\n" - " AND (name = $2 OR substr(name, 1, $3) = $4 || '_')", 4)) || + " AND (name = $2 OR substr(name, 1, $3) = $4)", 4)) || (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, "select_state_signed", @@ -764,7 +750,7 @@ fragment_store (void *cls, uint64_t message_id = GNUNET_ntohll (msg->message_id); uint64_t group_generation = GNUNET_ntohll (msg->group_generation); - uint64_t hop_counter = ntohl(msg->hop_counter); + uint32_t hop_counter = ntohl(msg->hop_counter); uint32_t flags = ntohl(msg->flags); if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX || @@ -783,7 +769,7 @@ fragment_store (void *cls, struct GNUNET_PQ_QueryParam params_insert[] = { GNUNET_PQ_query_param_auto_from_type (channel_key), - GNUNET_PQ_query_param_uint64 (&hop_counter), + GNUNET_PQ_query_param_uint32 (&hop_counter), GNUNET_PQ_query_param_auto_from_type (&msg->signature), GNUNET_PQ_query_param_auto_from_type (&msg->purpose), GNUNET_PQ_query_param_uint64 (&fragment_id), @@ -819,15 +805,13 @@ static int message_add_flags (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, uint64_t message_id, - uint64_t psycstore_flags) + uint32_t psycstore_flags) { PGresult *res; struct Plugin *plugin = cls; - int ret = GNUNET_SYSERR; - struct GNUNET_PQ_QueryParam params_update[] = { - GNUNET_PQ_query_param_uint64 (&psycstore_flags), + GNUNET_PQ_query_param_uint32 (&psycstore_flags), GNUNET_PQ_query_param_auto_from_type (channel_key), GNUNET_PQ_query_param_uint64 (&message_id), GNUNET_PQ_query_param_end @@ -838,10 +822,10 @@ message_add_flags (void *cls, res, PGRES_COMMAND_OK, "PQexecPrepared","update_message_flags")) - return ret; + return GNUNET_SYSERR; PQclear (res); - return ret; + return GNUNET_OK; } @@ -850,7 +834,8 @@ fragment_row (struct Plugin *plugin, const char *stmt, PGresult *res, GNUNET_PSYCSTORE_FragmentCallback cb, - void *cb_cls) + void *cb_cls, + uint64_t *returned_fragments) { uint32_t hop_counter; void *signature = NULL; @@ -862,14 +847,13 @@ fragment_row (struct Plugin *plugin, uint64_t fragment_offset; uint64_t message_id; uint64_t group_generation; - uint64_t flags; + uint32_t flags; void *buf; size_t buf_size; int ret = GNUNET_SYSERR; struct GNUNET_MULTICAST_MessageHeader *mp; - uint64_t msg_flags; - unsigned int cnt; + uint32_t msg_flags; struct GNUNET_PQ_ResultSpec results[] = { GNUNET_PQ_result_spec_uint32 ("hop_counter", &hop_counter), @@ -879,8 +863,8 @@ fragment_row (struct Plugin *plugin, GNUNET_PQ_result_spec_uint64 ("fragment_offset", &fragment_offset), GNUNET_PQ_result_spec_uint64 ("message_id", &message_id), GNUNET_PQ_result_spec_uint64 ("group_generation", &group_generation), - GNUNET_PQ_result_spec_uint64 ("multicast_flags", &msg_flags), - GNUNET_PQ_result_spec_uint64 ("psycstore_flags", &flags), + GNUNET_PQ_result_spec_uint32 ("multicast_flags", &msg_flags), + GNUNET_PQ_result_spec_uint32 ("psycstore_flags", &flags), GNUNET_PQ_result_spec_variable_size ("data", &buf, &buf_size), GNUNET_PQ_result_spec_end }; @@ -895,15 +879,12 @@ fragment_row (struct Plugin *plugin, return GNUNET_SYSERR; } - cnt = PQntuples (res); - if (cnt == 0) + int nrows = PQntuples (res); + for (int row = 0; row < nrows; row++) { - ret = GNUNET_NO; - } - else - { - if (GNUNET_OK != GNUNET_PQ_extract_result(res, results, 0)) { - return GNUNET_SYSERR; + if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row)) + { + break; } mp = GNUNET_malloc (sizeof (*mp) + buf_size); @@ -928,6 +909,8 @@ fragment_row (struct Plugin *plugin, buf_size); GNUNET_PQ_cleanup_result(results); ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags); + if (NULL != returned_fragments) + (*returned_fragments)++; } return ret; @@ -956,8 +939,7 @@ fragment_select (struct Plugin *plugin, ret = GNUNET_NO; else { - ret = fragment_row (plugin, stmt, res, cb, cb_cls); - (*returned_fragments)++; + ret = fragment_row (plugin, stmt, res, cb, cb_cls, returned_fragments); } PQclear (res); } @@ -1044,6 +1026,9 @@ message_get (void *cls, struct Plugin *plugin = cls; *returned_fragments = 0; + if (0 == fragment_limit) + fragment_limit = INT64_MAX; + struct GNUNET_PQ_QueryParam params_select[] = { GNUNET_PQ_query_param_auto_from_type (channel_key), GNUNET_PQ_query_param_uint64 (&first_message_id), @@ -1122,7 +1107,7 @@ message_get_fragment (void *cls, if (PQntuples (res) == 0) ret = GNUNET_NO; else - ret = fragment_row (plugin, stmt, res, cb, cb_cls); + ret = fragment_row (plugin, stmt, res, cb, cb_cls, NULL); PQclear (res); } @@ -1164,14 +1149,13 @@ counters_message_get (void *cls, } struct GNUNET_PQ_ResultSpec results_select[] = { - GNUNET_PQ_result_spec_uint64 ("max_fragment_id", max_fragment_id), - GNUNET_PQ_result_spec_uint64 ("max_message_id", max_message_id), - GNUNET_PQ_result_spec_uint64 ("max_group_generation", max_group_generation), + GNUNET_PQ_result_spec_uint64 ("fragment_id", max_fragment_id), + GNUNET_PQ_result_spec_uint64 ("message_id", max_message_id), + GNUNET_PQ_result_spec_uint64 ("group_generation", max_group_generation), GNUNET_PQ_result_spec_end }; - if (GNUNET_OK != GNUNET_PQ_extract_result (res, - results_select, 0)) + if (GNUNET_OK != GNUNET_PQ_extract_result (res, results_select, 0)) { PQclear (res); return GNUNET_SYSERR; @@ -1221,8 +1205,7 @@ counters_state_get (void *cls, GNUNET_PQ_result_spec_end }; - ret = GNUNET_PQ_extract_result (res, - results_select, 0); + ret = GNUNET_PQ_extract_result (res, results_select, 0); if (GNUNET_OK != ret) { @@ -1540,8 +1523,7 @@ state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, ret = GNUNET_NO; } - ret = GNUNET_PQ_extract_result (res, - results, 0); + ret = GNUNET_PQ_extract_result (res, results, 0); if (GNUNET_OK != ret) { @@ -1573,7 +1555,7 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_ { PGresult *res; struct Plugin *plugin = cls; - int ret = GNUNET_SYSERR; + int ret = GNUNET_NO; const char *stmt = "select_state_prefix"; @@ -1606,18 +1588,11 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_ return GNUNET_SYSERR; } - do + int nrows = PQntuples (res); + for (int row = 0; row < nrows; row++) { - if (PQntuples (res) == 0) + if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row)) { - PQclear (res); - ret = GNUNET_NO; - break; - } - - if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, 0)) - { - PQclear (res); break; } @@ -1626,7 +1601,6 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_ value_size); GNUNET_PQ_cleanup_result(results); } - while (ret == GNUNET_YES); PQclear (res); @@ -1648,7 +1622,7 @@ state_get_signed (void *cls, { PGresult *res; struct Plugin *plugin = cls; - int ret = GNUNET_SYSERR; + int ret = GNUNET_NO; const char *stmt = "select_state_signed"; @@ -1676,18 +1650,11 @@ state_get_signed (void *cls, return GNUNET_SYSERR; } - do + int nrows = PQntuples (res); + for (int row = 0; row < nrows; row++) { - if (PQntuples (res) == 0) + if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row)) { - PQclear (res); - ret = GNUNET_NO; - break; - } - - if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, 0)) - { - PQclear (res); break; } @@ -1695,9 +1662,8 @@ state_get_signed (void *cls, value_signed, value_size); - GNUNET_PQ_cleanup_result(results); + GNUNET_PQ_cleanup_result (results); } - while (ret == GNUNET_YES); PQclear (res); diff --git a/src/psycstore/plugin_psycstore_sqlite.c b/src/psycstore/plugin_psycstore_sqlite.c index e6f795971..4d21696ce 100644 --- a/src/psycstore/plugin_psycstore_sqlite.c +++ b/src/psycstore/plugin_psycstore_sqlite.c @@ -599,7 +599,7 @@ database_setup (struct Plugin *plugin) "SELECT name, value_current\n" "FROM state\n" "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" - " AND (name = ? OR substr(name, 1, ?) = ? || '_');", + " AND (name = ? OR substr(name, 1, ?) = ?);", &plugin->select_state_prefix); sql_prepare (plugin->dbh, @@ -998,7 +998,7 @@ static int message_add_flags (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, uint64_t message_id, - uint64_t psycstore_flags) + uint32_t psycstore_flags) { struct Plugin *plugin = cls; sqlite3_stmt *stmt = plugin->update_message_flags; @@ -1773,7 +1773,7 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_ if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, sizeof (*channel_key), SQLITE_STATIC) || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, name_len, SQLITE_STATIC) - || SQLITE_OK != sqlite3_bind_int (stmt, 3, name_len + 1) + || SQLITE_OK != sqlite3_bind_int (stmt, 3, name_len) || SQLITE_OK != sqlite3_bind_text (stmt, 4, name, name_len, SQLITE_STATIC)) { LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, -- cgit v1.2.3